Skip to content

helper_tasks

utils.helper_tasks

get_tasks_from_celery

get_tasks_from_celery(simulation_id)

Returns the Celery task ids found in Celery for the given simulation_id. Can take up to a few seconds when Celery is busy.

Source code in yaptide/utils/helper_tasks.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def get_tasks_from_celery(simulation_id: int) -> list:
    """Return the Celery tasks that belong to the given simulation.

    Asks the ``celery@yaptide-simulation-worker`` worker for its active and
    reserved tasks and keeps those whose ``simulation_id`` keyword argument
    equals ``simulation_id``. Can take up to a few seconds when Celery is busy.

    Args:
        simulation_id: value compared with each task's ``kwargs['simulation_id']``.

    Returns:
        A list of ``{"celery_id": ..., "task_id": ...}`` dicts, active tasks
        first and reserved tasks after them. The list is empty when no task
        matches or when every attempt to query the worker failed.
    """
    worker_name = 'celery@yaptide-simulation-worker'

    # Only the worker query is retried. The result list is built afterwards,
    # so a failed attempt can never leave partial (and later duplicated)
    # entries behind.
    worker_tasks = []
    for _ in range(10):
        # Sometimes celery_app.control is None
        try:
            inspector = celery_app.control.inspect()
            worker_tasks = inspector.active()[worker_name] + inspector.reserved()[worker_name]
            break
        except Exception:  # best-effort lookup: any failure just triggers another attempt
            continue

    simulation_task_ids = []
    for simulation in worker_tasks:
        kwargs = simulation.get('kwargs') or {}
        # Tasks without a simulation_id keyword argument are skipped instead
        # of aborting the whole lookup.
        if 'simulation_id' not in kwargs or kwargs['simulation_id'] != simulation_id:
            continue
        simulation_task_ids.append({
            "celery_id": simulation['id'],
            "task_id": kwargs.get('task_id')
        })
    return simulation_task_ids

terminate_unfinished_tasks

terminate_unfinished_tasks(simulation_id)

Function for stopping tasks that weren't finished on the first try

Source code in yaptide/utils/helper_tasks.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@celery_app.task
def terminate_unfinished_tasks(simulation_id: int):
    """Revoke the tasks of a simulation that were not stopped on the first try.

    Polls get_tasks_from_celery once per second until two consecutive results
    are equal, then fetches the task list once more and revokes every listed
    celery id with terminate=True and the SIGINT signal.
    """
    snapshot = get_tasks_from_celery(simulation_id)
    # Keep polling until the reported task list stops changing between checks
    while True:
        sleep(1)
        latest = get_tasks_from_celery(simulation_id)
        if latest == snapshot:
            break
        snapshot = latest

    # The ids to revoke come from a fresh query, not from the last poll result
    celery_ids = [entry['celery_id'] for entry in get_tasks_from_celery(simulation_id)]
    celery_app.control.revoke(celery_ids, terminate=True, signal="SIGINT")