Coverage for yaptide/batch/simulation_data_sender.py: 0%
60 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
1import argparse
2import json
3import logging
4import signal
5import ssl
6from pathlib import Path
7from urllib import request
10def send_simulation_results(output_Path: Path, simulation_id: int, update_key: str, backend_url: str):
11 """Sends simulation results to backend"""
12 if not backend_url:
13 logging.error("Backend url not specified")
14 return
16 estimators = []
17 for filename in sorted(output_Path.iterdir()):
18 if filename.suffix != ".json":
19 continue
20 with open(filename, "r") as json_file:
21 est_dict = json.load(json_file)
22 est_dict["name"] = filename.stem
23 estimators.append(est_dict)
25 dict_to_send = {
26 "simulation_id": simulation_id,
27 "update_key": update_key,
28 "estimators": estimators,
29 }
30 results_url = f"{backend_url}/results"
31 context = ssl.SSLContext()
33 req = request.Request(results_url,
34 json.dumps(dict_to_send).encode(), {'Content-Type': 'application/json'},
35 method='POST')
37 try:
38 with request.urlopen(req, context=context) as res: # skipcq: BAN-B310
39 if res.getcode() != 202:
40 logging.warning("Sending update to %s failed", results_url)
41 except Exception as e: # skipcq: PYL-W0703
42 logging.error("Sending update to %s failed: %s", results_url, str(e))
45def send_simulation_state_update(simulation_id: int, update_key: str, backend_url: str, simulation_state: str):
46 """Sends simulation state to backend"""
47 dict_to_send = {"sim_id": simulation_id, "job_state": simulation_state, "update_key": update_key}
48 jobs_url = f"{backend_url}/jobs"
49 context = ssl.SSLContext()
51 req = request.Request(jobs_url,
52 json.dumps(dict_to_send).encode(), {'Content-Type': 'application/json'},
53 method='POST')
54 try:
55 with request.urlopen(req, context=context) as res: # skipcq: BAN-B310
56 if res.getcode() != 202:
57 logging.warning("Sending update to %s failed", jobs_url)
58 except Exception as e: # skipcq: PYL-W0703
59 logging.error("Sending update to %s failed: %s", jobs_url, str(e))
62if __name__ == "__main__":
63 # This script allows sending simulation data (either results or state updates) to a backend server.
64 # The user must specify the simulation ID, update_key, and backend URL, and either:
65 # - directory containing JSON result files (`--results_dir`) to send simulation results.
66 # - simulation state (`--state`) to send a state update.
67 signal.signal(signal.SIGUSR1, signal.SIG_IGN)
69 logging.basicConfig(level=logging.INFO,
70 format="%(asctime)s %(levelname)s %(message)s",
71 handlers=[logging.StreamHandler()])
72 parser = argparse.ArgumentParser()
73 parser.add_argument("--sim_id", type=int, required=True)
74 parser.add_argument("--update_key", type=str, required=True)
75 parser.add_argument("--backend_url", type=str, required=True)
76 parser.add_argument("--output_dir", type=str, required=False)
77 parser.add_argument("--simulation_state", type=str, required=False)
78 args = parser.parse_args()
80 logging.info("sim_id %s", args.sim_id)
81 logging.info("update_key %s", args.update_key)
82 logging.info("backend_url %s", args.backend_url)
84 if args.output_dir:
85 logging.info("Sending simulation results for directory: %s", args.output_dir)
86 send_simulation_results(output_Path=Path(args.output_dir),
87 simulation_id=args.sim_id,
88 update_key=args.update_key,
89 backend_url=args.backend_url)
90 elif args.simulation_state:
91 logging.info("No output_dir provided, sending simulation state update %s", args.simulation_state)
92 send_simulation_state_update(simulation_id=args.sim_id,
93 update_key=args.update_key,
94 backend_url=args.backend_url,
95 simulation_state=args.simulation_state)
96 else:
97 logging.error("Either --results_dir or --simulation_state must be provided.")