Coverage for yaptide/routes/utils/utils.py: 87%

31 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-22 07:31 +0000

1from typing import Optional 

2 

3from yaptide.persistence.db_methods import fetch_simulation_by_job_id 

4from yaptide.persistence.models import UserModel 

5from yaptide.utils.enums import InputType 

6from yaptide.utils.sim_utils import files_dict_with_adjusted_primaries 

7 

8 

9def check_if_job_is_owned_and_exist(job_id: str, user: UserModel) -> tuple[bool, str, int]: 

10 """Function checking if provided task is owned by user managing action""" 

11 simulation = fetch_simulation_by_job_id(job_id=job_id) 

12 

13 if not simulation: 

14 return False, 'Job with provided ID does not exist', 404 

15 if simulation.user_id == user.id: 

16 return True, "", 200 

17 return False, 'Job with provided ID does not belong to the user', 403 

18 

19 

20def determine_input_type(payload_dict: dict) -> Optional[str]: 

21 """Function returning input type determined from payload""" 

22 if payload_dict["input_type"] == "editor": 

23 if "input_json" not in payload_dict: 

24 return None 

25 return InputType.EDITOR.value 

26 if payload_dict["input_type"] == "files": 

27 if "input_files" not in payload_dict: 

28 return None 

29 return InputType.FILES.value 

30 return None 

31 

32 

33def make_input_dict(payload_dict: dict, input_type: str) -> dict: 

34 """Function returning input dict""" 

35 input_dict = { 

36 "input_type": input_type, 

37 } 

38 if input_type == InputType.EDITOR.value: 

39 files_dict, number_of_all_primaries = files_dict_with_adjusted_primaries(payload_dict=payload_dict) 

40 input_dict["input_json"] = payload_dict["input_json"] 

41 else: 

42 files_dict, number_of_all_primaries = files_dict_with_adjusted_primaries(payload_dict=payload_dict) 

43 input_dict["number_of_all_primaries"] = number_of_all_primaries 

44 input_dict["input_files"] = files_dict 

45 

46 return input_dict