celery_routes

routes.celery_routes

ConvertResource

Bases: Resource

Class responsible for returning input_model files converted from frontend JSON

Source code in yaptide/routes/celery_routes.py, lines 239-254
class ConvertResource(Resource):
    """Class responsible for returning input_model files converted from front JSON"""

    @staticmethod
    @requires_auth()
    def post(_: UserModel):
        """Method handling input_model files convertion"""
        payload_dict: dict = request.get_json(force=True)
        if not payload_dict:
            return yaptide_response(message="No JSON in body", code=400)

        # Rework in later PRs to match pattern from jobs endpoint
        job = convert_input_files.delay(payload_dict=payload_dict)
        result: dict = job.wait()

        return yaptide_response(message="Converted Input Files", code=200, content=result)

post staticmethod

post(_)

Method handling conversion of input_model files

Source code in yaptide/routes/celery_routes.py, lines 242-254
@staticmethod
@requires_auth()
def post(_: UserModel):
    """Method handling input_model files convertion"""
    payload_dict: dict = request.get_json(force=True)
    if not payload_dict:
        return yaptide_response(message="No JSON in body", code=400)

    # Rework in later PRs to match pattern from jobs endpoint
    job = convert_input_files.delay(payload_dict=payload_dict)
    result: dict = job.wait()

    return yaptide_response(message="Converted Input Files", code=200, content=result)
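
A minimal client sketch for this endpoint is shown below. The /convert route path, base URL, auth header and the assumption that yaptide_response nests the payload under a content key are illustrative, not confirmed by this page.

import requests

# Hypothetical client call; base URL, route path and auth header are placeholders.
BASE_URL = "http://localhost:5000"
headers = {"Authorization": "Bearer <access-token>"}  # placeholder credentials

frontend_json = {"metadata": {}, "scene": {}}  # frontend project JSON; exact shape is not defined on this page

response = requests.post(f"{BASE_URL}/convert", json=frontend_json, headers=headers)
if response.status_code == 200:
    input_files = response.json()["content"]  # converted input_model files, assuming 'content' wrapping
else:
    print("Conversion failed:", response.json().get("message"))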

JobsDirect

Bases: Resource

Class responsible for simulations run directly with Celery

Source code in yaptide/routes/celery_routes.py, lines 27-176
class JobsDirect(Resource):
    """Class responsible for simulations run directly with celery"""

    @staticmethod
    @requires_auth()
    def post(user: UserModel):
        """Submit simulation job to celery"""
        payload_dict: dict = request.get_json(force=True)
        if not payload_dict:
            return yaptide_response(message="No JSON in body", code=400)

        required_keys = {"sim_type", "ntasks", "input_type"}

        if required_keys != required_keys.intersection(set(payload_dict.keys())):
            diff = required_keys.difference(set(payload_dict.keys()))
            return yaptide_response(message=f"Missing keys in JSON payload: {diff}", code=400)

        input_type = determine_input_type(payload_dict)

        if input_type is None:
            return error_validation_response()

        # create a new simulation in the database, not waiting for the job to finish
        job_id = datetime.now().strftime('%Y%m%d-%H%M%S-') + str(uuid4()) + PlatformType.DIRECT.value
        simulation = CelerySimulationModel(user_id=user.id,
                                           job_id=job_id,
                                           sim_type=payload_dict["sim_type"],
                                           input_type=input_type,
                                           title=payload_dict.get("title", ''))
        add_object_to_db(simulation)
        update_key = encode_simulation_auth_token(simulation.id)
        logging.info("Simulation %d created and inserted into DB", simulation.id)
        logging.debug("Update key set to %s", update_key)

        input_dict = make_input_dict(payload_dict=payload_dict, input_type=input_type)
        # create tasks in the database in the default PENDING state
        celery_ids = [str(uuid4()) for _ in range(payload_dict["ntasks"])]
        for i in range(payload_dict["ntasks"]):
            task = CeleryTaskModel(simulation_id=simulation.id, task_id=i, celery_id=celery_ids[i])
            add_object_to_db(task, make_commit=False)
        make_commit_to_db()

        # submit the asynchronous job to celery
        simulation.merge_id = run_job(input_dict["input_files"], update_key, simulation.id, payload_dict["ntasks"],
                                      celery_ids, payload_dict["sim_type"])

        input_model = InputModel(simulation_id=simulation.id)
        input_model.data = input_dict
        add_object_to_db(input_model)
        if simulation.update_state({"job_state": EntityState.PENDING.value}):
            make_commit_to_db()

        return yaptide_response(message="Task started", code=202, content={'job_id': simulation.job_id})

    class APIParametersSchema(Schema):
        """Class specifies API parameters for GET and DELETE request"""

        job_id = fields.String()

    @staticmethod
    @requires_auth()
    def get(user: UserModel):
        """Method returning job status and results"""
        # validate request parameters and handle errors
        schema = JobsDirect.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return yaptide_response(message="Wrong parameters", code=400, content=errors)
        param_dict: dict = schema.load(request.args)

        # get job_id from request parameters and check if user owns this job
        job_id = param_dict['job_id']
        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)

        # find appropriate simulation in the database
        simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

        tasks = fetch_celery_tasks_by_sim_id(sim_id=simulation.id)

        job_tasks_status = [task.get_status_dict() for task in tasks]

        if simulation.job_state in (EntityState.COMPLETED.value, EntityState.FAILED.value):
            return yaptide_response(message=f"Job state: {simulation.job_state}",
                                    code=200,
                                    content={
                                        "job_state": simulation.job_state,
                                        "job_tasks_status": job_tasks_status,
                                    })

        job_info = {"job_state": simulation.job_state}
        status_counter = Counter([task["task_state"] for task in job_tasks_status])
        if status_counter[EntityState.PENDING.value] == len(job_tasks_status):
            job_info["job_state"] = EntityState.PENDING.value
        elif status_counter[EntityState.FAILED.value] == len(job_tasks_status):
            job_info["job_state"] = EntityState.FAILED.value
        elif status_counter[EntityState.RUNNING.value] > 0:
            job_info["job_state"] = EntityState.RUNNING.value

        # persist the aggregated job state in the database
        update_simulation_state(simulation=simulation, update_dict=job_info)

        job_info["job_tasks_status"] = job_tasks_status

        return yaptide_response(message=f"Job state: {job_info['job_state']}", code=200, content=job_info)

    @staticmethod
    @requires_auth()
    def delete(user: UserModel):
        """Method canceling simulation and returning status of this action"""
        schema = JobsDirect.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return error_validation_response(content=errors)
        params_dict: dict = schema.load(request.args)

        job_id = params_dict['job_id']

        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)

        simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

        if simulation.job_state in (EntityState.COMPLETED.value, EntityState.FAILED.value, EntityState.CANCELED.value,
                                    EntityState.UNKNOWN.value):
            return yaptide_response(message=f"Cannot cancel job which is in {simulation.job_state} state",
                                    code=200,
                                    content={
                                        "job_state": simulation.job_state,
                                    })

        tasks = fetch_celery_tasks_by_sim_id(sim_id=simulation.id)
        celery_ids = [
            task.celery_id for task in tasks
            if task.task_state in [EntityState.PENDING.value, EntityState.RUNNING.value, EntityState.UNKNOWN.value]
        ]

        # The merge task is revoked first, because it starts once the simulation run tasks are finished/canceled
        # and we don't want it to run accidentally.
        celery_app.control.revoke(simulation.merge_id, terminate=True, signal="SIGINT")
        celery_app.control.revoke(celery_ids, terminate=True, signal="SIGINT")
        update_simulation_state(simulation=simulation, update_dict={"job_state": EntityState.CANCELED.value})
        for task in tasks:
            if task.task_state in [EntityState.PENDING.value, EntityState.RUNNING.value]:
                update_task_state(task=task, update_dict={"task_state": EntityState.CANCELED.value})

        terminate_unfinished_tasks.delay(simulation_id=simulation.id)
        return yaptide_response(message="Cancelled successfully", code=200)

APIParametersSchema

Bases: Schema

Class specifies API parameters for GET and DELETE requests

Source code in yaptide/routes/celery_routes.py, lines 81-84
class APIParametersSchema(Schema):
    """Class specifies API parameters for GET and DELETE request"""

    job_id = fields.String()

job_id class-attribute instance-attribute

job_id = String()
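
A standalone sketch of how this schema behaves when validating query parameters; the example job_id value is illustrative.

from marshmallow import Schema, fields

class APIParametersSchema(Schema):
    """Same shape as above: a single string parameter named job_id"""

    job_id = fields.String()

schema = APIParametersSchema()
print(schema.validate({"job_id": "example-job-id"}))  # {} -> parameters accepted
print(schema.validate({"job_id": 123}))               # non-empty error dict for a non-string value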

delete staticmethod

delete(user)

Method canceling a simulation and returning the status of this action

Source code in yaptide/routes/celery_routes.py, lines 134-176
@staticmethod
@requires_auth()
def delete(user: UserModel):
    """Method canceling simulation and returning status of this action"""
    schema = JobsDirect.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return error_validation_response(content=errors)
    params_dict: dict = schema.load(request.args)

    job_id = params_dict['job_id']

    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)

    simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

    if simulation.job_state in (EntityState.COMPLETED.value, EntityState.FAILED.value, EntityState.CANCELED.value,
                                EntityState.UNKNOWN.value):
        return yaptide_response(message=f"Cannot cancel job which is in {simulation.job_state} state",
                                code=200,
                                content={
                                    "job_state": simulation.job_state,
                                })

    tasks = fetch_celery_tasks_by_sim_id(sim_id=simulation.id)
    celery_ids = [
        task.celery_id for task in tasks
        if task.task_state in [EntityState.PENDING.value, EntityState.RUNNING.value, EntityState.UNKNOWN.value]
    ]

    # The merge task is revoked first, because it starts once the simulation run tasks are finished/canceled
    # and we don't want it to run accidentally.
    celery_app.control.revoke(simulation.merge_id, terminate=True, signal="SIGINT")
    celery_app.control.revoke(celery_ids, terminate=True, signal="SIGINT")
    update_simulation_state(simulation=simulation, update_dict={"job_state": EntityState.CANCELED.value})
    for task in tasks:
        if task.task_state in [EntityState.PENDING.value, EntityState.RUNNING.value]:
            update_task_state(task=task, update_dict={"task_state": EntityState.CANCELED.value})

    terminate_unfinished_tasks.delay(simulation_id=simulation.id)
    return yaptide_response(message="Cancelled successfully", code=200)
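
A minimal cancellation sketch, assuming the resource answers DELETE requests under a /jobs/direct path with job_id passed as a query parameter; base URL, path and auth header are placeholders.

import requests

# Hypothetical cancellation call; base URL, route path and auth header are placeholders.
BASE_URL = "http://localhost:5000"
headers = {"Authorization": "Bearer <access-token>"}  # placeholder credentials

response = requests.delete(f"{BASE_URL}/jobs/direct", params={"job_id": "<job-id>"}, headers=headers)
print(response.status_code, response.json().get("message"))
# Expect 200 with "Cancelled successfully" for a running job,
# or 200 with "Cannot cancel job which is in ... state" for an already finished one.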

get staticmethod

get(user)

Method returning job status and results

Source code in yaptide/routes/celery_routes.py, lines 86-132
@staticmethod
@requires_auth()
def get(user: UserModel):
    """Method returning job status and results"""
    # validate request parameters and handle errors
    schema = JobsDirect.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return yaptide_response(message="Wrong parameters", code=400, content=errors)
    param_dict: dict = schema.load(request.args)

    # get job_id from request parameters and check if user owns this job
    job_id = param_dict['job_id']
    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)

    # find appropriate simulation in the database
    simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

    tasks = fetch_celery_tasks_by_sim_id(sim_id=simulation.id)

    job_tasks_status = [task.get_status_dict() for task in tasks]

    if simulation.job_state in (EntityState.COMPLETED.value, EntityState.FAILED.value):
        return yaptide_response(message=f"Job state: {simulation.job_state}",
                                code=200,
                                content={
                                    "job_state": simulation.job_state,
                                    "job_tasks_status": job_tasks_status,
                                })

    job_info = {"job_state": simulation.job_state}
    status_counter = Counter([task["task_state"] for task in job_tasks_status])
    if status_counter[EntityState.PENDING.value] == len(job_tasks_status):
        job_info["job_state"] = EntityState.PENDING.value
    elif status_counter[EntityState.FAILED.value] == len(job_tasks_status):
        job_info["job_state"] = EntityState.FAILED.value
    elif status_counter[EntityState.RUNNING.value] > 0:
        job_info["job_state"] = EntityState.RUNNING.value

    # persist the aggregated job state in the database
    update_simulation_state(simulation=simulation, update_dict=job_info)

    job_info["job_tasks_status"] = job_tasks_status

    return yaptide_response(message=f"Job state: {job_info['job_state']}", code=200, content=job_info)
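
The state aggregation above can be read as a small pure function. The sketch below restates it outside of Flask; writing the states as literal strings is an assumption about the EntityState enum values.

from collections import Counter

def aggregate_job_state(current_state: str, task_states: list[str]) -> str:
    """Restates the rule above: all tasks PENDING -> PENDING, all FAILED -> FAILED,
    any RUNNING -> RUNNING, otherwise the stored job state is kept."""
    counter = Counter(task_states)
    if counter["PENDING"] == len(task_states):
        return "PENDING"
    if counter["FAILED"] == len(task_states):
        return "FAILED"
    if counter["RUNNING"] > 0:
        return "RUNNING"
    return current_state

print(aggregate_job_state("PENDING", ["RUNNING", "PENDING"]))   # RUNNING
print(aggregate_job_state("RUNNING", ["COMPLETED", "FAILED"]))  # RUNNING (stored state kept)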

post staticmethod

post(user)

Submit a simulation job to Celery

Source code in yaptide/routes/celery_routes.py, lines 30-79
@staticmethod
@requires_auth()
def post(user: UserModel):
    """Submit simulation job to celery"""
    payload_dict: dict = request.get_json(force=True)
    if not payload_dict:
        return yaptide_response(message="No JSON in body", code=400)

    required_keys = {"sim_type", "ntasks", "input_type"}

    if required_keys != required_keys.intersection(set(payload_dict.keys())):
        diff = required_keys.difference(set(payload_dict.keys()))
        return yaptide_response(message=f"Missing keys in JSON payload: {diff}", code=400)

    input_type = determine_input_type(payload_dict)

    if input_type is None:
        return error_validation_response()

    # create a new simulation in the database, not waiting for the job to finish
    job_id = datetime.now().strftime('%Y%m%d-%H%M%S-') + str(uuid4()) + PlatformType.DIRECT.value
    simulation = CelerySimulationModel(user_id=user.id,
                                       job_id=job_id,
                                       sim_type=payload_dict["sim_type"],
                                       input_type=input_type,
                                       title=payload_dict.get("title", ''))
    add_object_to_db(simulation)
    update_key = encode_simulation_auth_token(simulation.id)
    logging.info("Simulation %d created and inserted into DB", simulation.id)
    logging.debug("Update key set to %s", update_key)

    input_dict = make_input_dict(payload_dict=payload_dict, input_type=input_type)
    # create tasks in the database in the default PENDING state
    celery_ids = [str(uuid4()) for _ in range(payload_dict["ntasks"])]
    for i in range(payload_dict["ntasks"]):
        task = CeleryTaskModel(simulation_id=simulation.id, task_id=i, celery_id=celery_ids[i])
        add_object_to_db(task, make_commit=False)
    make_commit_to_db()

    # submit the asynchronous job to celery
    simulation.merge_id = run_job(input_dict["input_files"], update_key, simulation.id, payload_dict["ntasks"],
                                  celery_ids, payload_dict["sim_type"])

    input_model = InputModel(simulation_id=simulation.id)
    input_model.data = input_dict
    add_object_to_db(input_model)
    if simulation.update_state({"job_state": EntityState.PENDING.value}):
        make_commit_to_db()

    return yaptide_response(message="Task started", code=202, content={'job_id': simulation.job_id})
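
A minimal submission sketch highlighting the three required keys checked above. The route path, auth header, example values and the key carrying the actual simulation input are assumptions.

import requests

# Hypothetical job submission; base URL, route path and auth header are placeholders.
BASE_URL = "http://localhost:5000"
headers = {"Authorization": "Bearer <access-token>"}  # placeholder credentials

payload = {
    "sim_type": "shieldhit",  # illustrative simulator name
    "ntasks": 4,              # number of Celery tasks to spawn
    "input_type": "editor",   # illustrative value; determine_input_type() decides the real one
    "title": "demo run",      # optional, defaults to ''
    # plus the simulation input itself, whose key depends on input_type
}

response = requests.post(f"{BASE_URL}/jobs/direct", json=payload, headers=headers)
print(response.status_code)  # 202 when the job was accepted
job_id = response.json()["content"]["job_id"]  # keep for later GET/DELETE calls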

ResultsDirect

Bases: Resource

Class responsible for returning simulation results

Source code in yaptide/routes/celery_routes.py, lines 179-236
class ResultsDirect(Resource):
    """Class responsible for returning simulation results"""

    class APIParametersSchema(Schema):
        """Class specifies API parameters"""

        job_id = fields.String()

    @staticmethod
    @requires_auth()
    def get(user: UserModel):
        """Method returning job status and results"""
        schema = ResultsDirect.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return yaptide_response(message="Wrong parameters", code=400, content=errors)
        param_dict: dict = schema.load(request.args)

        job_id = param_dict['job_id']
        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)

        simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

        estimators: list[EstimatorModel] = fetch_estimators_by_sim_id(sim_id=simulation.id)
        if len(estimators) > 0:
            logging.debug("Returning results from database")
            result_estimators = []
            for estimator in estimators:
                pages: list[PageModel] = fetch_pages_by_estimator_id(est_id=estimator.id)
                estimator_dict = {
                    "metadata": estimator.data,
                    "name": estimator.name,
                    "pages": [page.data for page in pages]
                }
                result_estimators.append(estimator_dict)
            return yaptide_response(message=f"Results for job: {job_id}",
                                    code=200,
                                    content={"estimators": result_estimators})

        result: dict = get_job_results(job_id=job_id)
        if "estimators" not in result:
            logging.debug("Results for job %s are unavailable", job_id)
            return yaptide_response(message="Results are unavailable", code=404, content=result)

        for estimator_dict in result["estimators"]:
            estimator = EstimatorModel(name=estimator_dict["name"], simulation_id=simulation.id)
            estimator.data = estimator_dict["metadata"]
            add_object_to_db(estimator)
            for page_dict in estimator_dict["pages"]:
                page = PageModel(estimator_id=estimator.id, page_number=int(page_dict["metadata"]["page_number"]))
                page.data = page_dict
                add_object_to_db(page, False)
            make_commit_to_db()

        logging.debug("Returning results from Celery")
        return yaptide_response(message=f"Results for job: {job_id}, results from Celery", code=200, content=result)

APIParametersSchema

Bases: Schema

Class specifies API parameters

Source code in yaptide/routes/celery_routes.py, lines 182-185
class APIParametersSchema(Schema):
    """Class specifies API parameters"""

    job_id = fields.String()

job_id class-attribute instance-attribute

job_id = String()

get staticmethod

get(user)

Method returning job status and results

Source code in yaptide/routes/celery_routes.py, lines 187-236
@staticmethod
@requires_auth()
def get(user: UserModel):
    """Method returning job status and results"""
    schema = ResultsDirect.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return yaptide_response(message="Wrong parameters", code=400, content=errors)
    param_dict: dict = schema.load(request.args)

    job_id = param_dict['job_id']
    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)

    simulation = fetch_celery_simulation_by_job_id(job_id=job_id)

    estimators: list[EstimatorModel] = fetch_estimators_by_sim_id(sim_id=simulation.id)
    if len(estimators) > 0:
        logging.debug("Returning results from database")
        result_estimators = []
        for estimator in estimators:
            pages: list[PageModel] = fetch_pages_by_estimator_id(est_id=estimator.id)
            estimator_dict = {
                "metadata": estimator.data,
                "name": estimator.name,
                "pages": [page.data for page in pages]
            }
            result_estimators.append(estimator_dict)
        return yaptide_response(message=f"Results for job: {job_id}",
                                code=200,
                                content={"estimators": result_estimators})

    result: dict = get_job_results(job_id=job_id)
    if "estimators" not in result:
        logging.debug("Results for job %s are unavailable", job_id)
        return yaptide_response(message="Results are unavailable", code=404, content=result)

    for estimator_dict in result["estimators"]:
        estimator = EstimatorModel(name=estimator_dict["name"], simulation_id=simulation.id)
        estimator.data = estimator_dict["metadata"]
        add_object_to_db(estimator)
        for page_dict in estimator_dict["pages"]:
            page = PageModel(estimator_id=estimator.id, page_number=int(page_dict["metadata"]["page_number"]))
            page.data = page_dict
            add_object_to_db(page, False)
        make_commit_to_db()

    logging.debug("Returning results from Celery")
    return yaptide_response(message=f"Results for job: {job_id}, results from Celery", code=200, content=result)
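
A minimal sketch of fetching and unpacking results, assuming the resource is registered under a /results path and that yaptide_response nests the payload under a content key; all names are placeholders.

import requests

# Hypothetical results fetch; base URL, route path and auth header are placeholders.
BASE_URL = "http://localhost:5000"
headers = {"Authorization": "Bearer <access-token>"}  # placeholder credentials

response = requests.get(f"{BASE_URL}/results", params={"job_id": "<job-id>"}, headers=headers)
if response.status_code == 404:
    print("Results are unavailable yet")
else:
    estimators = response.json()["content"]["estimators"]
    for estimator in estimators:
        # each estimator carries its name, metadata and a list of page payloads
        print(estimator["name"], len(estimator["pages"]))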