Skip to content

celery_routes

routes.celery_routes

ConvertResource

Bases: Resource

Class responsible for returning input_model files converted from the frontend JSON

Source code in yaptide/routes/celery_routes.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
class ConvertResource(Resource):
    """Resource returning input_model files converted from the JSON sent by the frontend"""

    @staticmethod
    @requires_auth()
    def post(_: UserModel):
        """Handle conversion of input_model files

        The JSON request body is handed to `convert_input_files` via `.delay()`
        and the handler waits for the outcome before responding. An empty body
        is answered with a 400 response.
        """
        body: dict = request.get_json(force=True)
        if body:
            # Rework in later PRs to match pattern from jobs endpoint
            pending_conversion = convert_input_files.delay(payload_dict=body)
            converted: dict = pending_conversion.wait()
            return yaptide_response(message="Converted Input Files", code=200, content=converted)

        return yaptide_response(message="No JSON in body", code=400)

post staticmethod

post(_)

Method handling input_model files conversion

Source code in yaptide/routes/celery_routes.py
250
251
252
253
254
255
256
257
258
259
260
261
262
@staticmethod
@requires_auth()
def post(_: UserModel):
    """Handle conversion of input_model files

    The JSON request body is handed to `convert_input_files` via `.delay()`
    and the handler waits for the outcome before responding. An empty body
    is answered with a 400 response.
    """
    body: dict = request.get_json(force=True)
    if body:
        # Rework in later PRs to match pattern from jobs endpoint
        pending_conversion = convert_input_files.delay(payload_dict=body)
        converted: dict = pending_conversion.wait()
        return yaptide_response(message="Converted Input Files", code=200, content=converted)

    return yaptide_response(message="No JSON in body", code=400)

JobsDirect

Bases: Resource

Class responsible for simulations run directly with celery

Source code in yaptide/routes/celery_routes.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class JobsDirect(Resource):
    """Class responsible for simulations run directly with celery"""

    @staticmethod
    @requires_auth()
    def post(user: UserModel):
        """Submit simulation job to celery

        The JSON body must contain the `sim_type`, `ntasks` and `input_type` keys
        (`title` is optional) and `ntasks` must be a positive integer.

        Returns a 202 response carrying the generated `job_id`, or a 400 response
        when the body is empty, a required key is missing or `ntasks` is invalid.
        A validation error response is returned when `determine_input_type`
        yields None.
        """
        payload_dict: dict = request.get_json(force=True)
        if not payload_dict:
            return yaptide_response(message="No JSON in body", code=400)

        required_keys = {"sim_type", "ntasks", "input_type"}

        missing_keys = required_keys.difference(payload_dict.keys())
        if missing_keys:
            return yaptide_response(message=f"Missing keys in JSON payload: {missing_keys}", code=400)

        # Validate before anything is stored or submitted: `ntasks` later feeds `range()`,
        # which would otherwise fail after the job has already been sent to celery.
        # bool is a subclass of int, so it has to be excluded explicitly.
        ntasks = payload_dict["ntasks"]
        if isinstance(ntasks, bool) or not isinstance(ntasks, int) or ntasks < 1:
            return yaptide_response(message="ntasks must be a positive integer", code=400)

        input_type = determine_input_type(payload_dict)

        if input_type is None:
            return error_validation_response()

        # create a new simulation in the database, not waiting for the job to finish
        job_id = datetime.now().strftime('%Y%m%d-%H%M%S-') + str(uuid.uuid4()) + PlatformType.DIRECT.value
        simulation = CelerySimulationModel(user_id=user.id,
                                           job_id=job_id,
                                           sim_type=payload_dict["sim_type"],
                                           input_type=input_type,
                                           title=payload_dict.get("title", ''))
        update_key = str(uuid.uuid4())
        simulation.set_update_key(update_key)
        add_object_to_db(simulation)
        logging.info("Simulation %d created and inserted into DB", simulation.id)
        logging.debug("Update key set to %s", update_key)

        input_dict = make_input_dict(payload_dict=payload_dict, input_type=input_type)

        # submit the asynchronous job to celery
        simulation.merge_id = run_job(input_dict["input_files"], update_key, simulation.id, ntasks,
                                      payload_dict["sim_type"])

        # create tasks in the database in the default PENDING state
        for i in range(ntasks):
            task = CeleryTaskModel(simulation_id=simulation.id, task_id=i)
            add_object_to_db(task, make_commit=False)

        input_model = InputModel(simulation_id=simulation.id)
        input_model.data = input_dict
        add_object_to_db(input_model)
        if simulation.update_state({"job_state": EntityState.PENDING.value}):
            make_commit_to_db()

        return yaptide_response(message="Task started", code=202, content={'job_id': simulation.job_id})

    class APIParametersSchema(Schema):
        """Class specifies API parameters for GET and DELETE request"""

        # required: both handlers index the loaded parameters with 'job_id', so a request
        # without it has to be rejected at validation time instead of raising KeyError
        job_id = fields.String(required=True)

    @staticmethod
    @requires_auth()
    def get(user: UserModel):
        """Method returning job state together with the status of its tasks

        Requires the `job_id` query parameter. For a job stored as COMPLETED or
        FAILED the stored state is returned as is. Otherwise the state is derived
        from the task states (all PENDING -> PENDING, all FAILED -> FAILED, any
        RUNNING -> RUNNING, else the stored state is kept) and passed to
        `update_simulation_state` before responding.
        """
        # validate request parameters and handle errors
        schema = JobsDirect.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return yaptide_response(message="Wrong parameters", code=400, content=errors)
        param_dict: dict = schema.load(request.args)

        # get job_id from request parameters and check if user owns this job
        job_id = param_dict['job_id']
        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)

        # find appropriate simulation in the database
        simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

        tasks = fetch_celery_tasks_by_sim_id(sim_id=simulation.id)

        job_tasks_status = [task.get_status_dict() for task in tasks]

        # these states are reported straight from the database, without re-deriving them
        if simulation.job_state in (EntityState.COMPLETED.value,
                                    EntityState.FAILED.value):
            return yaptide_response(message=f"Job state: {simulation.job_state}",
                                    code=200,
                                    content={
                                        "job_state": simulation.job_state,
                                        "job_tasks_status": job_tasks_status,
                                    })

        job_info = {
            "job_state": simulation.job_state
        }
        status_counter = Counter([task["task_state"] for task in job_tasks_status])
        if status_counter[EntityState.PENDING.value] == len(job_tasks_status):
            job_info["job_state"] = EntityState.PENDING.value
        elif status_counter[EntityState.FAILED.value] == len(job_tasks_status):
            job_info["job_state"] = EntityState.FAILED.value
        elif status_counter[EntityState.RUNNING.value] > 0:
            job_info["job_state"] = EntityState.RUNNING.value

        # store the job state derived from the task states
        update_simulation_state(simulation=simulation, update_dict=job_info)

        job_info["job_tasks_status"] = job_tasks_status

        return yaptide_response(message=f"Job state: {job_info['job_state']}", code=200, content=job_info)

    @staticmethod
    @requires_auth()
    def delete(user: UserModel):
        """Method canceling simulation and returning status of this action

        Requires the `job_id` query parameter. A job stored as COMPLETED, FAILED,
        CANCELED or UNKNOWN is not cancelled; its state is reported with code 200.
        Otherwise `cancel_job` is called and, when its result contains a "merge"
        entry, the simulation and task states are updated from that result.
        """
        schema = JobsDirect.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return error_validation_response(content=errors)
        params_dict: dict = schema.load(request.args)

        job_id = params_dict['job_id']

        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(
            job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)

        simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

        if simulation.job_state in (EntityState.COMPLETED.value,
                                    EntityState.FAILED.value,
                                    EntityState.CANCELED.value,
                                    EntityState.UNKNOWN.value):
            return yaptide_response(message=f"Cannot cancel job which is in {simulation.job_state} state",
                                    code=200,
                                    content={
                                        "job_state": simulation.job_state,
                                    })

        tasks = fetch_celery_tasks_by_sim_id(sim_id=simulation.id)

        celery_ids = [task.celery_id for task in tasks]

        result: dict = cancel_job(merge_id=simulation.merge_id, celery_ids=celery_ids)

        if "merge" in result:
            update_simulation_state(simulation=simulation, update_dict=result["merge"])
            # result["tasks"] is indexed in the same order as the fetched tasks
            for i, task in enumerate(tasks):
                update_task_state(task=task, update_dict=result["tasks"][i])

            return yaptide_response(message="", code=200, content=result)

        return error_internal_response()

APIParametersSchema

Bases: Schema

Class specifies API parameters for GET and DELETE request

Source code in yaptide/routes/celery_routes.py
86
87
88
89
class APIParametersSchema(Schema):
    """Class specifies API parameters for GET and DELETE request"""

    # required: the GET and DELETE handlers index the loaded parameters with 'job_id',
    # so a request without it has to fail validation instead of raising KeyError
    job_id = fields.String(required=True)
job_id class-attribute instance-attribute
job_id = String()

delete staticmethod

delete(user)

Method canceling simulation and returning status of this action

Source code in yaptide/routes/celery_routes.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@staticmethod
@requires_auth()
def delete(user: UserModel):
    """Method canceling simulation and returning status of this action

    Requires the `job_id` query parameter; a request without it gets a
    validation error response. A job stored as COMPLETED, FAILED, CANCELED or
    UNKNOWN is not cancelled; its state is reported with code 200. Otherwise
    `cancel_job` is called and, when its result contains a "merge" entry, the
    simulation and task states are updated from that result.
    """
    schema = JobsDirect.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return error_validation_response(content=errors)
    params_dict: dict = schema.load(request.args)

    # the schema does not mark job_id as required, so it may be absent here
    job_id = params_dict.get('job_id')
    if job_id is None:
        return error_validation_response(content={"job_id": ["Missing data for required field."]})

    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(
        job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)

    simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

    if simulation.job_state in (EntityState.COMPLETED.value,
                                EntityState.FAILED.value,
                                EntityState.CANCELED.value,
                                EntityState.UNKNOWN.value):
        return yaptide_response(message=f"Cannot cancel job which is in {simulation.job_state} state",
                                code=200,
                                content={
                                    "job_state": simulation.job_state,
                                })

    tasks = fetch_celery_tasks_by_sim_id(sim_id=simulation.id)

    celery_ids = [task.celery_id for task in tasks]

    result: dict = cancel_job(merge_id=simulation.merge_id, celery_ids=celery_ids)

    if "merge" in result:
        update_simulation_state(simulation=simulation, update_dict=result["merge"])
        # result["tasks"] is indexed in the same order as the fetched tasks
        for i, task in enumerate(tasks):
            update_task_state(task=task, update_dict=result["tasks"][i])

        return yaptide_response(message="", code=200, content=result)

    return error_internal_response()

get staticmethod

get(user)

Method returning job status and results

Source code in yaptide/routes/celery_routes.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@staticmethod
@requires_auth()
def get(user: UserModel):
    """Method returning job state together with the status of its tasks

    Requires the `job_id` query parameter; a request without it is answered
    with code 400. For a job stored as COMPLETED or FAILED the stored state is
    returned as is. Otherwise the state is derived from the task states (all
    PENDING -> PENDING, all FAILED -> FAILED, any RUNNING -> RUNNING, else the
    stored state is kept) and passed to `update_simulation_state` before
    responding.
    """
    # validate request parameters and handle errors
    schema = JobsDirect.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return yaptide_response(message="Wrong parameters", code=400, content=errors)
    param_dict: dict = schema.load(request.args)

    # the schema does not mark job_id as required, so it may be absent here
    job_id = param_dict.get('job_id')
    if job_id is None:
        return yaptide_response(message="Wrong parameters",
                                code=400,
                                content={"job_id": ["Missing data for required field."]})

    # check if user owns this job
    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)

    # find appropriate simulation in the database
    simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

    tasks = fetch_celery_tasks_by_sim_id(sim_id=simulation.id)

    job_tasks_status = [task.get_status_dict() for task in tasks]

    # these states are reported straight from the database, without re-deriving them
    if simulation.job_state in (EntityState.COMPLETED.value,
                                EntityState.FAILED.value):
        return yaptide_response(message=f"Job state: {simulation.job_state}",
                                code=200,
                                content={
                                    "job_state": simulation.job_state,
                                    "job_tasks_status": job_tasks_status,
                                })

    job_info = {
        "job_state": simulation.job_state
    }
    status_counter = Counter([task["task_state"] for task in job_tasks_status])
    if status_counter[EntityState.PENDING.value] == len(job_tasks_status):
        job_info["job_state"] = EntityState.PENDING.value
    elif status_counter[EntityState.FAILED.value] == len(job_tasks_status):
        job_info["job_state"] = EntityState.FAILED.value
    elif status_counter[EntityState.RUNNING.value] > 0:
        job_info["job_state"] = EntityState.RUNNING.value

    # store the job state derived from the task states
    update_simulation_state(simulation=simulation, update_dict=job_info)

    job_info["job_tasks_status"] = job_tasks_status

    return yaptide_response(message=f"Job state: {job_info['job_state']}", code=200, content=job_info)

post staticmethod

post(user)

Submit simulation job to celery

Source code in yaptide/routes/celery_routes.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@staticmethod
@requires_auth()
def post(user: UserModel):
    """Submit simulation job to celery

    The JSON body must contain the `sim_type`, `ntasks` and `input_type` keys
    (`title` is optional) and `ntasks` must be a positive integer.

    Returns a 202 response carrying the generated `job_id`, or a 400 response
    when the body is empty, a required key is missing or `ntasks` is invalid.
    A validation error response is returned when `determine_input_type`
    yields None.
    """
    payload_dict: dict = request.get_json(force=True)
    if not payload_dict:
        return yaptide_response(message="No JSON in body", code=400)

    required_keys = {"sim_type", "ntasks", "input_type"}

    missing_keys = required_keys.difference(payload_dict.keys())
    if missing_keys:
        return yaptide_response(message=f"Missing keys in JSON payload: {missing_keys}", code=400)

    # Validate before anything is stored or submitted: `ntasks` later feeds `range()`,
    # which would otherwise fail after the job has already been sent to celery.
    # bool is a subclass of int, so it has to be excluded explicitly.
    ntasks = payload_dict["ntasks"]
    if isinstance(ntasks, bool) or not isinstance(ntasks, int) or ntasks < 1:
        return yaptide_response(message="ntasks must be a positive integer", code=400)

    input_type = determine_input_type(payload_dict)

    if input_type is None:
        return error_validation_response()

    # create a new simulation in the database, not waiting for the job to finish
    job_id = datetime.now().strftime('%Y%m%d-%H%M%S-') + str(uuid.uuid4()) + PlatformType.DIRECT.value
    simulation = CelerySimulationModel(user_id=user.id,
                                       job_id=job_id,
                                       sim_type=payload_dict["sim_type"],
                                       input_type=input_type,
                                       title=payload_dict.get("title", ''))
    update_key = str(uuid.uuid4())
    simulation.set_update_key(update_key)
    add_object_to_db(simulation)
    logging.info("Simulation %d created and inserted into DB", simulation.id)
    logging.debug("Update key set to %s", update_key)

    input_dict = make_input_dict(payload_dict=payload_dict, input_type=input_type)

    # submit the asynchronous job to celery
    simulation.merge_id = run_job(input_dict["input_files"], update_key, simulation.id, ntasks,
                                  payload_dict["sim_type"])

    # create tasks in the database in the default PENDING state
    for i in range(ntasks):
        task = CeleryTaskModel(simulation_id=simulation.id, task_id=i)
        add_object_to_db(task, make_commit=False)

    input_model = InputModel(simulation_id=simulation.id)
    input_model.data = input_dict
    add_object_to_db(input_model)
    if simulation.update_state({"job_state": EntityState.PENDING.value}):
        make_commit_to_db()

    return yaptide_response(message="Task started", code=202, content={'job_id': simulation.job_id})

ResultsDirect

Bases: Resource

Class responsible for returning simulation results

Source code in yaptide/routes/celery_routes.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class ResultsDirect(Resource):
    """Class responsible for returning simulation results"""

    class APIParametersSchema(Schema):
        """Class specifies API parameters"""

        # required: the GET handler indexes the loaded parameters with 'job_id', so a
        # request without it has to fail validation instead of raising KeyError
        job_id = fields.String(required=True)

    @staticmethod
    @requires_auth()
    def get(user: UserModel):
        """Method returning job results

        Requires the `job_id` query parameter. Estimators already stored in the
        database for the simulation are returned directly. Otherwise the results
        are requested with `get_job_results`; when they contain an "estimators"
        entry they are stored in the database and returned, else a 404 response
        is sent.
        """
        schema = ResultsDirect.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return yaptide_response(message="Wrong parameters", code=400, content=errors)
        param_dict: dict = schema.load(request.args)

        job_id = param_dict['job_id']
        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)

        simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

        estimators: list[EstimatorModel] = fetch_estimators_by_sim_id(sim_id=simulation.id)
        if len(estimators) > 0:
            logging.debug("Returning results from database")
            result_estimators = []
            for estimator in estimators:
                pages: list[PageModel] = fetch_pages_by_estimator_id(est_id=estimator.id)
                estimator_dict = {
                    "metadata": estimator.data,
                    "name": estimator.name,
                    "pages": [page.data for page in pages]
                }
                result_estimators.append(estimator_dict)
            return yaptide_response(message=f"Results for job: {job_id}",
                                    code=200, content={"estimators": result_estimators})

        # nothing stored yet - ask for the results and persist them for later requests
        result: dict = get_job_results(job_id=job_id)
        if "estimators" not in result:
            logging.debug("Results for job %s are unavailable", job_id)
            return yaptide_response(message="Results are unavailable", code=404, content=result)

        for estimator_dict in result["estimators"]:
            estimator = EstimatorModel(name=estimator_dict["name"], simulation_id=simulation.id)
            estimator.data = estimator_dict["metadata"]
            # the estimator is added first because its id is used by the pages below
            add_object_to_db(estimator)
            for page_dict in estimator_dict["pages"]:
                page = PageModel(estimator_id=estimator.id,
                                 page_number=int(page_dict["metadata"]["page_number"]))
                page.data = page_dict
                add_object_to_db(page, False)
            # pages were added without committing; commit them once per estimator
            make_commit_to_db()

        logging.debug("Returning results from Celery")
        return yaptide_response(message=f"Results for job: {job_id}, results from Celery", code=200, content=result)

APIParametersSchema

Bases: Schema

Class specifies API parameters

Source code in yaptide/routes/celery_routes.py
190
191
192
193
class APIParametersSchema(Schema):
    """Class specifies API parameters"""

    # required: the GET handler indexes the loaded parameters with 'job_id', so a
    # request without it has to fail validation instead of raising KeyError
    job_id = fields.String(required=True)
job_id class-attribute instance-attribute
job_id = String()

get staticmethod

get(user)

Method returning job results

Source code in yaptide/routes/celery_routes.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
@staticmethod
@requires_auth()
def get(user: UserModel):
    """Method returning job results

    Requires the `job_id` query parameter; a request without it is answered
    with code 400. Estimators already stored in the database for the
    simulation are returned directly. Otherwise the results are requested with
    `get_job_results`; when they contain an "estimators" entry they are stored
    in the database and returned, else a 404 response is sent.
    """
    schema = ResultsDirect.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return yaptide_response(message="Wrong parameters", code=400, content=errors)
    param_dict: dict = schema.load(request.args)

    # the schema does not mark job_id as required, so it may be absent here
    job_id = param_dict.get('job_id')
    if job_id is None:
        return yaptide_response(message="Wrong parameters",
                                code=400,
                                content={"job_id": ["Missing data for required field."]})

    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)

    simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

    estimators: list[EstimatorModel] = fetch_estimators_by_sim_id(sim_id=simulation.id)
    if len(estimators) > 0:
        logging.debug("Returning results from database")
        result_estimators = []
        for estimator in estimators:
            pages: list[PageModel] = fetch_pages_by_estimator_id(est_id=estimator.id)
            estimator_dict = {
                "metadata": estimator.data,
                "name": estimator.name,
                "pages": [page.data for page in pages]
            }
            result_estimators.append(estimator_dict)
        return yaptide_response(message=f"Results for job: {job_id}",
                                code=200, content={"estimators": result_estimators})

    # nothing stored yet - ask for the results and persist them for later requests
    result: dict = get_job_results(job_id=job_id)
    if "estimators" not in result:
        logging.debug("Results for job %s are unavailable", job_id)
        return yaptide_response(message="Results are unavailable", code=404, content=result)

    for estimator_dict in result["estimators"]:
        estimator = EstimatorModel(name=estimator_dict["name"], simulation_id=simulation.id)
        estimator.data = estimator_dict["metadata"]
        # the estimator is added first because its id is used by the pages below
        add_object_to_db(estimator)
        for page_dict in estimator_dict["pages"]:
            page = PageModel(estimator_id=estimator.id,
                             page_number=int(page_dict["metadata"]["page_number"]))
            page.data = page_dict
            add_object_to_db(page, False)
        # pages were added without committing; commit them once per estimator
        make_commit_to_db()

    logging.debug("Returning results from Celery")
    return yaptide_response(message=f"Results for job: {job_id}, results from Celery", code=200, content=result)