Skip to content

batch_methods

batch.batch_methods

delete_job

delete_job(simulation, user, cluster)

Cancels the array and collect SLURM jobs of a simulation and removes its job directory on the cluster

Source code in yaptide/batch/batch_methods.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def delete_job(simulation: BatchSimulationModel, user: KeycloakUserModel,
               cluster: ClusterModel) -> tuple[dict, int]:  # skipcq: PYL-W0613
    """Cancels the SLURM jobs of a simulation and removes its job directory

    Connects to the cluster as `user`, runs `scancel` for the array job and the
    collect job of `simulation`, then deletes the simulation's job directory.

    Returns:
        A `(body, status)` tuple: `({"message": "Job canceled"}, 200)` when all
        commands succeeded, `({"message": "Job cancelation failed"}, 500)` when
        connecting or any of the commands raised an exception.
    """
    import shlex  # local import: only needed here to quote the remote path

    job_dir = simulation.job_dir
    array_id = simulation.array_id
    collect_id = simulation.collect_id

    con = None
    try:
        con = get_connection(user=user, cluster=cluster)

        con.run(f'scancel {array_id}')
        con.run(f'scancel {collect_id}')
        # Never issue `rm -rf` without a real target, and quote the path so that
        # whitespace or shell metacharacters cannot widen what gets removed
        if job_dir:
            con.run(f'rm -rf {shlex.quote(str(job_dir))}')
    except Exception as e:  # skipcq: PYL-W0703
        logging.error(e)
        return {"message": "Job cancelation failed"}, 500
    finally:
        # Release the SSH connection instead of leaving it to garbage collection
        if con is not None:
            con.close()

    return {"message": "Job canceled"}, 200

get_cluster

get_cluster(db_con, metadata, clusterId)

Queries database for cluster

Source code in yaptide/batch/batch_methods.py
41
42
43
44
45
46
47
48
49
50
def get_cluster(db_con, metadata, clusterId):
    """Queries database for cluster

    Selects from the cluster table the row whose `id` equals `clusterId` and
    returns the first result of that query.

    Returns:
        The first row returned by the query, or None when executing the query
        raised an exception (the error is logged together with its traceback).
    """
    clusters = metadata.tables[TableTypes.Cluster.name]
    stmt = db.select(clusters).filter_by(id=clusterId)
    try:
        cluster: ClusterModel = db_con.execute(stmt).first()
    except Exception:  # skipcq: PYL-W0703
        # logging.exception keeps the underlying database error in the log
        logging.exception('Error getting cluster object with id: %s from database', clusterId)
        return None
    return cluster

get_connection

get_connection(user, cluster)

Returns connection object to cluster

Source code in yaptide/batch/batch_methods.py
53
54
55
56
57
58
59
60
61
62
63
64
def get_connection(user: KeycloakUserModel, cluster: ClusterModel) -> Connection:
    """Builds a connection object to the cluster for the given user

    The user's private key is parsed as an RSA key and the user's certificate is
    loaded onto it; the resulting key is the only credential handed to the
    connection (`allow_agent` and `look_for_keys` are both switched off).
    """
    key_stream = io.StringIO(user.private_key)
    certified_key = RSAKey.from_private_key(key_stream)
    certified_key.load_certificate(user.cert)

    auth_options = {"pkey": certified_key, "allow_agent": False, "look_for_keys": False}
    target_host = f"{user.username}@{cluster.cluster_name}"
    return Connection(host=target_host, connect_kwargs=auth_options)

get_job_results

get_job_results(simulation, user, cluster)

Returns simulation results

Source code in yaptide/batch/batch_methods.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
def get_job_results(simulation: BatchSimulationModel, user: KeycloakUserModel, cluster: ClusterModel) -> dict:
    """Returns simulation results

    Asks `sacct` for the state of the collect job. When the last reported state
    is `COMPLETED`, every `*.json` file found in `<job_dir>/output` on the
    cluster is downloaded and parsed.

    Returns:
        `{"estimators": [...]}` with one dict per downloaded JSON file (each
        extended with a `"name"` key holding the file name up to its first
        dot), or `{"message": "Results not available"}` when the collect job is
        not in the `COMPLETED` state.
    """
    job_dir = simulation.job_dir
    collect_id = simulation.collect_id

    con = get_connection(user=user, cluster=cluster)
    try:
        fabric_result: Result = con.run(f'sacct -j {collect_id} --format State', hide=True)
        # The state is the last token printed by sacct; empty output means "unknown"
        state_tokens = fabric_result.stdout.split()
        collect_state = state_tokens[-1] if state_tokens else ""

        if collect_state != "COMPLETED":
            return {"message": "Results not available"}

        # Filter the listing here instead of piping through grep: grep exits with
        # status 1 when nothing matches, which would make con.run raise
        fabric_result = con.run(f'ls -f {job_dir}/output', hide=True)
        json_filenames = [name for name in fabric_result.stdout.split() if name.endswith(".json")]

        result_estimators = []
        with tempfile.TemporaryDirectory() as tmp_dir_path:
            for filename in json_filenames:
                file_path = Path(tmp_dir_path, filename)
                with open(file_path, "wb") as writer:
                    con.get(f'{job_dir}/output/{filename}', writer)
                with open(file_path, "r", encoding="utf-8") as json_file:
                    est_dict = json.load(json_file)
                    est_dict["name"] = filename.split('.')[0]
                    result_estimators.append(est_dict)

        return {"estimators": result_estimators}
    finally:
        # Release the SSH connection instead of leaving it to garbage collection
        con.close()

get_job_status

get_job_status(simulation, user, cluster)

Get SLURM job status

Source code in yaptide/batch/batch_methods.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def get_job_status(simulation: BatchSimulationModel, user: KeycloakUserModel, cluster: ClusterModel) -> dict:
    """Get SLURM job status

    Queries `sacct` for the array job and for the collect job of `simulation`
    and maps the last state reported for each of them onto an `EntityState`.

    Returns:
        A dict with a `"job_state"` key. For the FAILED and COMPLETED states it
        also carries `"end_time"`, the current UTC time formatted with a space
        separator. Any combination of SLURM states not handled explicitly is
        reported as `EntityState.RUNNING`.
    """
    array_id = simulation.array_id
    collect_id = simulation.collect_id

    con = get_connection(user=user, cluster=cluster)

    def last_reported_state(job_id) -> str:
        """Returns the last token printed by sacct for `job_id`, or "" when there is no output"""
        fabric_result: Result = con.run(f'sacct -j {job_id} --format State', hide=True)
        tokens = fabric_result.stdout.split()
        return tokens[-1] if tokens else ""

    try:
        job_state = last_reported_state(array_id)
        collect_state = last_reported_state(collect_id)
    finally:
        # This function is polled repeatedly, so release the SSH connection each time
        con.close()

    if job_state == "FAILED" or collect_state == "FAILED":
        return {"job_state": EntityState.FAILED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
    if collect_state == "COMPLETED":
        return {"job_state": EntityState.COMPLETED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
    if collect_state == "RUNNING":
        logging.debug("Collect job is in RUNNING state")
        return {"job_state": EntityState.MERGING_RUNNING.value}
    if job_state == "COMPLETED" and collect_state == "PENDING":
        logging.debug("Collect job is in PENDING state")
        return {"job_state": EntityState.MERGING_QUEUED.value}
    # The two branches below only log; both fall through to the RUNNING result
    if job_state == "RUNNING":
        logging.debug("Main job is in RUNNING state")
    if job_state == "PENDING":
        logging.debug("Main job is in PENDING state")

    return {"job_state": EntityState.RUNNING.value}

get_user

get_user(db_con, metadata, userId)

Queries database for user

Source code in yaptide/batch/batch_methods.py
26
27
28
29
30
31
32
33
34
35
36
37
38
def get_user(db_con, metadata, userId):
    """Queries database for user

    Joins the user table with the Keycloak user table and returns the first
    row of the query filtered by `id=userId`.

    Returns:
        The first row returned by the query, or None when executing the query
        raised an exception (the error is logged together with its traceback).
    """
    users = metadata.tables[TableTypes.User.name]
    keycloak_users = metadata.tables[TableTypes.KeycloakUser.name]
    stmt = db.select(users,
                     keycloak_users).select_from(users).join(keycloak_users,
                                                             KeycloakUserModel.id == UserModel.id).filter_by(id=userId)
    try:
        user: KeycloakUserModel = db_con.execute(stmt).first()
    except Exception:  # skipcq: PYL-W0703
        # logging.exception keeps the underlying database error in the log
        logging.exception('Error getting user object with id: %s from database', userId)
        return None
    return user

post_update

post_update(dict_to_send)

For sending requests with information to flask

Source code in yaptide/batch/batch_methods.py
67
68
69
70
def post_update(dict_to_send):
    """For sending requests with information to flask

    Posts `dict_to_send` as JSON to the `/jobs` endpoint under the URL taken
    from the `BACKEND_INTERNAL_URL` environment variable.

    Returns:
        The response object of the POST request.
    """
    flask_url = os.environ.get("BACKEND_INTERNAL_URL")
    # The context manager closes the session (and its pooled connections) after
    # the request; the response body is already read at that point
    with requests.Session() as session:
        return session.post(url=f"{flask_url}/jobs", json=dict_to_send)

prepare_script_files

prepare_script_files(
    payload_dict, job_dir, sim_id, update_key, con
)

Prepares script files to run them on cluster

Source code in yaptide/batch/batch_methods.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def prepare_script_files(payload_dict: dict, job_dir: str, sim_id: int, update_key: str,
                         con: Connection) -> tuple[str, dict]:
    """Renders the submit, array and collect scripts and writes them to the cluster

    The three script templates are filled in from `payload_dict`, the job
    directory, the simulation id, the update key and the `BACKEND_EXTERNAL_URL`
    environment variable (empty string when unset). Each rendered script is
    appended to its file inside `job_dir` through `con` and marked executable.

    Returns:
        A tuple of the submit script path and a dict with the rendered
        `"submit"`, `"array"` and `"collect"` script contents.
    """
    submit_file = f'{job_dir}/yaptide_submitter.sh'

    array_options = convert_dict_to_sbatch_options(payload_dict=payload_dict, target_key="array_options")
    array_header = extract_sbatch_header(payload_dict=payload_dict, target_key="array_header")
    collect_options = convert_dict_to_sbatch_options(payload_dict=payload_dict, target_key="collect_options")
    collect_header = extract_sbatch_header(payload_dict=payload_dict, target_key="collect_header")

    backend_url = os.environ.get("BACKEND_EXTERNAL_URL", "")
    # Values shared by the array and collect templates
    callback_fields = {"root_dir": job_dir, "sim_id": sim_id, "update_key": update_key, "backend_url": backend_url}

    rendered = {
        "submit":
        SUBMIT_SHIELDHIT.format(array_options=array_options,
                                collect_options=collect_options,
                                root_dir=job_dir,
                                n_tasks=str(payload_dict["ntasks"]),
                                convertmc_version=pymchelper.__version__),
        "array":
        ARRAY_SHIELDHIT_BASH.format(array_header=array_header, **callback_fields),
        "collect":
        COLLECT_BASH.format(collect_header=collect_header, clear_bdos="true", **callback_fields),
    }

    # Upload order: array script, submit script, collect script
    uploads = (
        (rendered["array"], f'{job_dir}/array_script.sh'),
        (rendered["submit"], submit_file),
        (rendered["collect"], f'{job_dir}/collect_script.sh'),
    )
    for content, target in uploads:
        con.run(f"echo '{content}' >> {target}")
        con.run(f"chmod +x {target}")

    return submit_file, rendered

submit_job

submit_job(
    payload_dict,
    files_dict,
    userId,
    clusterId,
    sim_id,
    update_key,
)

Submits job to cluster

Source code in yaptide/batch/batch_methods.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def _send_submit_failure(sim_id: int, update_key: str, log: dict) -> None:
    """Notifies the backend that submitting simulation `sim_id` failed, attaching `log`"""
    post_update({
        "sim_id": sim_id,
        "job_state": EntityState.FAILED.value,
        "update_key": update_key,
        "log": log
    })


def _upload_job_inputs(con: Connection, files_dict: dict, job_dir: str) -> None:
    """Zips the simulation input files and uploads them, with the helper scripts, to `job_dir`"""
    with tempfile.TemporaryDirectory() as tmp_dir_path:
        logging.debug("Preparing simulation input in: %s", tmp_dir_path)
        zip_path = Path(tmp_dir_path) / "input.zip"
        write_simulation_input_files(files_dict=files_dict, output_dir=Path(tmp_dir_path))
        logging.debug("Zipping simulation input to %s", zip_path)
        with ZipFile(zip_path, mode="w") as archive:
            for file in Path(tmp_dir_path).iterdir():
                # The archive itself lives in the same directory; do not zip it into itself
                if file.name == "input.zip":
                    continue
                archive.write(file, arcname=file.name)
        con.put(zip_path, job_dir)
        logging.debug("Transfering simulation input %s to %s", zip_path, job_dir)

    watcher_script = Path(__file__).parent.resolve() / "watcher.py"
    simulation_data_sender_script = Path(__file__).parent.resolve() / "simulation_data_sender.py"

    logging.debug("Transfering watcher script %s to %s", watcher_script, job_dir)
    con.put(watcher_script, job_dir)
    logging.debug("Transfering result sender script %s to %s", simulation_data_sender_script, job_dir)
    con.put(simulation_data_sender_script, job_dir)


def _parse_job_ids(submit_stdout: str) -> tuple:
    """Extracts `(array_id, collect_id)` from the submit script output; an id that is not found is None"""
    array_id = collect_id = None
    for line in submit_stdout.split("\n"):
        if line.startswith("Job id"):
            try:
                array_id = int(line.split()[-1])
            except (ValueError, IndexError):
                logging.error("Could not parse array id from line: %s", line)
        if line.startswith("Collect id"):
            try:
                collect_id = int(line.split()[-1])
            except (ValueError, IndexError):
                logging.error("Could not parse collect id from line: %s", line)
    return array_id, collect_id


def _submit_on_cluster(con: Connection, payload_dict: dict, files_dict: dict, sim_id: int, update_key: str,
                       utc_now: int) -> None:
    """Creates the job directory on the cluster, uploads everything, runs the submit script and reports back"""
    try:
        fabric_result: Result = con.run("echo $SCRATCH", hide=True)
    except Exception as e:  # skipcq: PYL-W0703
        _send_submit_failure(sim_id, update_key, {"error": str(e)})
        return

    # `echo $SCRATCH` prints only a newline when the variable is unset
    scratch_tokens = fabric_result.stdout.split()
    if not scratch_tokens:
        error_message = "SCRATCH directory is not defined on the cluster"
        logging.error(error_message)
        _send_submit_failure(sim_id, update_key, {"error": error_message})
        return
    scratch = scratch_tokens[0]
    logging.debug("Scratch directory: %s", scratch)

    job_dir = f"{scratch}/yaptide_runs/{utc_now}"
    logging.debug("Job directory: %s", job_dir)

    try:
        con.run(f"mkdir -p {job_dir}")
    except Exception as e:  # skipcq: PYL-W0703
        _send_submit_failure(sim_id, update_key, {"error": str(e)})
        return

    _upload_job_inputs(con=con, files_dict=files_dict, job_dir=job_dir)

    submit_file, sh_files = prepare_script_files(payload_dict=payload_dict,
                                                 job_dir=job_dir,
                                                 sim_id=sim_id,
                                                 update_key=update_key,
                                                 con=con)

    if not submit_file.startswith(job_dir):
        logging.error("Invalid submit file path: %s", submit_file)
        _send_submit_failure(sim_id, update_key, {"error": "Job submission failed due to invalid submit file path"})
        return

    # warn=True: a non-zero exit status must not raise, so that the captured
    # stdout/stderr can be reported to the backend in the failure branch below
    fabric_result = con.run(f'sh {submit_file}', hide=True, warn=True)
    submit_stdout = fabric_result.stdout
    submit_stderr = fabric_result.stderr
    array_id, collect_id = _parse_job_ids(submit_stdout)

    if array_id is None or collect_id is None:
        logging.debug("Job submission failed")
        logging.debug("Sbatch stdout: %s", submit_stdout)
        logging.debug("Sbatch stderr: %s", submit_stderr)
        _send_submit_failure(
            sim_id, update_key, {
                "message": "Job submission failed",
                "submit_stdout": submit_stdout,
                "sh_files": sh_files,
                "submit_stderr": submit_stderr
            })
        return

    post_update({
        "sim_id": sim_id,
        "update_key": update_key,
        "job_dir": job_dir,
        "array_id": array_id,
        "collect_id": collect_id,
        "submit_stdout": submit_stdout,
        "sh_files": sh_files
    })


@celery_app.task()
def submit_job(  # skipcq: PY-R1000
        payload_dict: dict, files_dict: dict, userId: int, clusterId: int, sim_id: int, update_key: str):
    """Submits job to cluster

    Looks up the user and the cluster in the database, connects to the cluster,
    creates a job directory named after the current UTC timestamp under
    `$SCRATCH/yaptide_runs`, uploads the zipped simulation input together with
    the watcher and result sender scripts, writes the batch scripts and runs the
    submit script.

    The outcome is reported to the backend with `post_update`: on success the
    job directory, the array and collect job ids, the submit output and the
    generated scripts; on failure a `FAILED` job state with a `log` describing
    the problem. The task itself returns None.
    """
    utc_now = int(datetime.utcnow().timestamp() * 1e6)
    try:
        # Connection to database and querying objects looks like that because
        # celery task works outside flask context
        db_con, metadata, _ = connect_to_db()
    except Exception as e:  # skipcq: PYL-W0703
        logging.error('Async worker couldn\'t connect to db. Error message:"%s"', str(e))
        # Without the database neither the user nor the cluster can be resolved
        _send_submit_failure(sim_id, update_key, {"error": "Async worker couldn't connect to db"})
        return

    user = get_user(db_con=db_con, metadata=metadata, userId=userId)
    cluster = get_cluster(db_con=db_con, metadata=metadata, clusterId=clusterId)

    # Both lookups return None when the query fails
    if user is None or cluster is None:
        logging.error("User with id: %s or cluster with id: %s not found in database", userId, clusterId)
        _send_submit_failure(sim_id, update_key, {"error": "User or cluster not found in database"})
        return

    if user.cert is None or user.private_key is None:
        _send_submit_failure(sim_id, update_key,
                             {"error": f"User {user.username} has no certificate or private key"})
        return

    try:
        con = get_connection(user=user, cluster=cluster)
    except Exception as e:  # skipcq: PYL-W0703
        _send_submit_failure(sim_id, update_key, {"error": str(e)})
        return

    try:
        _submit_on_cluster(con=con,
                           payload_dict=payload_dict,
                           files_dict=files_dict,
                           sim_id=sim_id,
                           update_key=update_key,
                           utc_now=utc_now)
    finally:
        # Release the SSH connection whatever the outcome of the submission
        con.close()
    return