Method updating task state
Structure required by this method to work properly:
{
"simulation_id": ,
"task_id": ,
"update_key": ,
"update_dict":
}
simulation_id and task_id self explanatory
Source code in yaptide/routes/task_routes.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 | @staticmethod
def post():
"""
Method updating task state
Structure required by this method to work properly:
{
"simulation_id": <int>,
"task_id": <string>,
"update_key": <string>,
"update_dict": <dict>
}
simulation_id and task_id self explanatory
"""
payload_dict: dict = request.get_json(force=True)
required_keys = {"simulation_id", "task_id", "update_key", "update_dict"}
if required_keys != set(payload_dict.keys()):
diff = required_keys.difference(set(payload_dict.keys()))
return yaptide_response(message=f"Missing keys in JSON payload: {diff}", code=400)
sim_id: int = payload_dict["simulation_id"]
simulation = fetch_simulation_by_sim_id(sim_id=sim_id)
if not simulation:
return yaptide_response(message=f"Simulation {sim_id} does not exist", code=400)
decoded_token = decode_auth_token(payload_dict["update_key"], payload_key_to_return="simulation_id")
if decoded_token != sim_id:
return yaptide_response(message="Invalid update key", code=400)
task = fetch_task_by_sim_id_and_task_id(sim_id=simulation.id, task_id=payload_dict["task_id"])
if not task:
return yaptide_response(message=f"Task {payload_dict['task_id']} does not exist", code=400)
update_task_state(task=task, update_dict=payload_dict["update_dict"])
return yaptide_response(message="Task updated", code=202)
|