Coverage for yaptide/routes/utils/utils.py: 87%
31 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
1from typing import Optional
3from yaptide.persistence.db_methods import fetch_simulation_by_job_id
4from yaptide.persistence.models import UserModel
5from yaptide.utils.enums import InputType
6from yaptide.utils.sim_utils import files_dict_with_adjusted_primaries
9def check_if_job_is_owned_and_exist(job_id: str, user: UserModel) -> tuple[bool, str, int]:
10 """Function checking if provided task is owned by user managing action"""
11 simulation = fetch_simulation_by_job_id(job_id=job_id)
13 if not simulation:
14 return False, 'Job with provided ID does not exist', 404
15 if simulation.user_id == user.id:
16 return True, "", 200
17 return False, 'Job with provided ID does not belong to the user', 403
20def determine_input_type(payload_dict: dict) -> Optional[str]:
21 """Function returning input type determined from payload"""
22 if payload_dict["input_type"] == "editor":
23 if "input_json" not in payload_dict:
24 return None
25 return InputType.EDITOR.value
26 if payload_dict["input_type"] == "files":
27 if "input_files" not in payload_dict:
28 return None
29 return InputType.FILES.value
30 return None
33def make_input_dict(payload_dict: dict, input_type: str) -> dict:
34 """Function returning input dict"""
35 input_dict = {
36 "input_type": input_type,
37 }
38 if input_type == InputType.EDITOR.value:
39 files_dict, number_of_all_primaries = files_dict_with_adjusted_primaries(payload_dict=payload_dict)
40 input_dict["input_json"] = payload_dict["input_json"]
41 else:
42 files_dict, number_of_all_primaries = files_dict_with_adjusted_primaries(payload_dict=payload_dict)
43 input_dict["number_of_all_primaries"] = number_of_all_primaries
44 input_dict["input_files"] = files_dict
46 return input_dict