Coverage for yaptide/utils/helper_tasks.py: 21%

24 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-22 07:31 +0000

1from time import sleep 

2from yaptide.utils.helper_worker import celery_app 

3 

4 

5@celery_app.task 

6def terminate_unfinished_tasks(simulation_id: int): 

7 """Function for stopping tasks that wasn't finished with first try""" 

8 number_of_tasks = get_tasks_from_celery(simulation_id) 

9 previous_number_of_tasks = 0 

10 # Wait until amount of active tasks doesn't fall 

11 while number_of_tasks != previous_number_of_tasks: 

12 previous_number_of_tasks = number_of_tasks 

13 sleep(1) 

14 number_of_tasks = get_tasks_from_celery(simulation_id) 

15 celery_app.control.revoke([celery_pair['celery_id'] for celery_pair in get_tasks_from_celery(simulation_id)], 

16 terminate=True, 

17 signal="SIGINT") 

18 

19 

20def get_tasks_from_celery(simulation_id: int): 

21 """returns celery ids from celery based on simulation_id. Can take up to few seconds when celry is busy""" 

22 simulation_task_ids = [] 

23 

24 retry_treshold = 10 

25 while retry_treshold > 0: 

26 # Sometimes celery_app.control is None 

27 try: 

28 for simulation in celery_app.control.inspect().active( 

29 )['celery@yaptide-simulation-worker'] + celery_app.control.inspect().reserved( 

30 )['celery@yaptide-simulation-worker']: 

31 if simulation['kwargs']['simulation_id'] == simulation_id: 

32 simulation_task_ids.append({ 

33 "celery_id": simulation['id'], 

34 "task_id": simulation['kwargs']['task_id'] 

35 }) 

36 break 

37 except Exception: 

38 retry_treshold -= 1 

39 continue 

40 return simulation_task_ids