Class responsible for returning user's available clusters
Source code in yaptide/routes/batch_routes.py
Lines 182–195
class Clusters(Resource):
    """Resource that lists the clusters returned by ``fetch_all_clusters``."""

    @staticmethod
    @requires_auth()
    def get(user: KeycloakUserModel):
        """Return the name of every known cluster.

        Responds with code 403 when ``user`` is not a ``KeycloakUserModel``;
        otherwise with code 200 and content of the form
        ``{'clusters': [{'cluster_name': ...}, ...]}``.
        """
        if not isinstance(user, KeycloakUserModel):
            return yaptide_response(message="User is not allowed to use this endpoint", code=403)

        # Only the cluster name is exposed to the caller.
        cluster_entries = []
        for cluster in fetch_all_clusters():
            cluster_entries.append({'cluster_name': cluster.cluster_name})

        return yaptide_response(message='Available clusters', code=200, content={'clusters': cluster_entries})
get (staticmethod)
get(user)
Method returning clusters
Source code in yaptide/routes/batch_routes.py
Lines 185–195
@staticmethod
@requires_auth()
def get(user: KeycloakUserModel):
    """List the names of all clusters returned by ``fetch_all_clusters``.

    A caller that is not a ``KeycloakUserModel`` receives a code 403
    response; everyone else receives code 200 with the cluster names under
    the ``'clusters'`` key.
    """
    if not isinstance(user, KeycloakUserModel):
        return yaptide_response(message="User is not allowed to use this endpoint", code=403)

    # Build the payload entry by entry; only the name of each cluster is sent.
    names = []
    for entry in fetch_all_clusters():
        names.append({'cluster_name': entry.cluster_name})
    payload = {'clusters': names}

    return yaptide_response(message='Available clusters', code=200, content=payload)
JobsBatch
Bases: Resource
Class responsible for jobs via direct slurm connection
class JobsBatch(Resource):
    """Class responsible for jobs via direct slurm connection.

    ``post`` registers and queues a new batch simulation, ``get`` reports the
    state of an existing job and ``delete`` cancels one.
    """

    @staticmethod
    @requires_auth()
    def post(user: KeycloakUserModel):
        """Register a new batch simulation and queue its submission.

        The JSON body must be an object containing ``sim_type``, ``ntasks``
        (a positive integer) and ``input_type``. ``batch_options.cluster_name``
        optionally selects a cluster; when it is absent or matches no cluster,
        the first cluster returned by ``fetch_all_clusters`` is used.

        Returns code 403 for a non-Keycloak user, code 400 for a malformed
        payload, a validation error when the input type cannot be determined
        or no cluster exists, and code 202 with the new ``job_id`` on success.
        """
        if not isinstance(user, KeycloakUserModel):
            return yaptide_response(message="User is not allowed to use this endpoint", code=403)

        payload_dict: dict = request.get_json(force=True)
        if not payload_dict:
            return yaptide_response(message="No JSON in body", code=400)
        # A JSON array or scalar has no keys to inspect; reject it instead of crashing below.
        if not isinstance(payload_dict, dict):
            return yaptide_response(message="JSON payload must be an object", code=400)

        required_keys = {"sim_type", "ntasks", "input_type"}
        diff = required_keys.difference(payload_dict)
        if diff:
            return yaptide_response(message=f"Missing keys in JSON payload: {diff}", code=400)

        # ntasks drives the number of task rows created below. Validate it before
        # anything is written to the database or queued, so that a bad value
        # cannot leave a half-created simulation behind.
        ntasks = payload_dict["ntasks"]
        if isinstance(ntasks, bool) or not isinstance(ntasks, int) or ntasks < 1:
            return yaptide_response(message="ntasks must be a positive integer", code=400)

        input_type = determine_input_type(payload_dict)
        if input_type is None:
            return error_validation_response()

        clusters = fetch_all_clusters()
        if not clusters:
            return error_validation_response({"message": "No clusters are available"})

        # Honour the requested cluster when it exists, otherwise fall back to the first one.
        cluster = clusters[0]
        batch_options = payload_dict.get("batch_options")
        if isinstance(batch_options, dict) and "cluster_name" in batch_options:
            cluster_name = batch_options["cluster_name"]
            cluster = next((item for item in clusters if item.cluster_name == cluster_name), clusters[0])

        # create a new simulation in the database, not waiting for the job to finish
        job_id = datetime.now().strftime('%Y%m%d-%H%M%S-') + str(uuid.uuid4()) + PlatformType.BATCH.value
        # skipcq: PYL-E1123
        simulation = BatchSimulationModel(user_id=user.id,
                                          cluster_id=cluster.id,
                                          job_id=job_id,
                                          sim_type=payload_dict["sim_type"],
                                          input_type=input_type,
                                          title=payload_dict.get("title", ''))
        add_object_to_db(simulation)
        update_key = encode_simulation_auth_token(simulation.id)

        input_dict = make_input_dict(payload_dict=payload_dict, input_type=input_type)

        # NOTE(review): the job is queued before the task and input rows are stored;
        # confirm the worker does not depend on them being present.
        submit_job.delay(payload_dict=payload_dict,
                         files_dict=input_dict["input_files"],
                         userId=user.id,
                         clusterId=cluster.id,
                         sim_id=simulation.id,
                         update_key=update_key)

        # Task ids are 1-based strings.
        for i in range(ntasks):
            task = BatchTaskModel(simulation_id=simulation.id, task_id=str(i + 1))
            add_object_to_db(task, False)

        input_model = InputModel(simulation_id=simulation.id)
        input_model.data = input_dict
        add_object_to_db(input_model)
        if simulation.update_state({"job_state": EntityState.PENDING.value}):
            make_commit_to_db()

        return yaptide_response(message="Job waiting for submission", code=202, content={'job_id': simulation.job_id})

    class APIParametersSchema(Schema):
        """Class specifies API parameters"""

        # Required: get() and delete() read params_dict["job_id"] unconditionally.
        job_id = fields.String(required=True)

    @staticmethod
    @requires_auth()
    def get(user: KeycloakUserModel):
        """Return the state of the job named by the ``job_id`` query parameter.

        For a job already in the COMPLETED or FAILED state the stored state is
        returned directly. Otherwise the status is obtained with
        ``get_job_status``, stored with ``update_simulation_state`` and
        returned without its ``end_time`` entry. Both responses include the
        per-task status list.
        """
        if not isinstance(user, KeycloakUserModel):
            return yaptide_response(message="User is not allowed to use this endpoint", code=403)

        schema = JobsBatch.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return error_validation_response(content=errors)
        params_dict: dict = schema.load(request.args)

        job_id: str = params_dict["job_id"]
        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)
        simulation = fetch_batch_simulation_by_job_id(job_id=job_id)

        tasks = fetch_batch_tasks_by_sim_id(sim_id=simulation.id)
        job_tasks_status = [task.get_status_dict() for task in tasks]

        if simulation.job_state in (EntityState.COMPLETED.value, EntityState.FAILED.value):
            return yaptide_response(message=f"Job state: {simulation.job_state}",
                                    code=200,
                                    content={
                                        "job_state": simulation.job_state,
                                        "job_tasks_status": job_tasks_status,
                                    })

        cluster = fetch_cluster_by_id(cluster_id=simulation.cluster_id)
        job_info = get_job_status(simulation=simulation, user=user, cluster=cluster)
        update_simulation_state(simulation=simulation, update_dict=job_info)

        # end_time is stored with the simulation above but is not sent to the client.
        job_info.pop("end_time", None)
        job_info["job_tasks_status"] = job_tasks_status

        return yaptide_response(message="", code=200, content=job_info)

    @staticmethod
    @requires_auth()
    def delete(user: KeycloakUserModel):
        """Cancel the job named by the ``job_id`` query parameter.

        Jobs in the COMPLETED, FAILED, CANCELED or UNKNOWN state are not
        cancelled; their current state is reported with code 200. Otherwise
        ``delete_job`` is called and, when it reports status 200, the
        simulation and all of its tasks are marked CANCELED.
        """
        if not isinstance(user, KeycloakUserModel):
            return yaptide_response(message="User is not allowed to use this endpoint", code=403)

        schema = JobsBatch.APIParametersSchema()
        errors: dict[str, list[str]] = schema.validate(request.args)
        if errors:
            return error_validation_response(content=errors)
        params_dict: dict = schema.load(request.args)

        job_id: str = params_dict["job_id"]
        is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
        if not is_owned:
            return yaptide_response(message=error_message, code=res_code)
        simulation = fetch_batch_simulation_by_job_id(job_id=job_id)

        if simulation.job_state in (EntityState.COMPLETED.value, EntityState.FAILED.value,
                                    EntityState.CANCELED.value, EntityState.UNKNOWN.value):
            return yaptide_response(message=f"Cannot cancel job which is in {simulation.job_state} state",
                                    code=200,
                                    content={
                                        "job_state": simulation.job_state,
                                    })

        cluster = fetch_cluster_by_id(cluster_id=simulation.cluster_id)
        result, status_code = delete_job(simulation=simulation, user=user, cluster=cluster)
        if status_code != 200:
            return error_internal_response(content=result)

        update_simulation_state(simulation=simulation, update_dict={"job_state": EntityState.CANCELED.value})
        tasks = fetch_batch_tasks_by_sim_id(sim_id=simulation.id)
        for task in tasks:
            update_task_state(task=task, update_dict={"task_state": EntityState.CANCELED.value})

        return yaptide_response(message="", code=status_code, content=result)
APIParametersSchema
Bases: Schema
Class specifies API parameters
Source code in yaptide/routes/batch_routes.py
Lines 90–93
class APIParametersSchema(Schema):
    """Query parameters accepted by the job status and cancellation handlers."""

    # Identifier of the job to act on. Marked required because the handlers
    # read params_dict["job_id"] right after loading this schema; without it a
    # request lacking job_id would pass validation and then raise KeyError.
    job_id = fields.String(required=True)
@staticmethod
@requires_auth()
def delete(user: KeycloakUserModel):
    """Cancel the job named by the ``job_id`` query parameter.

    Returns code 403 for a non-Keycloak user and a validation error when the
    query parameters are invalid or ``job_id`` is missing. Jobs in the
    COMPLETED, FAILED, CANCELED or UNKNOWN state are not cancelled; their
    current state is reported with code 200. Otherwise ``delete_job`` is
    called and, when it reports status 200, the simulation and all of its
    tasks are marked CANCELED.
    """
    if not isinstance(user, KeycloakUserModel):
        return yaptide_response(message="User is not allowed to use this endpoint", code=403)

    schema = JobsBatch.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return error_validation_response(content=errors)
    params_dict: dict = schema.load(request.args)

    # A request without job_id can pass schema validation; report it as a
    # validation error instead of failing with KeyError.
    job_id = params_dict.get("job_id")
    if job_id is None:
        return error_validation_response(content={"job_id": ["Missing data for required field."]})

    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)
    simulation = fetch_batch_simulation_by_job_id(job_id=job_id)

    if simulation.job_state in (EntityState.COMPLETED.value, EntityState.FAILED.value,
                                EntityState.CANCELED.value, EntityState.UNKNOWN.value):
        return yaptide_response(message=f"Cannot cancel job which is in {simulation.job_state} state",
                                code=200,
                                content={
                                    "job_state": simulation.job_state,
                                })

    cluster = fetch_cluster_by_id(cluster_id=simulation.cluster_id)
    result, status_code = delete_job(simulation=simulation, user=user, cluster=cluster)
    if status_code != 200:
        return error_internal_response(content=result)

    # Mirror the cancellation in the stored simulation and in each of its tasks.
    update_simulation_state(simulation=simulation, update_dict={"job_state": EntityState.CANCELED.value})
    tasks = fetch_batch_tasks_by_sim_id(sim_id=simulation.id)
    for task in tasks:
        update_task_state(task=task, update_dict={"task_state": EntityState.CANCELED.value})

    return yaptide_response(message="", code=status_code, content=result)
@staticmethod
@requires_auth()
def get(user: KeycloakUserModel):
    """Return the state of the job named by the ``job_id`` query parameter.

    Returns code 403 for a non-Keycloak user and a validation error when the
    query parameters are invalid or ``job_id`` is missing. For a job already
    in the COMPLETED or FAILED state the stored state is returned directly.
    Otherwise the status is obtained with ``get_job_status``, stored with
    ``update_simulation_state`` and returned without its ``end_time`` entry.
    Both successful responses include the per-task status list.
    """
    if not isinstance(user, KeycloakUserModel):
        return yaptide_response(message="User is not allowed to use this endpoint", code=403)

    schema = JobsBatch.APIParametersSchema()
    errors: dict[str, list[str]] = schema.validate(request.args)
    if errors:
        return error_validation_response(content=errors)
    params_dict: dict = schema.load(request.args)

    # A request without job_id can pass schema validation; report it as a
    # validation error instead of failing with KeyError.
    job_id = params_dict.get("job_id")
    if job_id is None:
        return error_validation_response(content={"job_id": ["Missing data for required field."]})

    is_owned, error_message, res_code = check_if_job_is_owned_and_exist(job_id=job_id, user=user)
    if not is_owned:
        return yaptide_response(message=error_message, code=res_code)
    simulation = fetch_batch_simulation_by_job_id(job_id=job_id)

    tasks = fetch_batch_tasks_by_sim_id(sim_id=simulation.id)
    job_tasks_status = [task.get_status_dict() for task in tasks]

    if simulation.job_state in (EntityState.COMPLETED.value, EntityState.FAILED.value):
        return yaptide_response(message=f"Job state: {simulation.job_state}",
                                code=200,
                                content={
                                    "job_state": simulation.job_state,
                                    "job_tasks_status": job_tasks_status,
                                })

    cluster = fetch_cluster_by_id(cluster_id=simulation.cluster_id)
    job_info = get_job_status(simulation=simulation, user=user, cluster=cluster)
    update_simulation_state(simulation=simulation, update_dict=job_info)

    # end_time is stored with the simulation above but is not sent to the client.
    job_info.pop("end_time", None)
    job_info["job_tasks_status"] = job_tasks_status

    return yaptide_response(message="", code=200, content=job_info)
@staticmethod
@requires_auth()
def post(user: KeycloakUserModel):
    """Register a new batch simulation and queue its submission.

    The JSON body must be an object containing ``sim_type``, ``ntasks``
    (a positive integer) and ``input_type``. ``batch_options.cluster_name``
    optionally selects a cluster; when it is absent or matches no cluster,
    the first cluster returned by ``fetch_all_clusters`` is used.

    Returns code 403 for a non-Keycloak user, code 400 for a malformed
    payload, a validation error when the input type cannot be determined or
    no cluster exists, and code 202 with the new ``job_id`` on success.
    """
    if not isinstance(user, KeycloakUserModel):
        return yaptide_response(message="User is not allowed to use this endpoint", code=403)

    payload_dict: dict = request.get_json(force=True)
    if not payload_dict:
        return yaptide_response(message="No JSON in body", code=400)
    # A JSON array or scalar has no keys to inspect; reject it instead of crashing below.
    if not isinstance(payload_dict, dict):
        return yaptide_response(message="JSON payload must be an object", code=400)

    required_keys = {"sim_type", "ntasks", "input_type"}
    diff = required_keys.difference(payload_dict)
    if diff:
        return yaptide_response(message=f"Missing keys in JSON payload: {diff}", code=400)

    # ntasks drives the number of task rows created below. Validate it before
    # anything is written to the database or queued, so that a bad value cannot
    # leave a half-created simulation behind.
    ntasks = payload_dict["ntasks"]
    if isinstance(ntasks, bool) or not isinstance(ntasks, int) or ntasks < 1:
        return yaptide_response(message="ntasks must be a positive integer", code=400)

    input_type = determine_input_type(payload_dict)
    if input_type is None:
        return error_validation_response()

    clusters = fetch_all_clusters()
    if not clusters:
        return error_validation_response({"message": "No clusters are available"})

    # Honour the requested cluster when it exists, otherwise fall back to the first one.
    cluster = clusters[0]
    batch_options = payload_dict.get("batch_options")
    if isinstance(batch_options, dict) and "cluster_name" in batch_options:
        cluster_name = batch_options["cluster_name"]
        cluster = next((item for item in clusters if item.cluster_name == cluster_name), clusters[0])

    # create a new simulation in the database, not waiting for the job to finish
    job_id = datetime.now().strftime('%Y%m%d-%H%M%S-') + str(uuid.uuid4()) + PlatformType.BATCH.value
    # skipcq: PYL-E1123
    simulation = BatchSimulationModel(user_id=user.id,
                                      cluster_id=cluster.id,
                                      job_id=job_id,
                                      sim_type=payload_dict["sim_type"],
                                      input_type=input_type,
                                      title=payload_dict.get("title", ''))
    add_object_to_db(simulation)
    update_key = encode_simulation_auth_token(simulation.id)

    input_dict = make_input_dict(payload_dict=payload_dict, input_type=input_type)

    # NOTE(review): the job is queued before the task and input rows are stored;
    # confirm the worker does not depend on them being present.
    submit_job.delay(payload_dict=payload_dict,
                     files_dict=input_dict["input_files"],
                     userId=user.id,
                     clusterId=cluster.id,
                     sim_id=simulation.id,
                     update_key=update_key)

    # Task ids are 1-based strings.
    for i in range(ntasks):
        task = BatchTaskModel(simulation_id=simulation.id, task_id=str(i + 1))
        add_object_to_db(task, False)

    input_model = InputModel(simulation_id=simulation.id)
    input_model.data = input_dict
    add_object_to_db(input_model)
    if simulation.update_state({"job_state": EntityState.PENDING.value}):
        make_commit_to_db()

    return yaptide_response(message="Job waiting for submission", code=202, content={'job_id': simulation.job_id})