Skip to content

tasks

celery.tasks

MonitorTask dataclass

Class representing monitoring task

Source code in yaptide/celery/tasks.py
282
283
284
285
286
287
@dataclass
class MonitorTask:
    """Class representing monitoring task

    Pairs the filesystem location being watched with the thread that watches it,
    so the caller can later join the thread and re-read the watched location.
    """

    # location observed by the monitoring thread: the SHIELD-HIT12A log file,
    # or (for Fluka) the temporary working directory of the simulation
    path_to_monitor: Path
    # thread running the monitoring function; callers join it after setting the stop event
    task: threading.Thread

path_to_monitor instance-attribute

path_to_monitor

task instance-attribute

task

__init__

__init__(path_to_monitor, task)

SimulationTaskResult dataclass

Class representing result of single simulation task

Source code in yaptide/celery/tasks.py
120
121
122
123
124
125
126
127
128
129
@dataclass
class SimulationTaskResult:
    """Class representing result of single simulation task

    Produced by the simulator-specific runner functions and consumed by
    ``run_single_simulation``.
    """

    # first value returned by execute_simulation_subprocess
    process_exit_success: bool
    # second and third value returned by execute_simulation_subprocess
    command_stdout: str
    command_stderr: str
    # for SHIELD-HIT12A both counters come from read_file_offline when a monitor was started,
    # otherwise (and always for Fluka) they stay at 0
    simulated_primaries: int
    requested_primaries: int
    # estimators read from the working directory; a falsy value is treated
    # by run_single_simulation as "simulation produced no output"
    estimators_dict: dict

command_stderr instance-attribute

command_stderr

command_stdout instance-attribute

command_stdout

estimators_dict instance-attribute

estimators_dict

process_exit_success instance-attribute

process_exit_success

requested_primaries instance-attribute

requested_primaries

simulated_primaries instance-attribute

simulated_primaries

__init__

__init__(
    process_exit_success,
    command_stdout,
    command_stderr,
    simulated_primaries,
    requested_primaries,
    estimators_dict,
)

convert_input_files

convert_input_files(payload_dict)

Function converting the payload into a dictionary of simulation input files

Source code in yaptide/celery/tasks.py
21
22
23
24
25
@celery_app.task
def convert_input_files(payload_dict: dict) -> dict:
    """Convert the payload into simulation input files

    The payload is passed to ``check_and_convert_payload_to_files_dict`` and
    the outcome is returned under the ``input_files`` key.
    """
    return {"input_files": check_and_convert_payload_to_files_dict(payload_dict=payload_dict)}

merge_results

merge_results(results)

Merge results from multiple simulation tasks

Source code in yaptide/celery/tasks.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
@celery_app.task
def merge_results(results: list[dict]) -> dict:
    """Merge results from multiple simulation's tasks

    Every item of ``results`` is a dictionary produced by a single simulation task.
    Items holding ``logfiles`` contribute only their logfiles, the remaining ones
    contribute their ``estimators``, which are averaged with ``average_estimators``.

    When ``simulation_id`` and ``update_key`` are available in the first result,
    the backend is notified that merging is running.

    Returns a dictionary with ``end_time`` and - only if sending them to the backend
    did not succeed - the ``logfiles`` and/or ``estimators`` which could not be sent.
    An empty ``results`` list yields a dictionary with ``end_time`` only.
    """
    logging.debug("Merging results from %d tasks", len(results))
    logfiles = {}

    averaged_estimators = None
    # number of results already folded into averaged_estimators
    averaged_count = 0

    # guard against an empty list, results[0] would raise IndexError
    first_result = results[0] if results else {}
    simulation_id = first_result.pop("simulation_id", None)
    update_key = first_result.pop("update_key", None)
    if simulation_id and update_key:
        dict_to_send = {
            "sim_id": simulation_id,
            "job_state": EntityState.MERGING_RUNNING.value,
            "update_key": update_key
        }
        post_update(dict_to_send)

    for result in results:
        if simulation_id is None:
            simulation_id = result.pop("simulation_id", None)
        if update_key is None:
            update_key = result.pop("update_key", None)
        if "logfiles" in result:
            # result of a task without simulation output, nothing to average here
            logfiles.update(result["logfiles"])
            continue

        estimators = result.get("estimators", [])
        if averaged_estimators is None:
            # There is nothing to average yet
            averaged_estimators = estimators
            averaged_count = 1
            continue

        # The position in `results` must not be used as the weight: it also counts
        # the logfile-only results skipped above, so we track the count separately.
        # NOTE(review): assumes the third argument of average_estimators is the number
        # of results already averaged - confirm against its definition.
        averaged_estimators = average_estimators(averaged_estimators, estimators, averaged_count)
        averaged_count += 1

    final_result = {"end_time": datetime.utcnow().isoformat(sep=" ")}

    # logfiles are returned from the task only when they could not be delivered to the backend
    if logfiles and not send_simulation_logfiles(
            simulation_id=simulation_id, update_key=update_key, logfiles=logfiles):
        final_result["logfiles"] = logfiles

    if averaged_estimators:
        # send results to the backend and mark whole simulation as completed
        sending_results_ok = send_simulation_results(simulation_id=simulation_id,
                                                     update_key=update_key,
                                                     estimators=averaged_estimators)
        if not sending_results_ok:
            final_result["estimators"] = averaged_estimators

    return final_result

monitor_fluka

monitor_fluka(
    event, tmp_work_dir, task_id, update_key, simulation_id
)

Function running the monitoring process for Fluka simulation

Source code in yaptide/celery/tasks.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
def monitor_fluka(event: threading.Event, tmp_work_dir: str, task_id: int, update_key: str,
                  simulation_id: int) -> Optional[MonitorTask]:
    """Start a thread monitoring the progress of a Fluka simulation

    The thread runs ``read_fluka_file``, which receives the stop ``event``, the watched
    directory and the identifiers needed to submit updates to the backend.
    Monitoring is started only when ``update_key`` is truthy and ``simulation_id`` is not None.

    Returns a ``MonitorTask`` with the watched directory and the started thread,
    or None when no monitoring was started.
    """
    # A directory is watched instead of a single file, because (as noted by the authors)
    # the Fluka simulator generates a directory with the PID of its process in the name
    watched_dir = Path(tmp_work_dir)

    # without update_key and simulation_id the updates cannot be submitted to the backend
    can_submit_updates = bool(update_key) and simulation_id is not None
    if not can_submit_updates:
        logging.info("No monitoring processes started for task %d", task_id)
        return None

    reader_kwargs = {
        "event": event,
        "dirpath": watched_dir,
        "simulation_id": simulation_id,
        "task_id": task_id,
        "update_key": update_key,
        "logging_level": logging.getLogger().getEffectiveLevel(),
    }
    watcher = threading.Thread(target=read_fluka_file, kwargs=reader_kwargs)
    watcher.start()
    logging.info("Started monitoring process for task %d", task_id)
    return MonitorTask(path_to_monitor=watched_dir, task=watcher)

monitor_shieldhit

monitor_shieldhit(
    event, tmp_work_dir, task_id, update_key, simulation_id
)

Function monitoring progress of SHIELD-HIT12A simulation

Source code in yaptide/celery/tasks.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def monitor_shieldhit(event: threading.Event, tmp_work_dir: str, task_id: int, update_key: str,
                      simulation_id: str) -> Optional[MonitorTask]:
    """Start a thread monitoring the progress of a SHIELD-HIT12A simulation

    The thread runs ``read_file`` on the ``shieldhit_<task_id>.log`` file (task id padded
    to four digits) located in ``tmp_work_dir``. Monitoring is started only when
    ``update_key`` is truthy and ``simulation_id`` is not None.

    Returns a ``MonitorTask`` with the watched log file path and the started thread,
    or None when no monitoring was started.
    """
    log_path = Path(tmp_work_dir) / f"shieldhit_{task_id:04d}.log"

    # without update_key and simulation_id the updates cannot be submitted to the backend
    can_submit_updates = bool(update_key) and simulation_id is not None
    if not can_submit_updates:
        logging.info("No monitoring processes started for task %d", task_id)
        return None

    reader_kwargs = {
        "event": event,
        "filepath": log_path,
        "simulation_id": simulation_id,
        "task_id": task_id,
        "update_key": update_key,
        "logging_level": logging.getLogger().getEffectiveLevel(),
    }
    watcher = threading.Thread(target=read_file, kwargs=reader_kwargs)
    watcher.start()
    logging.info("Started monitoring process for task %d", task_id)
    return MonitorTask(path_to_monitor=log_path, task=watcher)

run_single_simulation

run_single_simulation(
    self,
    files_dict,
    task_id,
    update_key="",
    simulation_id=None,
    keep_tmp_files=False,
    sim_type="shieldhit",
)

Function running single simulation

Source code in yaptide/celery/tasks.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@celery_app.task(bind=True)
def run_single_simulation(self,
                          files_dict: dict,
                          task_id: int,
                          update_key: str = '',
                          simulation_id: Optional[int] = None,
                          keep_tmp_files: bool = False,
                          sim_type: str = 'shieldhit') -> dict:
    """Function running single simulation

    Writes ``files_dict`` into a temporary working directory, runs the simulator selected
    by ``sim_type`` ('shieldhit' or 'fluka') and reports the task state to the backend.

    Returns a dictionary with ``simulation_id`` and ``update_key`` and either:
    - ``estimators`` when the simulation produced output, or
    - ``logfiles``, ``stdout`` and ``stderr`` when it did not.

    Raises ValueError when ``sim_type`` names an unsupported simulator.
    """
    logging.info("Running simulation, simulation_id: %s, task_id: %d", simulation_id, task_id)

    logging.info("Sending initial update for task %d, setting celery id %s", task_id, self.request.id)
    send_task_update(simulation_id, task_id, update_key, {"celery_id": self.request.id})

    # we would like to have some control on the temporary directory used by the function,
    # e.g. for the purpose of running it in pytest
    tmp_dir = get_tmp_dir()
    logging.info("Temporary directory is: %s", tmp_dir)

    # with keep_tmp_files the directory made by mkdtemp is left on disk (nullcontext does no cleanup),
    # otherwise TemporaryDirectory removes it when the block is left
    with (contextlib.nullcontext(tempfile.mkdtemp(dir=tmp_dir)) if keep_tmp_files else tempfile.TemporaryDirectory(
            dir=tmp_dir)) as tmp_work_dir:

        write_simulation_input_files(files_dict=files_dict, output_dir=Path(tmp_work_dir))
        logging.debug("Generated input files: %s", files_dict.keys())

        if sim_type == 'shieldhit':
            simulation_result = run_single_simulation_for_shieldhit(tmp_work_dir, task_id, update_key, simulation_id)
        elif sim_type == 'fluka':
            simulation_result = run_single_simulation_for_fluka(tmp_work_dir, task_id, update_key, simulation_id)
        else:
            # without this branch simulation_result stays unbound and the code below
            # fails with a confusing UnboundLocalError
            raise ValueError(f"Unsupported simulation type: {sim_type}")

        # there is no simulation output
        if not simulation_result.estimators_dict:
            # first we notify the backend that the task with simulation has failed
            logging.info("Simulation failed for task %d, sending update that it has failed", task_id)
            update_dict = {"task_state": EntityState.FAILED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
            send_task_update(simulation_id, task_id, update_key, update_dict)

            # then we collect the logfiles, if available
            logfiles = simulation_logfiles(path=Path(tmp_work_dir))
            logging.info("Simulation failed, logfiles: %s", logfiles.keys())
            # Sending the logfiles to the backend directly from here (send_simulation_logfiles)
            # is temporarily disabled, as it is broken when several tasks fail.
            # Lets imagine following sequence of actions:
            # task 1 fails with some useful message in the logfile,
            # i.e. after 100 primaries the SHIELD-HIT12A binary crashed,
            # then the useful logfiles are being sent to the backend.
            # Task 2 fails later, but here the SHIELD-HIT12A binary crashes
            # at the beginning of the simulation, without producing the logfiles,
            # then again the logfiles are being sent to the backend, but this time they are empty,
            # so the useful logfiles are overwritten by the empty ones.

            # finally we return from the celery task, returning the logfiles and stdout/stderr as result
            return {
                "logfiles": logfiles,
                "stdout": simulation_result.command_stdout,
                "stderr": simulation_result.command_stderr,
                "simulation_id": simulation_id,
                "update_key": update_key
            }

        # otherwise we have simulation output
        logging.debug("Converting simulation results to JSON")
        estimators = estimators_to_list(estimators_dict=simulation_result.estimators_dict, dir_path=Path(tmp_work_dir))

    # We do not have any information if monitoring process sent the last update
    # so we send it here to make sure that we have the end_time and COMPLETED state
    end_time = datetime.utcnow().isoformat(sep=" ")
    update_dict = {
        "task_state": EntityState.COMPLETED.value,
        "end_time": end_time,
        # NOTE(review): simulated_primaries is reported as the requested number,
        # presumably to mark a completed task as fully simulated - confirm
        "simulated_primaries": simulation_result.requested_primaries,
        "requested_primaries": simulation_result.requested_primaries
    }
    send_task_update(simulation_id, task_id, update_key, update_dict)

    # finally return from the celery task, returning the estimators as result
    # the estimators will be merged by subsequent celery task
    return {"estimators": estimators, "simulation_id": simulation_id, "update_key": update_key}

run_single_simulation_for_fluka

run_single_simulation_for_fluka(
    tmp_work_dir, task_id, update_key="", simulation_id=None
)

Function running single simulation for Fluka

Source code in yaptide/celery/tasks.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def run_single_simulation_for_fluka(tmp_work_dir: str,
                                    task_id: int,
                                    update_key: str = '',
                                    simulation_id: Optional[int] = None) -> SimulationTaskResult:
    """Function running single simulation for Fluka

    Starts the progress monitoring (when possible), executes the Fluka command
    in ``tmp_work_dir``, stops the monitoring and reads the estimators.

    Returns a ``SimulationTaskResult``; the numbers of simulated and requested primaries
    are always 0, as reading them from the Fluka log is not implemented yet.
    """
    command_as_list = command_to_run_fluka(dir_path=Path(tmp_work_dir), task_id=task_id)
    logging.info("Command to run FLUKA: %s", " ".join(command_as_list))

    simulated_primaries, requested_primaries = 0, 0
    event = threading.Event()
    # start monitoring process if possible
    # task_monitor is None if the monitor was not started
    task_monitor = monitor_fluka(event, tmp_work_dir, task_id, update_key, simulation_id)

    try:
        # run the simulation
        logging.info("Running Fluka process in %s", tmp_work_dir)
        process_exit_success, command_stdout, command_stderr = execute_simulation_subprocess(
            dir_path=Path(tmp_work_dir), command_as_list=command_as_list)
        logging.info("Fluka process finished with status %s", process_exit_success)
    finally:
        # terminate monitoring process, also when running the simulation raised an exception,
        # otherwise the monitoring thread would never be told to stop
        if task_monitor:
            logging.debug("Terminating monitoring process for task %s", task_id)
            event.set()
            task_monitor.task.join()
            logging.debug("Monitoring process for task %s terminated", task_id)
    # TO BE IMPLEMENTED
    # if watcher didn't finish yet, we need to read the log file and send the last update to the backend
    # reading of the log file for fluka after simulation was finished
    # fluka copies the file back to main directory from temporary directory

    # both simulation execution and monitoring process are finished now, we can read the estimators
    estimators_dict = get_fluka_estimators(dir_path=Path(tmp_work_dir))

    return SimulationTaskResult(process_exit_success=process_exit_success,
                                command_stdout=command_stdout,
                                command_stderr=command_stderr,
                                simulated_primaries=simulated_primaries,
                                requested_primaries=requested_primaries,
                                estimators_dict=estimators_dict)

run_single_simulation_for_shieldhit

run_single_simulation_for_shieldhit(
    tmp_work_dir,
    task_id,
    update_key="",
    simulation_id=Optional[None],
)

Function running single simulation for shieldhit

Source code in yaptide/celery/tasks.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def run_single_simulation_for_shieldhit(tmp_work_dir: str,
                                        task_id: int,
                                        update_key: str = '',
                                        simulation_id: Optional[int] = None) -> SimulationTaskResult:
    """Function running single simulation for shieldhit

    Starts the progress monitoring (when possible), executes the SHIELD-HIT12A command
    in ``tmp_work_dir``, stops the monitoring and reads the estimators.

    ``simulation_id`` defaults to None; the former default ``Optional[None]`` evaluated
    to the ``NoneType`` class, which passes the ``is not None`` check done before
    the monitoring is started.

    Returns a ``SimulationTaskResult``; the numbers of simulated and requested primaries
    are read from the monitored log file when the monitoring was started, otherwise they are 0.
    """
    command_as_list = command_to_run_shieldhit(dir_path=Path(tmp_work_dir), task_id=task_id)
    logging.info("Command to run SHIELD-HIT12A: %s", " ".join(command_as_list))

    simulated_primaries, requested_primaries = 0, 0
    event = threading.Event()

    # start monitoring process if possible
    # task_monitor is None if the monitor was not started
    task_monitor = monitor_shieldhit(event, tmp_work_dir, task_id, update_key, simulation_id)

    try:
        # run the simulation
        logging.info("Running SHIELD-HIT12A process in %s", tmp_work_dir)
        process_exit_success, command_stdout, command_stderr = execute_simulation_subprocess(
            dir_path=Path(tmp_work_dir), command_as_list=command_as_list)
        logging.info("SHIELD-HIT12A process finished with status %s", process_exit_success)
    finally:
        # terminate monitoring process, also when running the simulation raised an exception,
        # otherwise the monitoring thread would never be told to stop
        if task_monitor:
            logging.debug("Terminating monitoring process for task %d", task_id)
            event.set()
            task_monitor.task.join()
            logging.debug("Monitoring process for task %d terminated", task_id)

    # the watcher might have stopped before seeing the end of the log,
    # so the final numbers of primaries are read from the log file once more
    if task_monitor:
        simulated_primaries, requested_primaries = read_file_offline(task_monitor.path_to_monitor)

    # both simulation execution and monitoring process are finished now, we can read the estimators
    estimators_dict = get_shieldhit_estimators(dir_path=Path(tmp_work_dir))

    return SimulationTaskResult(process_exit_success=process_exit_success,
                                command_stdout=command_stdout,
                                command_stderr=command_stderr,
                                simulated_primaries=simulated_primaries,
                                requested_primaries=requested_primaries,
                                estimators_dict=estimators_dict)

set_merging_queued_state

set_merging_queued_state(results)

Celery task to set simulation state as MERGING_QUEUED

Source code in yaptide/celery/tasks.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
@celery_app.task
def set_merging_queued_state(results: list[dict]) -> list[dict]:
    """Celery task to set simulation state as MERGING_QUEUED

    Reads ``simulation_id`` and ``update_key`` from the first result and, when both
    are truthy, posts the MERGING_QUEUED job state to the backend.

    Returns ``results`` unchanged, so that they can be passed to the next task.
    """
    logging.debug("send_state")
    # guard against an empty list, results[0] would raise IndexError
    first_result = results[0] if results else {}
    simulation_id = first_result.get("simulation_id", None)
    update_key = first_result.get("update_key", None)
    if simulation_id and update_key:
        dict_to_send = {
            "sim_id": simulation_id,
            "job_state": EntityState.MERGING_QUEUED.value,
            "update_key": update_key
        }
        post_update(dict_to_send)
    return results