Coverage for yaptide/routes/user_routes.py: 87%

77 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-09-09 08:12 +0000

1import logging 

2from enum import Enum 

3from typing import List 

4 

5from flask import request 

6from flask_restful import Resource 

7from marshmallow import Schema, fields, ValidationError 

8from sqlalchemy import asc, desc 

9 

10from yaptide.persistence.models import SimulationModel, UserModel 

11from yaptide.routes.utils.decorators import requires_auth 

12from yaptide.routes.utils.response_templates import (error_validation_response, yaptide_response) 

13from yaptide.persistence.db_methods import (delete_object_from_db, fetch_simulation_by_job_id) 

14from yaptide.utils.enums import EntityState 

15 

# Pagination defaults applied when the request does not specify them.
# Number of simulations returned per page.
DEFAULT_PAGE_SIZE = 6
# Index of the page returned by default.
DEFAULT_PAGE_IDX = 1

18 

19 

class OrderType(Enum):
    """Sort direction accepted in the ``order_type`` request parameter."""

    # Values are the literal strings expected in the query string.
    ASCEND = "ascend"
    DESCEND = "descend"

25 

26 

class OrderBy(Enum):
    """Column names accepted in the ``order_by`` request parameter."""

    # Values are the literal strings expected in the query string.
    START_TIME = "start_time"
    END_TIME = "end_time"

32 

33 

def validate_job_state(states: List[str]):
    """Validate that every given state is a value of the ``EntityState`` enum.

    Raises:
        ValidationError: if at least one entry of ``states`` is not an
            ``EntityState`` value.
    """
    allowed_states = {state.value for state in EntityState}
    unknown_states = set(states) - allowed_states
    if unknown_states:
        raise ValidationError('Invalid job state')

38 

39 

class JobStateField(fields.Field):
    """Custom marshmallow field deserializing the ``job_state`` parameter."""

    @staticmethod
    def _deserialize(value, attr, data, **kwargs):
        """Turn a comma-separated string of states into a list of state names.

        Surrounding whitespace is stripped from every entry and empty entries
        are dropped, so an empty parameter (``job_state=``), a trailing comma or
        a space after a comma no longer produces an ``''`` / ``' STATE'`` item
        that the field validator would reject as an invalid job state.

        Returns:
            The list of non-empty state names; an empty list when ``value`` is
            not a string or contains no state names.
        """
        if not isinstance(value, str):
            return []
        stripped_states = (state.strip() for state in value.split(','))
        return [state for state in stripped_states if state]

47 

48 

def _validate_order_by(value: str):
    """Check that ``order_by`` is one of the values of the ``OrderBy`` enum.

    Raises:
        ValidationError: if ``value`` is not an ``OrderBy`` value.
    """
    if value not in {column.value for column in OrderBy}:
        raise ValidationError('Invalid order_by')


class UserSimulations(Resource):
    """Class responsible for returning and deleting user's simulations' basic infos"""

    class GetAPIParametersSchema(Schema):
        """Class specifies Get API parameters"""

        page_size = fields.Integer(load_default=DEFAULT_PAGE_SIZE)
        page_idx = fields.Integer(load_default=DEFAULT_PAGE_IDX)
        # order_by ends up in the ORDER BY clause, so only OrderBy values are accepted
        order_by = fields.String(load_default=OrderBy.START_TIME.value, validate=_validate_order_by)
        order_type = fields.String(load_default=OrderType.DESCEND.value)
        job_state = JobStateField(validate=validate_job_state, load_default=[])

    class DeleteAPIParametersSchema(Schema):
        """Schema for DELETE method parameters"""

        job_id = fields.String(required=True)  # job_id is mandatory for DELETE

    @staticmethod
    @requires_auth()
    def get(user: UserModel):
        """Return one page of the simulations owned by ``user``.

        Query parameters are described by ``GetAPIParametersSchema``. Invalid
        parameters result in a validation error response, the same way as in
        ``delete``. On success the response content holds the list of
        simulations on the requested page, the number of pages and the total
        number of matching simulations.
        """
        schema = UserSimulations.GetAPIParametersSchema()
        try:
            params_dict: dict = schema.load(request.args)
        except ValidationError as err:
            return error_validation_response(content=err.messages)
        logging.info('User %s requested simulations with parameters: %s', user.username, params_dict)

        # Any order_type other than "descend" is treated as ascending
        sorting = desc if params_dict['order_type'] == OrderType.DESCEND.value else asc

        # Query the database for the paginated results.
        # "!= None" is intentional: SQLAlchemy overloads the operator to build the SQL
        # NULL check, while "is not None" would be evaluated by Python itself.
        query = SimulationModel.query.\
            filter(SimulationModel.job_id != None).\
            filter_by(user_id=user.id)  # noqa: E711
        if params_dict['job_state']:
            query = query.filter(SimulationModel.job_state.in_(params_dict['job_state']))
        # order_by has been restricted to OrderBy values by the schema
        query = query.order_by(sorting(params_dict['order_by']))
        pagination = query.paginate(page=params_dict['page_idx'],
                                    per_page=params_dict['page_size'],
                                    error_out=False)
        simulations = pagination.items

        result = {
            'simulations': [
                {
                    'title': simulation.title,
                    'job_id': simulation.job_id,
                    # submission time, when the user sent the request to the backend
                    # - jobs may start much later than that
                    'start_time': simulation.start_time,
                    # end time, when all the jobs are finished and results are merged
                    'end_time': simulation.end_time,
                    'metadata': {
                        'platform': simulation.platform,
                        'server': 'Yaptide',
                        'input_type': simulation.input_type,
                        'sim_type': simulation.sim_type
                    }
                } for simulation in simulations
            ],
            'page_count': pagination.pages,
            'simulations_count': pagination.total,
        }
        return yaptide_response(message='User Simulations', code=200, content=result)

    @staticmethod
    @requires_auth()
    def delete(user: UserModel):
        """Delete the simulation identified by the ``job_id`` query parameter.

        Responses:
            validation error response - ``job_id`` is missing or parameters are invalid,
            404 - no simulation with the given ``job_id``,
            401 - the simulation belongs to a different user,
            403 - the simulation is in UNKNOWN, PENDING or RUNNING state,
            200 - the simulation has been deleted.
        """
        schema = UserSimulations.DeleteAPIParametersSchema()
        # A single load both validates and deserializes the parameters
        try:
            params_dict: dict = schema.load(request.args)
        except ValidationError as err:
            return error_validation_response(content=err.messages)

        job_id = params_dict['job_id']
        simulation = fetch_simulation_by_job_id(job_id)

        if simulation is None:
            return yaptide_response(message=f'Simulation with job_id={job_id} do not exist', code=404)

        if simulation.user_id != user.id:
            return yaptide_response(message='Unauthorized: You do not have permission to delete this simulation',
                                    code=401)

        # Simulation has to be completed/cancelled before deleting it.
        unfinished_states = (EntityState.UNKNOWN.value, EntityState.PENDING.value, EntityState.RUNNING.value)
        if simulation.job_state in unfinished_states:
            # Adjacent literals keep the message on one line; a triple-quoted string
            # would embed the source line break and indentation in the response.
            return yaptide_response(message=f'Simulation with job_id={job_id} is currently running. '
                                    'Please cancel simulation or wait for it to finish',
                                    code=403)

        delete_object_from_db(simulation)
        return yaptide_response(message=f'Simulation with job_id={job_id} successfully deleted from database', code=200)

137 

138 

class UserUpdate(Resource):
    """Resource handling requests that update the user"""

    @staticmethod
    @requires_auth()
    def post(user: UserModel):
        """Acknowledge a user update request.

        The request body is parsed as JSON regardless of its content type. An
        empty payload yields a validation error response; otherwise a 202
        response naming the user is returned. The payload itself is not read
        any further by this method.
        """
        payload: dict = request.get_json(force=True)
        if payload:
            return yaptide_response(message=f'User {user.username} updated', code=202)
        return error_validation_response()