watcher

batch.watcher

COMPLETE_MATCH module-attribute

COMPLETE_MATCH = "\\bRun time:\\s*\\d*\\s*hour.*\\d*\\s*minute.*\\d*\\s*second.*\\b"

REQUESTED_MATCH module-attribute

REQUESTED_MATCH = '\\bRequested number of primaries NSTAT'

RUN_MATCH module-attribute

RUN_MATCH = "\\bPrimary particle no.\\s*\\d*\\s*ETR:\\s*\\d*\\s*hour.*\\d*\\s*minute.*\\d*\\s*second.*\\b"

TIMEOUT_MATCH module-attribute

TIMEOUT_MATCH = '\\bTimeout occured'

args module-attribute

args = parse_args()

log_level module-attribute

log_level = INFO

parser module-attribute

parser = ArgumentParser()
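
The *_MATCH patterns are plain regular expressions, so they can be exercised on their own. Below is a minimal sketch of how a progress line is parsed; the sample log line is hypothetical (real simulator output may differ in wording), but it is shaped to match the token positions that read_file extracts after a whitespace split:

import re

from yaptide.batch.watcher import RUN_MATCH

# Hypothetical log line; read_file relies on tokens 3, 5, 7 and 9.
line = "Primary particle no. 5000 ETR: 0 hour(s) 12 minute(s) 30 second(s)"

if re.search(RUN_MATCH, line):
    tokens = line.split()
    simulated_primaries = int(tokens[3])       # 5000
    estimated_time_s = (int(tokens[5]) * 3600  # hours
                        + int(tokens[7]) * 60  # minutes
                        + int(tokens[9]))      # seconds -> 750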

log_generator

log_generator(thefile, event=None, timeout=3600)

Generator equivalent to the tail -f Linux command. Yields new lines appended to the end of the file. If no new line appears in timeout seconds, the generator stops. Its main purpose is monitoring log files.

Source code in yaptide/batch/watcher.py
def log_generator(thefile: TextIOWrapper, event: threading.Event = None, timeout: int = 3600) -> str:
    """
    Generator equivalent to `tail -f` Linux command.
    Yields new lines appended to the end of the file.
    If no new line appears in `timeout` seconds, the generator stops.
    Its main purpose is monitoring log files.
    """
    sleep_counter = 0
    while True:
        if event and event.is_set():
            break
        if thefile is None:
            return "File not found"
        line = thefile.readline()
        if not line:
            time.sleep(1)
            sleep_counter += 1
            if sleep_counter >= timeout:
                return "Timeout occured"
            continue
        sleep_counter = 0
        yield line
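
A minimal usage sketch, assuming a log file that is still being appended to (the file name and timeout below are placeholders):

import threading

from yaptide.batch.watcher import log_generator

stop_event = threading.Event()  # set() from another thread to stop tailing early

with open("run.log") as logfile:  # hypothetical log file path
    for line in log_generator(logfile, event=stop_event, timeout=60):
        print(line, end="")  # yielded lines keep their trailing newline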

read_file

read_file(
    filepath, sim_id, task_id, update_key, backend_url
)

Monitors the log file of a certain task.

Source code in yaptide/batch/watcher.py
def read_file(filepath: Path, sim_id: int, task_id: int, update_key: str, backend_url: str):  # skipcq: PYL-W0613
    """Monitors log file of certain task"""
    logging.debug("Started monitoring, simulation id: %d, task id: %s", sim_id, task_id)
    logfile = None
    update_time = 0
    for _ in range(30):  # 30 stands for maximum attempts
        try:
            logfile = open(filepath)  # skipcq: PTC-W6004
            break
        except FileNotFoundError:
            time.sleep(1)

    if logfile is None:
        logging.debug("Log file for task %s not found", task_id)
        up_dict = {  # skipcq: PYL-W0612
            "task_state": "FAILED",
            "end_time": datetime.utcnow().isoformat(sep=" ")
        }
        send_task_update(sim_id=sim_id,
                         task_id=task_id,
                         update_key=update_key,
                         update_dict=up_dict,
                         backend_url=backend_url)
        logging.debug("Update for task: %d - FAILED", task_id)
        return

    loglines = log_generator(logfile, threading.Event())
    for line in loglines:
        utc_now = datetime.utcnow()
        if re.search(RUN_MATCH, line):
            logging.debug("Found RUN_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
            if utc_now.timestamp() - update_time < 2:  # hardcoded 2 seconds to avoid spamming
                logging.debug("Skipping update, too often")
                continue
            update_time = utc_now.timestamp()
            splitted = line.split()
            up_dict = {  # skipcq: PYL-W0612
                "simulated_primaries": int(splitted[3]),
                "estimated_time": int(splitted[9]) + int(splitted[7]) * 60 + int(splitted[5]) * 3600
            }
            send_task_update(sim_id=sim_id,
                             task_id=task_id,
                             update_key=update_key,
                             update_dict=up_dict,
                             backend_url=backend_url)
            logging.debug("Update for task: %d - simulated primaries: %s", task_id, splitted[3])

        elif re.search(REQUESTED_MATCH, line):
            logging.debug("Found REQUESTED_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
            splitted = line.split(": ")
            up_dict = {  # skipcq: PYL-W0612
                "simulated_primaries": 0,
                "requested_primaries": int(splitted[1]),
                "start_time": utc_now.isoformat(sep=" "),
                "task_state": "RUNNING"
            }
            send_task_update(sim_id=sim_id,
                             task_id=task_id,
                             update_key=update_key,
                             update_dict=up_dict,
                             backend_url=backend_url)
            logging.debug("Update for task: %d - RUNNING", task_id)

        elif re.search(COMPLETE_MATCH, line):
            logging.debug("Found COMPLETE_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
            splitted = line.split()
            up_dict = {  # skipcq: PYL-W0612
                "end_time": utc_now.isoformat(sep=" "),
                "task_state": "COMPLETED"
            }
            send_task_update(sim_id=sim_id,
                             task_id=task_id,
                             update_key=update_key,
                             update_dict=up_dict,
                             backend_url=backend_url)
            logging.debug("Update for task: %d - COMPLETED", task_id)
            return

        elif re.search(TIMEOUT_MATCH, line):
            logging.debug("Found TIMEOUT_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
            up_dict = {  # skipcq: PYL-W0612
                "task_state": "FAILED",
                "end_time": datetime.utcnow().isoformat(sep=" ")
            }
            send_task_update(sim_id=sim_id,
                             task_id=task_id,
                             update_key=update_key,
                             update_dict=up_dict,
                             backend_url=backend_url)
            print("Update for task: %d - TIMEOUT", task_id)
            return
        else:
            logging.debug("No match found in line: %s for file: %s and task: %s ", line, filepath, task_id)
    return
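
read_file blocks until the task completes, fails, or times out, so a caller that needs to do other work can run it on a separate thread. A sketch under that assumption, with placeholder ids, update key and URL (in the batch scripts these values come from the command-line arguments parsed at module level):

import threading
from pathlib import Path

from yaptide.batch.watcher import read_file

# All values below are placeholders.
watcher = threading.Thread(target=read_file,
                           kwargs={
                               "filepath": Path("task_1/run.log"),
                               "sim_id": 42,
                               "task_id": 1,
                               "update_key": "secret-update-key",
                               "backend_url": "https://yaptide.example.com",
                           },
                           daemon=True)
watcher.start()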

send_task_update

send_task_update(
    sim_id, task_id, update_key, update_dict, backend_url
)

Sends a task update to the Flask backend to update the database.

Source code in yaptide/batch/watcher.py
def send_task_update(sim_id: int, task_id: str, update_key: str, update_dict: dict, backend_url: str) -> bool:
    """Sends task update to flask to update database"""
    if not backend_url:
        logging.error("Backend url not specified")
        return False

    dict_to_send = {"simulation_id": sim_id, "task_id": task_id, "update_key": update_key, "update_dict": update_dict}
    tasks_url = f"{backend_url}/tasks"
    logging.debug("Sending update %s to the backend %s", dict_to_send, tasks_url)
    context = ssl.SSLContext()

    req = request.Request(tasks_url,
                          json.dumps(dict_to_send).encode(), {'Content-Type': 'application/json'},
                          method='POST')

    try:
        with request.urlopen(req, context=context) as res:  # skipcq: BAN-B310
            if res.getcode() != 202:
                logging.warning("Sending update to %s failed", tasks_url)
                return False
    except Exception as e:  # skipcq: PYL-W0703
        logging.error("Sending update to %s failed: %s", tasks_url, e)
        return False
    return True
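
A usage sketch mirroring the FAILED payload that read_file builds; the update key and backend URL are placeholders:

import logging
from datetime import datetime

from yaptide.batch.watcher import send_task_update

# Placeholder values; update_dict mirrors the FAILED update from read_file.
ok = send_task_update(sim_id=42,
                      task_id="1",
                      update_key="secret-update-key",
                      update_dict={
                          "task_state": "FAILED",
                          "end_time": datetime.utcnow().isoformat(sep=" "),
                      },
                      backend_url="https://yaptide.example.com")
if not ok:
    logging.warning("backend did not accept the update")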