Skip to content

simulation_data_sender

batch.simulation_data_sender

args module-attribute

args = parse_args()

parser module-attribute

parser = ArgumentParser()

send_simulation_results

send_simulation_results(
    output_Path, simulation_id, update_key, backend_url
)

Sends the simulation results (every `*.json` estimator file found in the output directory) to the backend's `/results` endpoint.

Source code in yaptide/batch/simulation_data_sender.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def send_simulation_results(output_Path: Path, simulation_id: int, update_key: str, backend_url: str):
    """Send the simulation results to the backend.

    Every ``*.json`` file found directly in ``output_Path`` is loaded in
    sorted path order, tagged with a ``"name"`` key holding the file stem,
    and all of them are POSTed as one JSON document to
    ``{backend_url}/results``.

    Sending is best-effort: any failure while building or sending the
    request (malformed URL, unreachable backend, HTTP error) is logged and
    swallowed. A response code other than 202 is logged as a warning.
    Errors raised while reading the estimator files still propagate.

    Args:
        output_Path: directory holding the estimator JSON files.
        simulation_id: value sent under the ``"simulation_id"`` key.
        update_key: value sent under the ``"update_key"`` key.
        backend_url: base URL of the backend; when empty, an error is
            logged and nothing is sent.
    """
    if not backend_url:
        logging.error("Backend url not specified")
        return

    estimators = []
    for filename in sorted(output_Path.iterdir()):
        if filename.suffix != ".json":
            continue
        # Read as UTF-8 explicitly so the result does not depend on the
        # locale of the machine running the job.
        with open(filename, "r", encoding="utf-8") as json_file:
            est_dict = json.load(json_file)
        est_dict["name"] = filename.stem
        estimators.append(est_dict)

    dict_to_send = {
        "simulation_id": simulation_id,
        "update_key": update_key,
        "estimators": estimators,
    }
    results_url = f"{backend_url}/results"

    # Explicit client context instead of the deprecated no-argument
    # ssl.SSLContext(). Certificate and hostname verification stay disabled,
    # which is what the no-argument form gave.
    # NOTE(review): consider ssl.create_default_context() once the backend
    # certificate can be verified.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False  # must be cleared before verify_mode
    context.verify_mode = ssl.CERT_NONE

    try:
        # Request() itself raises ValueError for a URL without a scheme, so
        # it is built inside the try to be logged like any other send failure.
        req = request.Request(results_url,
                              json.dumps(dict_to_send).encode(), {'Content-Type': 'application/json'},
                              method='POST')
        with request.urlopen(req, context=context) as res:  # skipcq: BAN-B310
            if res.getcode() != 202:
                logging.warning("Sending update to %s failed", results_url)
    except Exception as e:  # skipcq: PYL-W0703
        logging.error("Sending update to %s failed: %s", results_url, str(e))

send_simulation_state_update

send_simulation_state_update(
    simulation_id, update_key, backend_url, simulation_state
)

Sends the current simulation state to the backend's `/jobs` endpoint.

Source code in yaptide/batch/simulation_data_sender.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def send_simulation_state_update(simulation_id: int, update_key: str, backend_url: str, simulation_state: str):
    """Send the simulation state to the backend.

    POSTs a JSON document with the keys ``"sim_id"``, ``"job_state"`` and
    ``"update_key"`` to ``{backend_url}/jobs``.

    Sending is best-effort: any failure while building or sending the
    request (malformed URL, unreachable backend, HTTP error) is logged and
    swallowed. A response code other than 202 is logged as a warning.

    Args:
        simulation_id: value sent under the ``"sim_id"`` key.
        update_key: value sent under the ``"update_key"`` key.
        backend_url: base URL of the backend; when empty, an error is
            logged and nothing is sent.
        simulation_state: value sent under the ``"job_state"`` key.
    """
    # Without this guard an empty URL becomes "/jobs", which Request() rejects
    # with ValueError.
    if not backend_url:
        logging.error("Backend url not specified")
        return

    dict_to_send = {"sim_id": simulation_id, "job_state": simulation_state, "update_key": update_key}
    jobs_url = f"{backend_url}/jobs"

    # Explicit client context instead of the deprecated no-argument
    # ssl.SSLContext(). Certificate and hostname verification stay disabled,
    # which is what the no-argument form gave.
    # NOTE(review): consider ssl.create_default_context() once the backend
    # certificate can be verified.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False  # must be cleared before verify_mode
    context.verify_mode = ssl.CERT_NONE

    try:
        # Request() itself raises ValueError for a URL without a scheme, so
        # it is built inside the try to be logged like any other send failure.
        req = request.Request(jobs_url,
                              json.dumps(dict_to_send).encode(), {'Content-Type': 'application/json'},
                              method='POST')
        with request.urlopen(req, context=context) as res:  # skipcq: BAN-B310
            if res.getcode() != 202:
                logging.warning("Sending update to %s failed", jobs_url)
    except Exception as e:  # skipcq: PYL-W0703
        logging.error("Sending update to %s failed: %s", jobs_url, str(e))