Skip to content

common_sim_routes

routes.common_sim_routes

InputsResource

Bases: Resource

Class responsible for returning simulation input

Source code in yaptide/routes/common_sim_routes.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
class InputsResource(Resource):
    """Class responsible for returning simulation input"""

    class APIParametersSchema(Schema):
        """Class specifies API parameters"""

        job_id = fields.String()

    @staticmethod
    @requires_auth()
    def get(user: UserModel):
        """Method returning simulation input"""
        schema = InputsResource.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return yaptide_response(message="Wrong parameters", code=400, content=errors)
        param_dict: dict = schema.load(request.args)
        job_id = param_dict['job_id']

        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)

        simulation = fetch_simulation_by_job_id(job_id=job_id)

        input_model = fetch_input_by_sim_id(sim_id=simulation.id)
        if not input_model:
            return yaptide_response(message="Input of simulation is unavailable", code=404)

        return yaptide_response(message="Input of simulation", code=200, content={"input": input_model.data})

APIParametersSchema

Bases: Schema

Class specifies API parameters

Source code in yaptide/routes/common_sim_routes.py
214
215
216
217
class APIParametersSchema(Schema):
    """Class specifies API parameters"""

    job_id = fields.String()
job_id class-attribute instance-attribute
job_id = String()

get staticmethod

get(user)

Method returning simulation input

Source code in yaptide/routes/common_sim_routes.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
@staticmethod
@requires_auth()
def get(user: UserModel):
    """Method returning simulation input"""
    schema = InputsResource.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return yaptide_response(message="Wrong parameters", code=400, content=errors)
    param_dict: dict = schema.load(request.args)
    job_id = param_dict['job_id']

    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)

    simulation = fetch_simulation_by_job_id(job_id=job_id)

    input_model = fetch_input_by_sim_id(sim_id=simulation.id)
    if not input_model:
        return yaptide_response(message="Input of simulation is unavailable", code=404)

    return yaptide_response(message="Input of simulation", code=200, content={"input": input_model.data})

JobsResource

Bases: Resource

Class responsible for managing common jobs

Source code in yaptide/routes/common_sim_routes.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class JobsResource(Resource):
    """Class responsible for managing common jobs"""

    class APIParametersSchema(Schema):
        """Class specifies API parameters for GET and DELETE request"""

        job_id = fields.String()

    @staticmethod
    @requires_auth()
    def get(user: UserModel):
        """Method returning info about job"""
        schema = JobsResource.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return yaptide_response(message="Wrong parameters", code=400, content=errors)
        param_dict: dict = schema.load(request.args)

        # get job_id from request parameters and check if user owns this job
        job_id = param_dict['job_id']
        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)

        simulation = fetch_simulation_by_job_id(job_id=job_id)
        if simulation.job_state == EntityState.UNKNOWN.value:
            return yaptide_response(message="Job state is unknown",
                                    code=200,
                                    content={"job_state": simulation.job_state})

        tasks = fetch_tasks_by_sim_id(sim_id=simulation.id)

        job_tasks_status = [task.get_status_dict() for task in tasks]

        if simulation.job_state in (EntityState.COMPLETED.value,
                                    EntityState.FAILED.value):
            return yaptide_response(message=f"Job state: {simulation.job_state}",
                                    code=200,
                                    content={
                                        "job_state": simulation.job_state,
                                        "job_tasks_status": job_tasks_status,
                                    })

        job_info = {
            "job_state": simulation.job_state
        }
        status_counter = Counter([task["task_state"] for task in job_tasks_status])
        if status_counter[EntityState.PENDING.value] == len(job_tasks_status):
            job_info["job_state"] = EntityState.PENDING.value
        elif status_counter[EntityState.FAILED.value] == len(job_tasks_status):
            job_info["job_state"] = EntityState.FAILED.value
        elif status_counter[EntityState.RUNNING.value] > 0:
            job_info["job_state"] = EntityState.RUNNING.value

        update_simulation_state(simulation=simulation, update_dict=job_info)

        job_info["job_tasks_status"] = job_tasks_status

        return yaptide_response(message=f"Job state: {job_info['job_state']}", code=200, content=job_info)

APIParametersSchema

Bases: Schema

Class specifies API parameters for GET and DELETE request

Source code in yaptide/routes/common_sim_routes.py
28
29
30
31
class APIParametersSchema(Schema):
    """Class specifies API parameters for GET and DELETE request"""

    job_id = fields.String()
job_id class-attribute instance-attribute
job_id = String()

get staticmethod

get(user)

Method returning info about job

Source code in yaptide/routes/common_sim_routes.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@staticmethod
@requires_auth()
def get(user: UserModel):
    """Method returning info about job"""
    schema = JobsResource.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return yaptide_response(message="Wrong parameters", code=400, content=errors)
    param_dict: dict = schema.load(request.args)

    # get job_id from request parameters and check if user owns this job
    job_id = param_dict['job_id']
    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)

    simulation = fetch_simulation_by_job_id(job_id=job_id)
    if simulation.job_state == EntityState.UNKNOWN.value:
        return yaptide_response(message="Job state is unknown",
                                code=200,
                                content={"job_state": simulation.job_state})

    tasks = fetch_tasks_by_sim_id(sim_id=simulation.id)

    job_tasks_status = [task.get_status_dict() for task in tasks]

    if simulation.job_state in (EntityState.COMPLETED.value,
                                EntityState.FAILED.value):
        return yaptide_response(message=f"Job state: {simulation.job_state}",
                                code=200,
                                content={
                                    "job_state": simulation.job_state,
                                    "job_tasks_status": job_tasks_status,
                                })

    job_info = {
        "job_state": simulation.job_state
    }
    status_counter = Counter([task["task_state"] for task in job_tasks_status])
    if status_counter[EntityState.PENDING.value] == len(job_tasks_status):
        job_info["job_state"] = EntityState.PENDING.value
    elif status_counter[EntityState.FAILED.value] == len(job_tasks_status):
        job_info["job_state"] = EntityState.FAILED.value
    elif status_counter[EntityState.RUNNING.value] > 0:
        job_info["job_state"] = EntityState.RUNNING.value

    update_simulation_state(simulation=simulation, update_dict=job_info)

    job_info["job_tasks_status"] = job_tasks_status

    return yaptide_response(message=f"Job state: {job_info['job_state']}", code=200, content=job_info)

LogfilesResource

Bases: Resource

Class responsible for managing logfiles

Source code in yaptide/routes/common_sim_routes.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
class LogfilesResource(Resource):
    """Class responsible for managing logfiles"""

    @staticmethod
    def post():
        """
        Method for saving logfiles
        Used by the jobs when the simulation fails
        Structure required by this method to work properly:
        {
            "simulation_id": <int>,
            "update_key": <string>,
            "logfiles": <dict>
        }
        """
        payload_dict: dict = request.get_json(force=True)
        if {"simulation_id", "update_key", "logfiles"} != set(payload_dict.keys()):
            return yaptide_response(message="Incomplete JSON data", code=400)

        sim_id = payload_dict["simulation_id"]
        simulation = fetch_simulation_by_sim_id(sim_id=sim_id)

        if not simulation:
            return yaptide_response(message="Simulation does not exist", code=400)

        if not simulation.check_update_key(payload_dict["update_key"]):
            return yaptide_response(message="Invalid update key", code=400)

        logfiles = LogfilesModel(simulation_id=simulation.id)
        logfiles.data = payload_dict["logfiles"]
        add_object_to_db(logfiles)

        return yaptide_response(message="Log files saved", code=202)

    class APIParametersSchema(Schema):
        """Class specifies API parameters"""

        job_id = fields.String()

    @staticmethod
    @requires_auth()
    def get(user: UserModel):
        """Method returning job status and results"""
        schema = ResultsResource.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return yaptide_response(message="Wrong parameters", code=400, content=errors)
        param_dict: dict = schema.load(request.args)

        job_id = param_dict['job_id']
        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)

        simulation = fetch_simulation_by_job_id(job_id=job_id)

        logfile = fetch_logfiles_by_sim_id(sim_id=simulation.id)
        if not logfile:
            return yaptide_response(message="Logfiles are unavailable", code=404)

        logging.debug("Returning logfiles from database")

        return yaptide_response(message="Logfiles", code=200, content={"logfiles": logfile.data})

APIParametersSchema

Bases: Schema

Class specifies API parameters

Source code in yaptide/routes/common_sim_routes.py
277
278
279
280
class APIParametersSchema(Schema):
    """Class specifies API parameters"""

    job_id = fields.String()
job_id class-attribute instance-attribute
job_id = String()

get staticmethod

get(user)

Method returning job status and results

Source code in yaptide/routes/common_sim_routes.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
@staticmethod
@requires_auth()
def get(user: UserModel):
    """Method returning job status and results"""
    schema = ResultsResource.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return yaptide_response(message="Wrong parameters", code=400, content=errors)
    param_dict: dict = schema.load(request.args)

    job_id = param_dict['job_id']
    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)

    simulation = fetch_simulation_by_job_id(job_id=job_id)

    logfile = fetch_logfiles_by_sim_id(sim_id=simulation.id)
    if not logfile:
        return yaptide_response(message="Logfiles are unavailable", code=404)

    logging.debug("Returning logfiles from database")

    return yaptide_response(message="Logfiles", code=200, content={"logfiles": logfile.data})

post staticmethod

post()

Method for saving logfiles Used by the jobs when the simulation fails Structure required by this method to work properly: { "simulation_id": , "update_key": , "logfiles": }

Source code in yaptide/routes/common_sim_routes.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
@staticmethod
def post():
    """
    Method for saving logfiles
    Used by the jobs when the simulation fails
    Structure required by this method to work properly:
    {
        "simulation_id": <int>,
        "update_key": <string>,
        "logfiles": <dict>
    }
    """
    payload_dict: dict = request.get_json(force=True)
    if {"simulation_id", "update_key", "logfiles"} != set(payload_dict.keys()):
        return yaptide_response(message="Incomplete JSON data", code=400)

    sim_id = payload_dict["simulation_id"]
    simulation = fetch_simulation_by_sim_id(sim_id=sim_id)

    if not simulation:
        return yaptide_response(message="Simulation does not exist", code=400)

    if not simulation.check_update_key(payload_dict["update_key"]):
        return yaptide_response(message="Invalid update key", code=400)

    logfiles = LogfilesModel(simulation_id=simulation.id)
    logfiles.data = payload_dict["logfiles"]
    add_object_to_db(logfiles)

    return yaptide_response(message="Log files saved", code=202)

ResultsResource

Bases: Resource

Class responsible for managing results

Source code in yaptide/routes/common_sim_routes.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class ResultsResource(Resource):
    """Class responsible for managing results"""

    @staticmethod
    def post():
        """
        Method for saving results
        Used by the jobs at the end of simulation
        Structure required by this method to work properly:
        {
            "simulation_id": <int>,
            "update_key": <string>,
            "estimators": <dict>
        }
        """
        payload_dict: dict = request.get_json(force=True)
        if {"simulation_id", "update_key", "estimators"} != set(payload_dict.keys()):
            return yaptide_response(message="Incomplete JSON data", code=400)

        sim_id = payload_dict["simulation_id"]
        simulation = fetch_simulation_by_sim_id(sim_id=sim_id)

        if not simulation:
            return yaptide_response(message="Simulation does not exist", code=400)

        if not simulation.check_update_key(payload_dict["update_key"]):
            return yaptide_response(message="Invalid update key", code=400)

        for estimator_dict in payload_dict["estimators"]:
            # We forsee the possibility of the estimator being created earlier as element of partial results
            estimator = fetch_estimator_by_sim_id_and_est_name(sim_id=sim_id, est_name=estimator_dict["name"])

            if not estimator:
                estimator = EstimatorModel(name=estimator_dict["name"], simulation_id=simulation.id)
                estimator.data = estimator_dict["metadata"]
                add_object_to_db(estimator)

            for page_dict in estimator_dict["pages"]:
                page = fetch_page_by_est_id_and_page_number(
                    est_id=estimator.id, page_number=int(page_dict["metadata"]["page_number"]))

                page_existed = bool(page)
                if not page_existed:
                    # create new page
                    page = PageModel(page_number=int(page_dict["metadata"]["page_number"]), estimator_id=estimator.id)
                # we always update the data
                page.data = page_dict
                if not page_existed:
                    # if page was created, we add it to the session
                    add_object_to_db(page, False)

        make_commit_to_db()
        logging.debug("Marking simulation as completed")
        update_dict = {
            "job_state": EntityState.COMPLETED.value,
            "end_time": datetime.utcnow().isoformat(sep=" ")
        }
        update_simulation_state(simulation=simulation, update_dict=update_dict)
        return yaptide_response(message="Results saved", code=202)

    class APIParametersSchema(Schema):
        """Class specifies API parameters"""

        job_id = fields.String()

    @staticmethod
    @requires_auth()
    def get(user: UserModel):
        """Method returning job status and results"""
        schema = ResultsResource.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return yaptide_response(message="Wrong parameters", code=400, content=errors)
        param_dict: dict = schema.load(request.args)

        job_id = param_dict['job_id']
        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)

        simulation = fetch_simulation_by_job_id(job_id=job_id)

        estimators = fetch_estimators_by_sim_id(sim_id=simulation.id)
        if len(estimators) == 0:
            if not isinstance(simulation, BatchSimulationModel):  # also CODE TO REMOVE
                return yaptide_response(message="Results are unavailable", code=404)
            # Code below is for backward compatibility with old method of saving results
            # later on we are going to remove it because it's functionality will be covered
            # by the post method
            # BEGIN CODE TO REMOVE

            cluster = fetch_cluster_by_id(cluster_id=simulation.cluster_id)

            result: dict = get_job_results(simulation=simulation, user=user, cluster=cluster)
            if "estimators" not in result:
                logging.debug("Results for job %s are unavailable", job_id)
                return yaptide_response(message="Results are unavailable", code=404, content=result)

            for estimator_dict in result["estimators"]:
                estimator = EstimatorModel(name=estimator_dict["name"], simulation_id=simulation.id)
                estimator.data = estimator_dict["metadata"]
                add_object_to_db(estimator)
                for page_dict in estimator_dict["pages"]:
                    page = PageModel(estimator_id=estimator.id,
                                     page_number=int(page_dict["metadata"]["page_number"]))
                    page.data = page_dict
                    add_object_to_db(page, False)
                make_commit_to_db()
            estimators = fetch_estimators_by_sim_id(sim_id=simulation.id)
            # END CODE TO REMOVE

        logging.debug("Returning results from database")
        result_estimators = []
        for estimator in estimators:
            pages = fetch_pages_by_estimator_id(est_id=estimator.id)
            estimator_dict = {
                "metadata": estimator.data,
                "name": estimator.name,
                "pages": [page.data for page in pages]
            }
            result_estimators.append(estimator_dict)
        return yaptide_response(message=f"Results for job: {job_id}", code=200,
                                content={"estimators": result_estimators})

APIParametersSchema

Bases: Schema

Class specifies API parameters

Source code in yaptide/routes/common_sim_routes.py
146
147
148
149
class APIParametersSchema(Schema):
    """Class specifies API parameters"""

    job_id = fields.String()
job_id class-attribute instance-attribute
job_id = String()

get staticmethod

get(user)

Method returning job status and results

Source code in yaptide/routes/common_sim_routes.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
@staticmethod
@requires_auth()
def get(user: UserModel):
    """Method returning job status and results"""
    schema = ResultsResource.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return yaptide_response(message="Wrong parameters", code=400, content=errors)
    param_dict: dict = schema.load(request.args)

    job_id = param_dict['job_id']
    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)

    simulation = fetch_simulation_by_job_id(job_id=job_id)

    estimators = fetch_estimators_by_sim_id(sim_id=simulation.id)
    if len(estimators) == 0:
        if not isinstance(simulation, BatchSimulationModel):  # also CODE TO REMOVE
            return yaptide_response(message="Results are unavailable", code=404)
        # Code below is for backward compatibility with old method of saving results
        # later on we are going to remove it because it's functionality will be covered
        # by the post method
        # BEGIN CODE TO REMOVE

        cluster = fetch_cluster_by_id(cluster_id=simulation.cluster_id)

        result: dict = get_job_results(simulation=simulation, user=user, cluster=cluster)
        if "estimators" not in result:
            logging.debug("Results for job %s are unavailable", job_id)
            return yaptide_response(message="Results are unavailable", code=404, content=result)

        for estimator_dict in result["estimators"]:
            estimator = EstimatorModel(name=estimator_dict["name"], simulation_id=simulation.id)
            estimator.data = estimator_dict["metadata"]
            add_object_to_db(estimator)
            for page_dict in estimator_dict["pages"]:
                page = PageModel(estimator_id=estimator.id,
                                 page_number=int(page_dict["metadata"]["page_number"]))
                page.data = page_dict
                add_object_to_db(page, False)
            make_commit_to_db()
        estimators = fetch_estimators_by_sim_id(sim_id=simulation.id)
        # END CODE TO REMOVE

    logging.debug("Returning results from database")
    result_estimators = []
    for estimator in estimators:
        pages = fetch_pages_by_estimator_id(est_id=estimator.id)
        estimator_dict = {
            "metadata": estimator.data,
            "name": estimator.name,
            "pages": [page.data for page in pages]
        }
        result_estimators.append(estimator_dict)
    return yaptide_response(message=f"Results for job: {job_id}", code=200,
                            content={"estimators": result_estimators})

post staticmethod

post()

Method for saving results Used by the jobs at the end of simulation Structure required by this method to work properly: { "simulation_id": , "update_key": , "estimators": }

Source code in yaptide/routes/common_sim_routes.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@staticmethod
def post():
    """
    Method for saving results
    Used by the jobs at the end of simulation
    Structure required by this method to work properly:
    {
        "simulation_id": <int>,
        "update_key": <string>,
        "estimators": <dict>
    }
    """
    payload_dict: dict = request.get_json(force=True)
    if {"simulation_id", "update_key", "estimators"} != set(payload_dict.keys()):
        return yaptide_response(message="Incomplete JSON data", code=400)

    sim_id = payload_dict["simulation_id"]
    simulation = fetch_simulation_by_sim_id(sim_id=sim_id)

    if not simulation:
        return yaptide_response(message="Simulation does not exist", code=400)

    if not simulation.check_update_key(payload_dict["update_key"]):
        return yaptide_response(message="Invalid update key", code=400)

    for estimator_dict in payload_dict["estimators"]:
        # We forsee the possibility of the estimator being created earlier as element of partial results
        estimator = fetch_estimator_by_sim_id_and_est_name(sim_id=sim_id, est_name=estimator_dict["name"])

        if not estimator:
            estimator = EstimatorModel(name=estimator_dict["name"], simulation_id=simulation.id)
            estimator.data = estimator_dict["metadata"]
            add_object_to_db(estimator)

        for page_dict in estimator_dict["pages"]:
            page = fetch_page_by_est_id_and_page_number(
                est_id=estimator.id, page_number=int(page_dict["metadata"]["page_number"]))

            page_existed = bool(page)
            if not page_existed:
                # create new page
                page = PageModel(page_number=int(page_dict["metadata"]["page_number"]), estimator_id=estimator.id)
            # we always update the data
            page.data = page_dict
            if not page_existed:
                # if page was created, we add it to the session
                add_object_to_db(page, False)

    make_commit_to_db()
    logging.debug("Marking simulation as completed")
    update_dict = {
        "job_state": EntityState.COMPLETED.value,
        "end_time": datetime.utcnow().isoformat(sep=" ")
    }
    update_simulation_state(simulation=simulation, update_dict=update_dict)
    return yaptide_response(message="Results saved", code=202)