Coverage for yaptide/utils/helper_tasks.py: 21%
24 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
1from time import sleep
2from yaptide.utils.helper_worker import celery_app
5@celery_app.task
6def terminate_unfinished_tasks(simulation_id: int):
7 """Function for stopping tasks that wasn't finished with first try"""
8 number_of_tasks = get_tasks_from_celery(simulation_id)
9 previous_number_of_tasks = 0
10 # Wait until amount of active tasks doesn't fall
11 while number_of_tasks != previous_number_of_tasks:
12 previous_number_of_tasks = number_of_tasks
13 sleep(1)
14 number_of_tasks = get_tasks_from_celery(simulation_id)
15 celery_app.control.revoke([celery_pair['celery_id'] for celery_pair in get_tasks_from_celery(simulation_id)],
16 terminate=True,
17 signal="SIGINT")
20def get_tasks_from_celery(simulation_id: int):
21 """returns celery ids from celery based on simulation_id. Can take up to few seconds when celry is busy"""
22 simulation_task_ids = []
24 retry_treshold = 10
25 while retry_treshold > 0:
26 # Sometimes celery_app.control is None
27 try:
28 for simulation in celery_app.control.inspect().active(
29 )['celery@yaptide-simulation-worker'] + celery_app.control.inspect().reserved(
30 )['celery@yaptide-simulation-worker']:
31 if simulation['kwargs']['simulation_id'] == simulation_id:
32 simulation_task_ids.append({
33 "celery_id": simulation['id'],
34 "task_id": simulation['kwargs']['task_id']
35 })
36 break
37 except Exception:
38 retry_treshold -= 1
39 continue
40 return simulation_task_ids