Skip to content

manage_tasks

celery.utils.manage_tasks

cancel_job

cancel_job(merge_id, celery_ids)

Cancels simulation

Source code in yaptide/celery/utils/manage_tasks.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def cancel_job(merge_id: str, celery_ids: list[str]) -> dict:
    """Cancels simulation

    Attempts to revoke the merge task and then every task listed in `celery_ids`.
    The returned dict holds the outcome for the merge task under "merge" and
    a list with one outcome per id from `celery_ids` under "tasks".
    """

    def cancel_task(job_id: str, state_key: str) -> dict:
        """Revokes a single task unless it already reached a final state"""
        async_result = AsyncResult(id=job_id, app=celery_app)
        current_state: str = translate_celery_state_naming(async_result.state)

        # tasks in one of these states are reported back untouched
        final_states = (EntityState.CANCELED.value, EntityState.COMPLETED.value, EntityState.FAILED.value)
        if current_state in final_states:
            logging.warning("Cannot cancel job %s which is already %s", job_id, current_state)
            return {state_key: current_state, "message": f"Job already {current_state}"}

        try:
            celery_app.control.revoke(job_id, terminate=True, signal="SIGINT")
        except Exception as error:  # skipcq: PYL-W0703
            # best effort: report the state we saw instead of propagating the error
            logging.error("Cannot cancel job %s, due to %s", job_id, error)
            return {
                state_key: current_state,
                "message": f"Cannot cancel job {job_id}, leaving at current state {current_state}"
            }
        else:
            return {state_key: EntityState.CANCELED.value, "message": f"Job {job_id} canceled"}

    merge_outcome = cancel_task(merge_id, "job_state")
    task_outcomes = [cancel_task(task_id, "task_state") for task_id in celery_ids]
    return {"merge": merge_outcome, "tasks": task_outcomes}

get_job_results

get_job_results(job_id)

Returns simulation results

Source code in yaptide/celery/utils/manage_tasks.py
88
89
90
91
92
93
def get_job_results(job_id: str) -> dict:
    """Returns simulation results

    Looks up the value stored under the "result" key of the job's info.
    An empty dict is returned when the info is not a dict (for example when
    nothing has been stored for the job yet, or when the info is an exception
    raised by a failed task) or when it has no "result" key.
    """
    job = AsyncResult(id=job_id, app=celery_app)
    # read the info once, so the check and the lookup operate on the same object
    info = job.info
    # info is not guaranteed to be a dict - a membership test on None or
    # on an exception instance would raise TypeError
    if not isinstance(info, dict):
        return {}
    return info.get("result", {})

get_job_status

get_job_status(merge_id, celery_ids)

Returns the simulation state; results are not returned here. A simulation may consist of multiple tasks, so all of them need to be checked.

Source code in yaptide/celery/utils/manage_tasks.py
46
47
48
49
50
51
52
53
54
55
56
def get_job_status(merge_id: str, celery_ids: list[str]) -> dict:
    """
    Returns simulation state, results are not returned here.
    Simulation may consist of multiple tasks, so the status of the merge task
    is reported under "merge" and the status of every task from `celery_ids`
    is collected in a list under "tasks".
    """
    merge_status = get_task_status(merge_id, "job_state")
    task_statuses = [get_task_status(task_id, "task_state") for task_id in celery_ids]
    return {"merge": merge_status, "tasks": task_statuses}

get_task_status

get_task_status(job_id, state_key)

Gets the status of a single task in the workflow

Source code in yaptide/celery/utils/manage_tasks.py
32
33
34
35
36
37
38
39
40
41
42
43
def get_task_status(job_id: str, state_key: str) -> dict:
    """Gets status of a single task in the workflow

    The translated state is stored under `state_key`. For a failed task the
    string form of the task's info is added under "message". If the info is
    a dict containing "end_time", that value is copied to the result as well.
    """
    job = AsyncResult(id=job_id, app=celery_app)
    job_state: str = translate_celery_state_naming(job.state)
    # read the info once, so every check below sees the same object
    info = job.info

    # we still need to convert string to enum and operate later on Enum
    result = {state_key: job_state}
    if job_state == EntityState.FAILED.value:
        result["message"] = str(info)
    # info may be None or an exception instance (e.g. for a failed task);
    # a membership test on those would raise TypeError
    if isinstance(info, dict) and "end_time" in info:
        result["end_time"] = info["end_time"]
    return result

run_job

run_job(
    files_dict,
    update_key,
    simulation_id,
    ntasks,
    sim_type="shieldhit",
)

Runs asynchronous simulation job

Source code in yaptide/celery/utils/manage_tasks.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def run_job(files_dict: dict, update_key: str, simulation_id: int, ntasks: int, sim_type: str = 'shieldhit') -> str:
    """Runs asynchronous simulation job

    Creates `ntasks` signatures of `run_single_simulation` (numbered from 0 via
    `task_id`), wraps them in a group and submits a chord of that group with
    `merge_results`. Returns the id of the result object obtained from
    submitting the chord.
    """
    logging.debug("Starting run_simulation task for %d tasks", ntasks)
    logging.debug("Simulation id: %d", simulation_id)
    logging.debug("Update key: %s", update_key)

    signatures = []
    for task_index in range(ntasks):
        signature = run_single_simulation.s(
            files_dict=files_dict,  # simulation input, keys: filenames, values: file contents
            task_id=task_index,
            update_key=update_key,
            simulation_id=simulation_id,
            sim_type=sim_type)
        signatures.append(signature)

    simulation_tasks = group(signatures)
    submitted: AsyncResult = chord(simulation_tasks, merge_results.s()).delay()
    return submitted.id

translate_celery_state_naming

translate_celery_state_naming(job_state)

Function translating celery states' names to ones used in YAPTIDE

Source code in yaptide/celery/utils/manage_tasks.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def translate_celery_state_naming(job_state: str) -> str:
    """Maps a celery state name onto the state name used in YAPTIDE

    State names which are not listed below are returned unchanged.
    """
    # by default the name is passed through as it is
    translated = job_state
    if job_state in ("RECEIVED", "RETRY"):
        translated = EntityState.PENDING.value
    elif job_state in ("PROGRESS", "STARTED"):
        translated = EntityState.RUNNING.value
    elif job_state in ("FAILURE",):
        translated = EntityState.FAILED.value
    elif job_state in ("REVOKED",):
        translated = EntityState.CANCELED.value
    elif job_state in ("SUCCESS",):
        translated = EntityState.COMPLETED.value
    return translated