Coverage for yaptide/batch/watcher.py: 17%
121 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
1import argparse
2import json
3import logging
4import re
5import signal
6import ssl
7import threading
8import time
9from datetime import datetime
10from io import TextIOWrapper
11from pathlib import Path
12from urllib import request
# Regular expressions used to classify lines of the simulation log file.
#
# Progress line carrying the number of simulated primaries and the estimated
# time remaining (hours, minutes, seconds). The dot after "no" is escaped and
# every number requires at least one digit, because the matched line is later
# split on whitespace and its numeric fields are converted with int().
RUN_MATCH = r"\bPrimary particle no\.\s*\d+\s*ETR:\s*\d+\s*hour.*\d+\s*minute.*\d+\s*second.*\b"
# Final line reporting the total run time; its presence marks a completed run.
COMPLETE_MATCH = r"\bRun time:\s*\d*\s*hour.*\d*\s*minute.*\d*\s*second.*\b"
# Line announcing how many primaries were requested; marks the start of a run.
REQUESTED_MATCH = r"\bRequested number of primaries NSTAT"
# Timeout marker. The spelling ("occured") is kept on purpose: it has to stay
# identical to the text produced elsewhere, so do not "fix" it here alone.
TIMEOUT_MATCH = r"\bTimeout occured"
def log_generator(thefile: TextIOWrapper, event: threading.Event = None, timeout: int = 3600) -> str:
    """
    Follow a growing text file, similar to the Linux `tail -f` command.

    Every line read from `thefile` is yielded as soon as it is available.
    When a read returns nothing, the generator sleeps for one second and tries
    again; after `timeout` consecutive empty reads it stops.
    If `event` is given and becomes set, the generator stops before the next read.
    Main purpose is monitoring of the log files.

    Note: the strings given to `return` ("File not found", "Timeout occured")
    are not yielded. A plain `for` loop over this generator never sees them;
    they are only available as `StopIteration.value`.
    """
    idle_seconds = 0
    while not (event and event.is_set()):
        if thefile is None:
            return "File not found"
        line = thefile.readline()
        if line:
            # fresh data arrived: restart the idle countdown and hand the line over
            idle_seconds = 0
            yield line
            continue
        # nothing new yet: wait a second before polling the file again
        time.sleep(1)
        idle_seconds += 1
        if idle_seconds >= timeout:
            return "Timeout occured"
def send_task_update(sim_id: int, task_id: str, update_key: str, update_dict: dict, backend_url: str) -> bool:
    """
    Sends task update to flask to update database.

    The update is serialised to JSON and POSTed to `{backend_url}/tasks`.

    Args:
        sim_id: identifier of the simulation the task belongs to.
        task_id: identifier of the task being updated.
        update_key: value sent to the backend under the "update_key" field.
        update_dict: fields to update, sent under the "update_dict" field.
        backend_url: base URL of the backend; "/tasks" is appended to it.

    Returns:
        True only if the backend answered with HTTP status 202. False if
        `backend_url` is empty, if the request could not be built or sent
        (malformed URL, payload that is not JSON serialisable, connection or
        HTTP error), or if any other status code was returned. Failures are
        logged rather than raised.
    """
    if not backend_url:
        logging.error("Backend url not specified")
        return False

    dict_to_send = {"simulation_id": sim_id, "task_id": task_id, "update_key": update_key, "update_dict": update_dict}
    tasks_url = f"{backend_url}/tasks"
    logging.debug("Sending update %s to the backend %s", dict_to_send, tasks_url)

    # NOTE(review): certificate and hostname verification are switched off here.
    # This mirrors what the previous bare `ssl.SSLContext()` did implicitly, but
    # without relying on the protocol-less constructor deprecated in Python 3.10.
    # Confirm whether the backend certificate can be verified instead.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False  # must be disabled before verify_mode can be CERT_NONE
    context.verify_mode = ssl.CERT_NONE

    try:
        # Building the request is inside the try block so that a malformed URL or
        # an unserialisable payload is reported as a failed update, not a crash.
        req = request.Request(tasks_url,
                              json.dumps(dict_to_send).encode(), {'Content-Type': 'application/json'},
                              method='POST')
        with request.urlopen(req, context=context) as res:  # skipcq: BAN-B310
            if res.getcode() != 202:
                logging.warning("Sending update to %s failed", tasks_url)
                return False
    except Exception as exc:  # skipcq: PYL-W0703
        # deliberate best-effort: the watcher must survive a temporarily unreachable backend
        logging.warning("Sending update to %s failed: %s", tasks_url, exc)
        return False
    return True
def read_file(filepath: Path, sim_id: int, task_id: int, update_key: str, backend_url: str) -> None:  # skipcq: PYL-W0613
    """
    Monitors log file of certain task and reports its state to the backend.

    The file is opened with up to 30 attempts, one second apart. Its lines are
    then followed with `log_generator` and classified with the module-level
    patterns:

    * REQUESTED_MATCH - task is reported as RUNNING together with the requested
      number of primaries,
    * RUN_MATCH - simulated primaries and estimated time are reported, at most
      once every 2 seconds,
    * COMPLETE_MATCH - task is reported as COMPLETED and monitoring ends,
    * TIMEOUT_MATCH - task is reported as FAILED and monitoring ends.

    The task is also reported as FAILED when the file never appears, or when the
    line generator stops without a completion line having been seen.

    Args:
        filepath: path of the log file to follow.
        sim_id: simulation identifier forwarded to `send_task_update`.
        task_id: task identifier forwarded to `send_task_update`.
        update_key: update key forwarded to `send_task_update`.
        backend_url: backend base URL forwarded to `send_task_update`.
    """
    logging.debug("Started monitoring, simulation id: %d, task id: %s", sim_id, task_id)

    def notify(update_dict: dict) -> None:
        """Forward one update for this task to the backend."""
        send_task_update(sim_id=sim_id,
                         task_id=task_id,
                         update_key=update_key,
                         update_dict=update_dict,
                         backend_url=backend_url)

    def report_failure() -> None:
        """Report the task as FAILED, stamped with the current UTC time."""
        notify({"task_state": "FAILED", "end_time": datetime.utcnow().isoformat(sep=" ")})

    logfile = None
    for _ in range(30):  # 30 stands for maximum attempts
        try:
            logfile = open(filepath)  # skipcq: PTC-W6004
            break
        except FileNotFoundError:
            time.sleep(1)

    if logfile is None:
        logging.debug("Log file for task %s not found", task_id)
        report_failure()
        logging.debug("Update for task: %s - FAILED", task_id)
        return

    update_time = 0
    # `with` guarantees the file handle is closed on every exit path below
    with logfile:
        for line in log_generator(logfile, threading.Event()):
            utc_now = datetime.utcnow()
            if re.search(RUN_MATCH, line):
                logging.debug("Found RUN_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
                now_timestamp = utc_now.timestamp()
                if now_timestamp - update_time < 2:  # hardcoded 2 seconds to avoid spamming
                    logging.debug("Skipping update, too often")
                    continue
                fields = line.split()
                try:
                    # whitespace-separated fields: [3] primaries, [5] hours, [7] minutes, [9] seconds
                    up_dict = {
                        "simulated_primaries": int(fields[3]),
                        "estimated_time": int(fields[9]) + int(fields[7]) * 60 + int(fields[5]) * 3600
                    }
                except (IndexError, ValueError):
                    # a malformed progress line must not stop the monitoring
                    logging.warning("Could not parse progress line: %s for task: %s", line, task_id)
                    continue
                update_time = now_timestamp
                notify(up_dict)
                logging.debug("Update for task: %s - simulated primaries: %s", task_id, fields[3])

            elif re.search(REQUESTED_MATCH, line):
                logging.debug("Found REQUESTED_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
                try:
                    requested_primaries = int(line.split(": ")[1])
                except (IndexError, ValueError):
                    # a malformed start line must not stop the monitoring
                    logging.warning("Could not parse requested primaries line: %s for task: %s", line, task_id)
                    continue
                notify({
                    "simulated_primaries": 0,
                    "requested_primaries": requested_primaries,
                    "start_time": utc_now.isoformat(sep=" "),
                    "task_state": "RUNNING"
                })
                logging.debug("Update for task: %s - RUNNING", task_id)

            elif re.search(COMPLETE_MATCH, line):
                logging.debug("Found COMPLETE_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
                notify({"end_time": utc_now.isoformat(sep=" "), "task_state": "COMPLETED"})
                logging.debug("Update for task: %s - COMPLETED", task_id)
                return

            elif re.search(TIMEOUT_MATCH, line):
                logging.debug("Found TIMEOUT_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
                report_failure()
                logging.debug("Update for task: %s - TIMEOUT", task_id)
                return
            else:
                logging.debug("No match found in line: %s for file: %s and task: %s ", line, filepath, task_id)

    # The Event handed to log_generator is never set, so reaching this point means
    # the generator gave up after its timeout. Its "Timeout occured" value is a
    # generator return value, which the loop above cannot see, so the failure is
    # reported here instead of being silently dropped.
    logging.debug("Log file %s for task %s stopped growing before completion", filepath, task_id)
    report_failure()
    logging.debug("Update for task: %s - TIMEOUT", task_id)
if __name__ == "__main__":
    # Command line entry point: parse the task description and start monitoring.
    # SIGUSR1 is ignored for the lifetime of the watcher.
    # NOTE(review): presumably the batch system delivers SIGUSR1 to the job and
    # the watcher must outlive it - confirm with the submission scripts.
    signal.signal(signal.SIGUSR1, signal.SIG_IGN)

    parser = argparse.ArgumentParser()
    # required: without a path there is nothing to monitor, and Path(None) would raise TypeError
    parser.add_argument("--filepath", type=str, required=True, help="path of the log file to monitor")
    parser.add_argument("--sim_id", type=int, help="simulation id sent with every update")
    parser.add_argument("--task_id", type=int, help="task id sent with every update")
    parser.add_argument("--update_key", type=str, help="update key sent with every update")
    parser.add_argument("--backend_url", type=str, help="base URL of the backend receiving the updates")
    parser.add_argument("--verbose", action="store_true", help="log at DEBUG level instead of INFO")
    args = parser.parse_args()

    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=log_level,
                        format="%(asctime)s %(levelname)s %(message)s",
                        handlers=[logging.StreamHandler()])

    logging.info("log file %s", args.filepath)
    logging.info("sim_id %s", args.sim_id)
    logging.info("task_id %s", args.task_id)
    logging.info("update_key %s", args.update_key)
    logging.info("backend_url %s", args.backend_url)
    read_file(filepath=Path(args.filepath),
              sim_id=args.sim_id,
              task_id=args.task_id,
              update_key=args.update_key,
              backend_url=args.backend_url)