Coverage for yaptide/celery/utils/pymc.py: 59% (224 statements)


import logging
import os
import re
import subprocess
import tempfile
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Protocol

from pymchelper.executor.options import SimulationSettings, SimulatorType
from pymchelper.executor.runner import Runner
from pymchelper.input_output import frompattern

from yaptide.batch.watcher import (COMPLETE_MATCH, REQUESTED_MATCH, RUN_MATCH, TIMEOUT_MATCH, log_generator)
from yaptide.celery.utils.progress.fluka_monitor import TaskDetails, read_fluka_out_file
from yaptide.celery.utils.requests import send_task_update
from yaptide.utils.enums import EntityState


def get_tmp_dir() -> Path:
    """Function to get temporary directory from environment variables."""
    # let's try by default to use the python tempfile module
    tmp_dir = tempfile.gettempdir()
    logging.debug("tempfile.gettempdir() is: %s", tmp_dir)

    # if the TMPDIR env variable is set we will use it to override the default
    logging.info("1. TMPDIR is: %s", os.environ.get("TMPDIR", "not set"))
    if os.environ.get("TMPDIR"):
        tmp_dir = os.environ.get("TMPDIR")

    # if the TEMP env variable is set we will use it to override the default
    logging.info("2. TEMP is: %s", os.environ.get("TEMP", "not set"))
    if os.environ.get("TEMP"):
        tmp_dir = os.environ.get("TEMP")

    # if the TMP env variable is set we will use it to override the default
    logging.info("3. TMP is: %s", os.environ.get("TMP", "not set"))
    if os.environ.get("TMP"):
        tmp_dir = os.environ.get("TMP")

    return Path(tmp_dir)

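# A minimal usage sketch for get_tmp_dir(). Because each variable is checked in
# turn and later checks overwrite earlier ones, the effective precedence is
# TMP > TEMP > TMPDIR > tempfile.gettempdir(). The paths below are hypothetical.
def _example_tmp_dir_precedence() -> None:
    """Illustrates that TMP wins over TMPDIR when both are set."""
    os.environ["TMPDIR"] = "/scratch/a"
    os.environ["TMP"] = "/scratch/b"
    assert get_tmp_dir() == Path("/scratch/b")
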

def command_to_run_shieldhit(dir_path: Path, task_id: int) -> list[str]:
    """Function to create command to run SHIELD-HIT12A."""
    settings = SimulationSettings(
        input_path=dir_path,  # skipcq: PYL-W0612 # useful
        simulator_type=SimulatorType.shieldhit,
        simulator_exec_path=None,  # useless, we guess from PATH
        cmdline_opts="")  # useless, we could use -q in the future
    # the task_id serves as the integer seed for the random number generator
    settings.set_rng_seed(task_id)
    command_as_list = str(settings).split()
    command_as_list.append(str(dir_path))
    return command_as_list


def get_shieldhit_estimators(dir_path: Path) -> dict:
    """Function to get estimators from SHIELD-HIT12A output files."""
    estimators_dict = {}

    matching_files = list(dir_path.glob("*.bdo"))
    if len(matching_files) == 0:
        logging.error("No *.bdo files found in %s", dir_path)
        return estimators_dict

    logging.debug("Found %d *.bdo files in %s", len(matching_files), dir_path)
    files_pattern = str(dir_path / "*.bdo")
    estimators_list = frompattern(pattern=files_pattern)
    for estimator in estimators_list:
        logging.debug("Appending estimator for %s", estimator.file_corename)
        estimators_dict[estimator.file_corename] = estimator

    return estimators_dict


def command_to_run_fluka(dir_path: Path, task_id: int) -> list[str]:
    """Function to create command to run FLUKA."""
    input_file = next(dir_path.glob("*.inp"), None)
    if input_file is None:
        logging.debug("Failed to generate FLUKA command. No *.inp file found in %s", dir_path)
        # if there is no input file, raise an error; this should never happen
        raise FileNotFoundError("Input file not found")

    # create settings object, providing the input file, simulator type and additional options
    # option -M with value 1 will execute only one simulation cycle, the default is 5
    settings = SimulationSettings(input_path=str(input_file), simulator_type=SimulatorType.fluka, cmdline_opts="-M 1")
    update_rng_seed_in_fluka_file(input_file, task_id)
    command_as_list = str(settings).split()
    command_as_list.append(str(input_file))
    return command_as_list


def update_rng_seed_in_fluka_file(input_file: Path, task_id: int) -> None:
    """Function to update the random seed in a FLUKA input file."""

    class UpdateFlukaRandomSeed(Protocol):
        """Definition of protocol for updating random seed in fluka input file.

        Its purpose is to allow us to use a private method of the Runner class.
        """

        def __call__(self, file_path: str, rng_seed: int) -> None:
            """Updates random seed in fluka input file"""

    random_seed = task_id
    update_fluka_function: UpdateFlukaRandomSeed = Runner._Runner__update_fluka_input_file  # pylint: disable=W0212
    update_fluka_function(str(input_file.resolve()), random_seed)

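# update_rng_seed_in_fluka_file() reaches a private method through Python's name
# mangling: a method named __update_fluka_input_file defined inside class Runner
# is stored on the class as _Runner__update_fluka_input_file. A minimal
# illustration of the same trick, using a hypothetical class:
def _example_name_mangling() -> None:
    """Shows why Runner._Runner__update_fluka_input_file resolves."""

    class Demo:
        """Hypothetical stand-in for Runner."""

        def __hidden(self) -> str:
            return "reached"

    assert Demo()._Demo__hidden() == "reached"
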

def execute_simulation_subprocess(dir_path: Path, command_as_list: list[str]) -> tuple[bool, str, str]:
    """Function to execute simulation subprocess."""
    process_exit_success: bool = True
    command_stdout: str = ""
    command_stderr: str = ""
    try:
        completed_process = subprocess.run(command_as_list,
                                           check=True,
                                           cwd=str(dir_path),
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE,
                                           text=True)
        logging.info("simulation subprocess with return code %d finished", completed_process.returncode)

        # capture stdout and stderr
        command_stdout = completed_process.stdout
        command_stderr = completed_process.stderr

        process_exit_success = True

        # log stdout and stderr
        logging.info("Command Output:\n%s", command_stdout)
        logging.info("Command Error Output:\n%s", command_stderr)
    except subprocess.CalledProcessError as e:
        # the command exited with a non-zero status
        process_exit_success = False
        logging.error("Command Error: %s\nSTD OUT: %s\nExecuted Command: %s", e.stderr, e.stdout,
                      " ".join(command_as_list))
    except Exception as e:  # skipcq: PYL-W0703
        process_exit_success = False
        logging.error("Exception while running simulation: %s", e)

    return process_exit_success, command_stdout, command_stderr

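# A hypothetical end-to-end sketch: build the SHIELD-HIT12A command for a
# prepared input directory, run it, then collect estimators. The directory and
# task id are made up; the exact command tokens depend on pymchelper's
# SimulationSettings string representation.
def _example_run_shieldhit() -> None:
    """Builds and executes a SHIELD-HIT12A command (illustrative only)."""
    dir_path = Path("/tmp/sim_workdir")  # hypothetical, must contain input files
    command = command_to_run_shieldhit(dir_path, task_id=17)
    success, _stdout, _stderr = execute_simulation_subprocess(dir_path, command)
    if success:
        estimators = get_shieldhit_estimators(dir_path)
        logging.info("Parsed %d estimators", len(estimators))
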

def get_fluka_estimators(dir_path: Path) -> dict:
    """Function to get estimators from FLUKA output files."""
    estimators_dict = {}

    matching_files = list(dir_path.glob("*_fort.*"))
    if len(matching_files) == 0:
        logging.error("No *_fort.* files found in %s", dir_path)
        return estimators_dict

    logging.debug("Found %d *_fort.* files in %s", len(matching_files), dir_path)
    files_pattern = str(dir_path / "*_fort.*")
    estimators_list = frompattern(pattern=files_pattern)
    for estimator in estimators_list:
        logging.debug("Appending estimator for %s", estimator.file_corename)
        estimators_dict[estimator.file_corename] = estimator

    return estimators_dict


def average_values(base_values: List[float], new_values: List[float], count: int) -> List[float]:
    """Average two lists of values, where base_values is already an average over count samples"""
    return [(base * count + new) / (count + 1) for base, new in zip(base_values, new_values)]

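# A worked example of the running average: each base value is re-weighted by
# count before the new value is folded in, so averaging the 1-sample average
# [1.0, 2.0] with new values [3.0, 4.0] gives [2.0, 3.0].
def _example_average_values() -> None:
    """(1 * 1 + 3) / 2 == 2.0 and (2 * 1 + 4) / 2 == 3.0."""
    assert average_values([1.0, 2.0], [3.0, 4.0], 1) == [2.0, 3.0]
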

def average_estimators(base_list: list[dict], list_to_add: list[dict], averaged_count: int) -> list:
    """Averages estimators from two lists of estimator dicts"""
    logging.debug("Averaging estimators - already averaged: %d", averaged_count)
    for est_i, estimator_dict in enumerate(list_to_add):
        # check if estimator names are the same and if not, find matching estimator's index in base_list
        if estimator_dict["name"] != base_list[est_i]["name"]:
            est_i = next((i for i, item in enumerate(base_list) if item["name"] == estimator_dict["name"]), None)
            if est_i is None:
                logging.warning("Estimator %s not found in base list, skipping", estimator_dict["name"])
                continue
        logging.debug("Averaging estimator %s", estimator_dict["name"])
        for page_i, page_dict in enumerate(estimator_dict["pages"]):
            # check if page numbers are the same and if not, find matching page's index in base_list
            if page_dict["metadata"]["page_number"] != base_list[est_i]["pages"][page_i]["metadata"]["page_number"]:
                page_i = next((i for i, item in enumerate(base_list[est_i]["pages"])
                               if item["metadata"]["page_number"] == page_dict["metadata"]["page_number"]), None)
                if page_i is None:
                    logging.warning("Page %s not found in estimator %s, skipping",
                                    page_dict["metadata"]["page_number"], estimator_dict["name"])
                    continue

            base_list[est_i]["pages"][page_i]["data"]["values"] = average_values(
                base_list[est_i]["pages"][page_i]["data"]["values"], page_dict["data"]["values"], averaged_count)

            logging.debug("Averaged page %s with %d elements", page_dict["metadata"]["page_number"],
                          len(page_dict["data"]["values"]))
    return base_list

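# A minimal sketch of the dict shape average_estimators() expects: a list of
# estimators, each with a name and pages carrying page_number metadata and a
# values list. The names and values below are made up.
def _example_average_estimators() -> None:
    """Averaging [1.0, 2.0] (one prior sample) with [3.0, 4.0] yields [2.0, 3.0]."""
    base = [{"name": "dose", "pages": [{"metadata": {"page_number": 0}, "data": {"values": [1.0, 2.0]}}]}]
    new = [{"name": "dose", "pages": [{"metadata": {"page_number": 0}, "data": {"values": [3.0, 4.0]}}]}]
    result = average_estimators(base, new, averaged_count=1)
    assert result[0]["pages"][0]["data"]["values"] == [2.0, 3.0]
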

def read_file(event: threading.Event,
              filepath: Path,
              simulation_id: int,
              task_id: int,
              update_key: str,
              timeout_wait_for_file: int = 20,
              timeout_wait_for_line: int = 5 * 60,
              next_backend_update_time: int = 2,
              logging_level: int = logging.WARNING):
    """Monitors log file of certain task, when new line with message matching regex appears, sends update to backend"""
    logging.getLogger(__name__).setLevel(logging_level)
    logfile = None
    update_time = 0
    logging.info("Started monitoring, simulation id: %d, task id: %d", simulation_id, task_id)
    # if the logfile is not created in the first X seconds, it is probably an error
    for i in range(timeout_wait_for_file):  # maximum attempts, each attempt is one second
        if event.is_set():
            return
        try:
            logging.debug("Trying to open file %s, attempt %d/%d", filepath, i + 1, timeout_wait_for_file)
            logfile = open(filepath)  # skipcq: PTC-W6004
            break
        except FileNotFoundError:
            time.sleep(1)

    # if the logfile was not created within the timeout, the task is marked as failed
    if logfile is None:
        logging.error("Log file for task %d not found", task_id)
        up_dict = {"task_state": EntityState.FAILED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
        send_task_update(simulation_id, task_id, update_key, up_dict)
        return
    logging.debug("Log file for task %d found", task_id)

    # create generator which waits for new lines in log file
    # if no new line appears in timeout_wait_for_line seconds, generator stops
    loglines = log_generator(logfile, event, timeout=timeout_wait_for_line)
    requested_primaries = 0
    simulated_primaries = 0
    utc_now = datetime.utcnow()  # fallback timestamp in case the log file yields no lines
    logging.info("Parsing log file for task %d started", task_id)
    for line in loglines:
        if event.is_set():
            return
        utc_now = datetime.utcnow()
        logging.debug("Parsing line: %s", line.rstrip())
        if re.search(RUN_MATCH, line):
            logging.debug("Found RUN_MATCH in line: %s for file: %s and task: %d", line.rstrip(), filepath, task_id)
            splitted = line.split()
            try:
                simulated_primaries = int(splitted[3])
            except (IndexError, ValueError):
                logging.error("Cannot parse number of simulated primaries in line: %s", line.rstrip())
            if (utc_now.timestamp() - update_time < next_backend_update_time  # do not send update too often
                    and requested_primaries >= simulated_primaries):
                logging.debug("Skipping update for task %d", task_id)
                continue
            update_time = utc_now.timestamp()
            estimated_seconds = 0
            try:
                estimated_seconds = int(splitted[9]) + int(splitted[7]) * 60 + int(splitted[5]) * 3600
            except (IndexError, ValueError):
                logging.error("Cannot parse estimated time in line: %s", line.rstrip())
            up_dict = {"simulated_primaries": simulated_primaries, "estimated_time": estimated_seconds}
            logging.debug("Sending update for task %d, simulated primaries %d", task_id, simulated_primaries)
            send_task_update(simulation_id, task_id, update_key, up_dict)

        elif re.search(REQUESTED_MATCH, line):
            logging.debug("Found REQUESTED_MATCH in line: %s for file: %s and task: %d", line, filepath, task_id)
            # found a line with requested primaries, update database
            # task is in RUNNING state
            splitted = line.split(": ")
            requested_primaries = int(splitted[1])
            up_dict = {
                "simulated_primaries": 0,
                "requested_primaries": requested_primaries,
                "start_time": utc_now.isoformat(sep=" "),
                "task_state": EntityState.RUNNING.value
            }
            logging.debug("Sending update for task %d, requested primaries %d", task_id, requested_primaries)
            send_task_update(simulation_id, task_id, update_key, up_dict)

        elif re.search(TIMEOUT_MATCH, line):
            logging.error("Simulation watcher %d timed out", task_id)
            up_dict = {"task_state": EntityState.FAILED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
            send_task_update(simulation_id, task_id, update_key, up_dict)
            return

        elif re.search(COMPLETE_MATCH, line):
            logging.debug("Found COMPLETE_MATCH in line: %s for file: %s and task: %d", line, filepath, task_id)
            break

    logging.info("Parsing log file for task %d finished", task_id)
    up_dict = {
        "simulated_primaries": requested_primaries,
        "end_time": utc_now.isoformat(sep=" "),
        "task_state": EntityState.COMPLETED.value
    }
    logging.info("Sending final update for task %d, simulated primaries %d", task_id, simulated_primaries)
    send_task_update(simulation_id, task_id, update_key, up_dict)

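# An illustration (hypothetical line content, real indices) of the progress
# line shape that read_file() relies on after str.split(): token 3 is the
# simulated-primaries count and tokens 5/7/9 are the hour/minute/second fields
# of the estimated time remaining.
def _example_parse_progress_line() -> None:
    """Reproduces the index arithmetic used in read_file()."""
    line = "Primary particle no. 1000 ETR: 0 hour 1 minute 30 second"  # hypothetical
    splitted = line.split()
    assert int(splitted[3]) == 1000
    estimated_seconds = int(splitted[9]) + int(splitted[7]) * 60 + int(splitted[5]) * 3600
    assert estimated_seconds == 90
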

def read_fluka_file(event: threading.Event,
                    dirpath: Path,
                    simulation_id: int,
                    task_id: int,
                    update_key: str,
                    timeout_wait_for_file_s: int = 20,
                    timeout_wait_for_line_s: int = 5 * 60,
                    next_backend_update_time_s: int = 2,
                    logging_level: int = logging.WARNING):
    """Monitors log file of a FLUKA task"""
    logging.getLogger(__name__).setLevel(logging_level)
    logfile = None
    logging.info("Started monitoring, simulation id: %d, task id: %d", simulation_id, task_id)

    # if the logfile is not created in the first X seconds, it is probably an error
    # continuation of awful glob path hack
    def get_first_matching_file() -> Optional[Path]:
        """Returns the first file matching fluka_*/*001.out, or None."""
        path = next(dirpath.glob("fluka_*/*001.out"), None)
        return path.resolve() if path else None

    for _ in range(timeout_wait_for_file_s):  # maximum attempts, each attempt is one second
        if event.is_set():
            return
        try:
            optional_file = get_first_matching_file()
            if not optional_file:
                time.sleep(1)
                continue
            logfile = open(optional_file)  # skipcq: PTC-W6004
            break
        except FileNotFoundError:
            time.sleep(1)

    # if the logfile was not created within the timeout, the task is marked as failed
    if logfile is None:
        logging.error("Log file for task %d not found", task_id)
        up_dict = {"task_state": EntityState.FAILED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
        send_task_update(simulation_id, task_id, update_key, up_dict)
        return
    logging.debug("Log file for task %d found", task_id)

    # create generator which waits for new lines in log file
    # if no new line appears in timeout_wait_for_line seconds, generator stops
    loglines = log_generator(logfile, event, timeout=timeout_wait_for_line_s)
    logging.info("Parsing log file for task %d started", task_id)
    read_fluka_out_file(event,
                        loglines,
                        next_backend_update_time=next_backend_update_time_s,
                        details=TaskDetails(simulation_id, task_id, update_key),
                        verbose=logging_level <= logging.INFO)


def read_file_offline(filepath: Path) -> tuple[int, int]:
    """Reads log file and returns number of simulated and requested primaries"""
    simulated_primaries = 0
    requested_primaries = 0
    try:
        with open(filepath, 'r') as f:
            for line in f:
                logging.debug("Parsing line: %s", line.rstrip())
                if re.search(RUN_MATCH, line):
                    logging.debug("Found RUN_MATCH in line: %s for file: %s", line.rstrip(), filepath)
                    splitted = line.split()
                    simulated_primaries = int(splitted[3])
                elif re.search(REQUESTED_MATCH, line):
                    logging.debug("Found REQUESTED_MATCH in line: %s for file: %s", line.rstrip(), filepath)
                    splitted = line.split(": ")
                    requested_primaries = int(splitted[1])
    except FileNotFoundError:
        logging.error("Log file %s not found", filepath)
    return simulated_primaries, requested_primaries
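

# A hypothetical offline use, e.g. inspecting the progress of a finished or
# crashed task without a watcher thread; the path is made up.
def _example_read_file_offline() -> None:
    """Reads simulated/requested primaries from a log file already on disk."""
    simulated, requested = read_file_offline(Path("/tmp/sim_workdir/shieldhit_0017.log"))
    logging.info("%d of %d primaries simulated", simulated, requested)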