Coverage for yaptide/batch/watcher.py: 17%

121 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-04 00:31 +0000

import argparse
import json
import logging
import re
import signal
import ssl
import threading
import time
from datetime import datetime
from io import TextIOWrapper
from pathlib import Path
from typing import Generator, Optional, Tuple
from urllib import request

13 

# Regular expressions searched (with re.search) in every line of the simulator log.

# Progress line: primary particle counter followed by an ETR in hours/minutes/seconds.
RUN_MATCH = (r"\bPrimary particle no.\s*\d*\s*"
             r"ETR:\s*\d*\s*hour.*\d*\s*minute.*\d*\s*second.*\b")

# Final line printed when the run has finished, reporting the total run time.
COMPLETE_MATCH = (r"\bRun time:\s*"
                  r"\d*\s*hour.*\d*\s*minute.*\d*\s*second.*\b")

# Line announcing the requested number of primaries (NSTAT).
REQUESTED_MATCH = (r"\bRequested number of "
                   r"primaries NSTAT")

# The spelling "occured" is kept on purpose: it must match the text log_generator returns on timeout.
TIMEOUT_MATCH = r"\bTimeout occured"

18 

19 

def log_generator(thefile: Optional[TextIOWrapper],
                  event: Optional[threading.Event] = None,
                  timeout: float = 3600,
                  poll_interval: float = 1.0) -> Generator[str, None, Optional[str]]:
    """
    Generator equivalent to the `tail -f` Linux command.

    Yields lines appended to the end of ``thefile``; the main purpose is monitoring log files.
    A line is yielded once its terminating newline has been read. If the writer has flushed
    only part of a line, the generator waits one ``poll_interval`` for the rest and, if nothing
    more arrives, yields the fragment as-is.

    The generator finishes (the value is available as ``StopIteration.value``, it is NOT yielded):

    * with ``None`` when ``event`` is set,
    * with ``"File not found"`` when ``thefile`` is None,
    * with ``"Timeout occured"`` when no new data appears for about ``timeout`` seconds.

    Args:
        thefile: text file object to follow (anything providing ``readline``), or None.
        event: optional event; once set, the generator stops before its next read.
        timeout: idle time in seconds (counted as time spent sleeping) after which it stops.
        poll_interval: seconds to sleep when no complete line is available.
    """
    idle_time = 0.0
    pending = ""
    while not (event and event.is_set()):
        if thefile is None:
            return "File not found"
        chunk = thefile.readline()
        if chunk:
            idle_time = 0.0
            pending += chunk
            if pending.endswith("\n"):
                yield pending
                pending = ""
            else:
                # Partial line: the writer flushed mid-line, give it a moment to finish the line
                time.sleep(poll_interval)
            continue
        if pending:
            # Nothing more arrived after a partial line - hand out the fragment unchanged
            yield pending
            pending = ""
            continue
        time.sleep(poll_interval)
        idle_time += poll_interval
        if idle_time >= timeout:
            return "Timeout occured"
    return None

42 

43 

def send_task_update(sim_id: int,
                     task_id: str,
                     update_key: str,
                     update_dict: dict,
                     backend_url: str,
                     timeout: float = 60.0) -> bool:
    """Send a task update to the Flask backend so it can update the database.

    POSTs the JSON object ``{"simulation_id", "task_id", "update_key", "update_dict"}``
    to ``{backend_url}/tasks``. This is best-effort: failures are logged, never raised.

    Args:
        sim_id: simulation identifier.
        task_id: task identifier within the simulation.
        update_key: key sent along with the update (passed through unchanged).
        update_dict: fields of the task to update; must be JSON-serializable.
        backend_url: base URL of the backend; an empty value aborts the update.
        timeout: seconds to wait for the backend before giving up.

    Returns:
        True if the backend answered with HTTP 202, False otherwise (missing or malformed
        URL, unserializable payload, network error, or any other status code).
    """
    if not backend_url:
        logging.error("Backend url not specified")
        return False

    dict_to_send = {"simulation_id": sim_id, "task_id": task_id, "update_key": update_key, "update_dict": update_dict}
    tasks_url = f"{backend_url}/tasks"
    logging.debug("Sending update %s to the backend %s", dict_to_send, tasks_url)

    # NOTE(security): certificate and hostname verification are deliberately disabled, which is
    # what the previous bare ssl.SSLContext() did. Spelled out explicitly because calling
    # SSLContext() without a protocol is deprecated.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False  # must be disabled before verify_mode can be set to CERT_NONE
    context.verify_mode = ssl.CERT_NONE

    try:
        # Building the request is inside the try: json.dumps raises TypeError for unserializable
        # payloads and Request raises ValueError for a malformed URL.
        req = request.Request(tasks_url,
                              json.dumps(dict_to_send).encode(), {'Content-Type': 'application/json'},
                              method='POST')
        with request.urlopen(req, context=context, timeout=timeout) as res:  # skipcq: BAN-B310
            if res.getcode() != 202:
                logging.warning("Sending update to %s failed", tasks_url)
                return False
    except Exception:  # skipcq: PYL-W0703 - best-effort reporting must never crash the watcher
        logging.warning("Sending update to %s failed", tasks_url, exc_info=True)
        return False
    return True

69 

70 

def _open_with_retries(filepath: Path, attempts: int = 30, delay: float = 1.0) -> Optional[TextIOWrapper]:
    """Open ``filepath`` for reading, retrying while it does not exist yet; return None if it never appears."""
    for _ in range(attempts):
        try:
            return open(filepath)  # skipcq: PTC-W6004
        except FileNotFoundError:
            time.sleep(delay)
    return None


def _parse_progress_line(line: str) -> Optional[Tuple[int, int]]:
    """Extract ``(simulated_primaries, estimated_time_seconds)`` from a RUN_MATCH line.

    Relies on the whitespace-separated token layout
    ``Primary particle no. <n> ETR: <h> hour.. <m> minute.. <s> second..``.
    Returns None when the line does not have that layout, e.g. when a number is glued to its label.
    """
    tokens = line.split()
    try:
        simulated = int(tokens[3])
        estimated = int(tokens[9]) + int(tokens[7]) * 60 + int(tokens[5]) * 3600
    except (IndexError, ValueError):
        return None
    return simulated, estimated


def read_file(filepath: Path, sim_id: int, task_id: int, update_key: str, backend_url: str):  # skipcq: PYL-W0613
    """Monitor the log file of one task and report its progress to the backend.

    Waits up to ~30 s for ``filepath`` to appear, then follows it with ``log_generator``.
    Recognised lines are translated into task updates sent with ``send_task_update``:

    * REQUESTED_MATCH -> ``task_state`` RUNNING with ``requested_primaries`` and ``start_time``
    * RUN_MATCH       -> ``simulated_primaries`` and ``estimated_time`` (at most one update per 2 s)
    * COMPLETE_MATCH  -> ``task_state`` COMPLETED with ``end_time``; monitoring stops
    * TIMEOUT_MATCH   -> ``task_state`` FAILED with ``end_time``; monitoring stops

    The task is also reported FAILED when the log file never appears, or when the log stops
    growing and ``log_generator`` gives up. This ensures a stalled task does not stay RUNNING forever.

    Args:
        filepath: path of the log file to follow.
        sim_id: simulation identifier forwarded to the backend.
        task_id: task identifier forwarded to the backend.
        update_key: key forwarded with every update.
        backend_url: base URL of the backend.
    """
    logging.debug("Started monitoring, simulation id: %d, task id: %s", sim_id, task_id)

    def update(up_dict: dict) -> None:
        """Send ``up_dict`` as an update of this task."""
        send_task_update(sim_id=sim_id,
                         task_id=task_id,
                         update_key=update_key,
                         update_dict=up_dict,
                         backend_url=backend_url)

    def mark_failed() -> None:
        """Report this task as FAILED, stamped with the current UTC time."""
        update({"task_state": "FAILED", "end_time": datetime.utcnow().isoformat(sep=" ")})

    logfile = _open_with_retries(filepath)
    if logfile is None:
        logging.debug("Log file for task %s not found", task_id)
        mark_failed()
        logging.debug("Update for task: %d - FAILED", task_id)
        return

    # A monotonic clock is immune to wall-clock jumps; -inf guarantees the first progress line is sent.
    last_progress_update = float("-inf")
    with logfile:  # closes the log file on every exit path
        for line in log_generator(logfile, threading.Event()):
            utc_now = datetime.utcnow()
            if re.search(RUN_MATCH, line):
                logging.debug("Found RUN_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
                now = time.monotonic()
                if now - last_progress_update < 2:  # hardcoded 2 seconds to avoid spamming
                    logging.debug("Skipping update, too often")
                    continue
                progress = _parse_progress_line(line)
                if progress is None:
                    logging.warning("Could not parse progress line: %s", line)
                    continue
                last_progress_update = now
                simulated, estimated = progress
                update({"simulated_primaries": simulated, "estimated_time": estimated})
                logging.debug("Update for task: %d - simulated primaries: %s", task_id, simulated)

            elif re.search(REQUESTED_MATCH, line):
                logging.debug("Found REQUESTED_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
                try:
                    requested = int(line.split(": ")[1])
                except (IndexError, ValueError):
                    logging.warning("Could not parse requested primaries from line: %s", line)
                    continue
                update({
                    "simulated_primaries": 0,
                    "requested_primaries": requested,
                    "start_time": utc_now.isoformat(sep=" "),
                    "task_state": "RUNNING"
                })
                logging.debug("Update for task: %d - RUNNING", task_id)

            elif re.search(COMPLETE_MATCH, line):
                logging.debug("Found COMPLETE_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
                update({"end_time": utc_now.isoformat(sep=" "), "task_state": "COMPLETED"})
                logging.debug("Update for task: %d - COMPLETED", task_id)
                return

            elif re.search(TIMEOUT_MATCH, line):
                logging.debug("Found TIMEOUT_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
                mark_failed()
                logging.debug("Update for task: %d - TIMEOUT", task_id)
                return

            else:
                logging.debug("No match found in line: %s for file: %s and task: %s ", line, filepath, task_id)

    # The Event passed above is never set and logfile is not None, so log_generator only finishes
    # here on its idle timeout. Its "Timeout occured" is a return value, never a yielded line,
    # so the TIMEOUT_MATCH branch cannot catch it - report the failure explicitly.
    logging.warning("No new lines in %s for task %s before the watcher timeout, marking it FAILED", filepath, task_id)
    mark_failed()
    logging.debug("Update for task: %d - TIMEOUT", task_id)

165 

166 

def main(argv=None) -> None:
    """Command-line entry point: parse arguments, configure logging and watch the log file.

    Args:
        argv: argument list to parse; defaults to ``sys.argv[1:]`` (argparse behaviour).
    """
    # Ignore SIGUSR1 so that receiving it does not terminate the watcher (its default action).
    signal.signal(signal.SIGUSR1, signal.SIG_IGN)

    parser = argparse.ArgumentParser(description="Watch a task log file and report its progress to the backend.")
    # --filepath is mandatory: without it Path(None) would crash with a TypeError.
    parser.add_argument("--filepath", type=str, required=True, help="log file to monitor")
    parser.add_argument("--sim_id", type=int, help="simulation id")
    parser.add_argument("--task_id", type=int, help="task id")
    parser.add_argument("--update_key", type=str, help="key sent with every update")
    parser.add_argument("--backend_url", type=str, help="base URL of the backend")
    parser.add_argument("--verbose", action="store_true", help="enable debug logging")
    args = parser.parse_args(argv)

    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=log_level,
                        format="%(asctime)s %(levelname)s %(message)s",
                        handlers=[logging.StreamHandler()])

    logging.info("log file %s", args.filepath)
    logging.info("sim_id %s", args.sim_id)
    logging.info("task_id %s", args.task_id)
    logging.info("update_key %s", args.update_key)
    logging.info("backend_url %s", args.backend_url)
    read_file(filepath=Path(args.filepath),
              sim_id=args.sim_id,
              task_id=args.task_id,
              update_key=args.update_key,
              backend_url=args.backend_url)


if __name__ == "__main__":
    main()