Coverage for yaptide/celery/utils/pymc.py: 59%

224 statements  

coverage.py v7.6.10, created at 2025-08-12 06:23 +0000

import logging
import os
import re
import subprocess
import tempfile
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Protocol

from pymchelper.executor.options import SimulationSettings, SimulatorType
from pymchelper.executor.runner import Runner
from pymchelper.input_output import frompattern

from yaptide.batch.watcher import (COMPLETE_MATCH, REQUESTED_MATCH, RUN_MATCH,
                                   TIMEOUT_MATCH, log_generator)
from yaptide.celery.utils.progress.fluka_monitor import (TaskDetails,
                                                         read_fluka_out_file)
from yaptide.celery.utils.requests import send_task_update
from yaptide.utils.enums import EntityState


def get_tmp_dir() -> Path:
    """Function to get temporary directory from environment variables."""
    # let's try by default to use python tempfile module
    tmp_dir = tempfile.gettempdir()
    logging.debug("1. tempfile.gettempdir() is: %s", tmp_dir)

    # if the TMPDIR env variable is set we will use it to override the default
    logging.info("1. TMPDIR is: %s", os.environ.get("TMPDIR", "not set"))
    if os.environ.get("TMPDIR"):
        tmp_dir = os.environ.get("TMPDIR")

    # if the TEMP env variable is set we will use it to override the default
    logging.info("2. TEMP is: %s", os.environ.get("TEMP", "not set"))
    if os.environ.get("TEMP"):
        tmp_dir = os.environ.get("TEMP")

    # if the TMP env variable is set we will use it to override the default
    logging.info("3. TMP is: %s", os.environ.get("TMP", "not set"))
    if os.environ.get("TMP"):
        tmp_dir = os.environ.get("TMP")

    return Path(tmp_dir)


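# Illustration (not part of the module): the checks above run in the order
# TMPDIR -> TEMP -> TMP, so the last variable that is set wins. With a
# hypothetical environment:
#
#   os.environ["TMPDIR"] = "/scratch/tmpdir"
#   os.environ["TMP"] = "/scratch/tmp"
#   get_tmp_dir()  # -> Path("/scratch/tmp"), since TMP overrides TMPDIR
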
def command_to_run_shieldhit(dir_path: Path, task_id: int) -> list[str]:
    """Function to create command to run SHIELD-HIT12A."""
    settings = SimulationSettings(
        input_path=dir_path,  # skipcq: PYL-W0612 # useful
        simulator_type=SimulatorType.shieldhit,
        simulator_exec_path=None,  # useless, we guess from PATH
        cmdline_opts="")  # useless, we could use -q in the future
    # last part of task_id gives an integer seed for random number generator
    settings.set_rng_seed(task_id)
    command_as_list = str(settings).split()
    command_as_list.append(str(dir_path))
    return command_as_list


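# Illustration (not part of the module): str(settings) renders the simulator
# invocation, which .split() turns into argv-style tokens; the input directory
# is appended as the last argument. A hypothetical rendering:
#
#   command_to_run_shieldhit(Path("/sim/task_3"), task_id=3)
#   # -> e.g. ['shieldhit', '<rng-seed and other options...>', '/sim/task_3']
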
def get_shieldhit_estimators(dir_path: Path) -> dict:
    """Function to get estimators from SHIELD-HIT12A output files."""
    estimators_dict = {}

    matching_files = list(dir_path.glob("*.bdo"))
    if len(matching_files) == 0:
        logging.error("No *.bdo files found in %s", dir_path)
        return estimators_dict

    logging.debug("Found %d *.bdo files in %s", len(matching_files), dir_path)
    files_pattern = str(dir_path / "*.bdo")
    estimators_list = frompattern(pattern=files_pattern)
    for estimator in estimators_list:
        logging.debug("Appending estimator for %s", estimator.file_corename)
        estimators_dict[estimator.file_corename] = estimator

    return estimators_dict


def command_to_run_fluka(dir_path: Path, task_id: int) -> list[str]:
    """Function to create command to run FLUKA."""
    input_file = next(dir_path.glob("*.inp"), None)
    if input_file is None:
        logging.debug("failed to generate fluka command. No *.inp file found in %s", dir_path)
        # if there is no input file, raise an error
        # this should never happen
        raise FileNotFoundError("Input file not found")

    # create settings object
    # we are providing input file, simulator type and additional options
    # option -M with value 1 will execute only one simulation cycle, default is 5
    settings = SimulationSettings(input_path=str(input_file), simulator_type=SimulatorType.fluka, cmdline_opts="-M 1")
    update_rng_seed_in_fluka_file(input_file, task_id)
    command_as_list = str(settings).split()
    command_as_list.append(str(input_file))
    return command_as_list


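# Illustration (not part of the module): unlike the SHIELD-HIT12A variant, the
# seed is patched into the *.inp file itself rather than passed on the command
# line, and the input file path becomes the last argument. With a hypothetical
# /sim/task_7/input.inp:
#
#   command_to_run_fluka(Path("/sim/task_7"), task_id=7)
#   # -> e.g. ['rfluka', '-M', '1', '/sim/task_7/input.inp']  (illustrative)
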
def update_rng_seed_in_fluka_file(input_file: Path, task_id: int) -> None:
    """Function to update random seed in FLUKA input file."""

    class UpdateFlukaRandomSeed(Protocol):
        """Definition of protocol for updating random seed in fluka input file.

        Its purpose is to allow us to use private method of Runner class.
        """

        def __call__(self, file_path: str, rng_seed: int) -> None:
            """Updates random seed in fluka input file"""

    random_seed = task_id
    update_fluka_function: UpdateFlukaRandomSeed = Runner._Runner__update_fluka_input_file  # pylint: disable=W0212
    update_fluka_function(str(input_file.resolve()), random_seed)


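# Illustration (not part of the module): the lookup above relies on Python's
# name mangling; a method named __update_fluka_input_file inside class Runner
# is stored on the class as _Runner__update_fluka_input_file. Minimal example:
#
#   class Demo:
#       def __hidden(self) -> int:  # mangled to _Demo__hidden
#           return 42
#
#   Demo()._Demo__hidden()  # -> 42, the same trick used above
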
def execute_simulation_subprocess(dir_path: Path, command_as_list: list[str]) -> tuple[bool, str, str]:
    """Function to execute simulation subprocess."""
    process_exit_success: bool = True
    command_stdout: str = ""
    command_stderr: str = ""
    try:
        completed_process = subprocess.run(command_as_list,
                                           check=True,
                                           cwd=str(dir_path),
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE,
                                           text=True)
        logging.info("simulation subprocess with return code %d finished", completed_process.returncode)

        # Capture stdout and stderr
        command_stdout = completed_process.stdout
        command_stderr = completed_process.stderr

        process_exit_success = True

        # Log stdout and stderr using logging
        logging.info("Command Output:\n%s", command_stdout)
        logging.info("Command Error Output:\n%s", command_stderr)
    except subprocess.CalledProcessError as e:
        # If the command exits with a non-zero status
        process_exit_success = False
        logging.error("Command Error: %s\nSTD OUT: %s\nExecuted Command: %s", e.stderr, e.stdout,
                      " ".join(command_as_list))
    except Exception as e:  # skipcq: PYL-W0703
        process_exit_success = False
        logging.error("Exception while running simulation: %s", e)

    return process_exit_success, command_stdout, command_stderr


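# Illustration (not part of the module): a typical call site combines the
# command builders above with this runner, e.g. (hypothetical paths):
#
#   cmd = command_to_run_shieldhit(Path("/sim/task_3"), task_id=3)
#   ok, out, err = execute_simulation_subprocess(Path("/sim/task_3"), cmd)
#   if not ok:
#       ...  # stderr was already logged; mark the task as failed
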
def get_fluka_estimators(dir_path: Path) -> dict:
    """Function to get estimators from FLUKA output files."""
    estimators_dict = {}

    matching_files = list(dir_path.glob("*_fort.*"))
    if len(matching_files) == 0:
        logging.error("No *_fort.* files found in %s", dir_path)
        return estimators_dict

    logging.debug("Found %d *_fort.* files in %s", len(matching_files), dir_path)
    files_pattern = str(dir_path / "*_fort.*")
    estimators_list = frompattern(pattern=files_pattern)
    for estimator in estimators_list:
        logging.debug("Appending estimator for %s", estimator.file_corename)
        estimators_dict[estimator.file_corename] = estimator

    return estimators_dict


def average_values(base_values: List[float], new_values: List[float], count: int) -> List[float]:
    """Average two lists of values, where base_values already averages `count` samples."""
    # running mean, element-wise: (base * count + new) / (count + 1)
    return [sum(x) / (count + 1) for x in zip(map(lambda x: x * count, base_values), new_values)]


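# Illustration (not part of the module): a worked running mean. Starting from
# one sample [1.0], folding in [3.0] and then [5.0]:
#
#   average_values([1.0], [3.0], count=1)  # -> [(1.0 * 1 + 3.0) / 2] == [2.0]
#   average_values([2.0], [5.0], count=2)  # -> [(2.0 * 2 + 5.0) / 3] == [3.0]
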
def average_estimators(base_list: list[dict], list_to_add: list[dict], averaged_count: int) -> list:
    """Averages estimators from two dicts"""
    logging.debug("Averaging estimators - already averaged: %d", averaged_count)
    for est_i, estimator_dict in enumerate(list_to_add):
        # check if estimator names are the same and if not, find matching estimator's index in base_list
        if estimator_dict["name"] != base_list[est_i]["name"]:
            est_i = next((i for i, item in enumerate(base_list) if item["name"] == estimator_dict["name"]), None)
        logging.debug("Averaging estimator %s", estimator_dict["name"])
        for page_i, page_dict in enumerate(estimator_dict["pages"]):
            # check if page numbers are the same and if not, find matching page's index in base_list
            if page_dict["metadata"]["page_number"] != base_list[est_i]["pages"][page_i]["metadata"]["page_number"]:
                page_i = next((i for i, item in enumerate(base_list[est_i]["pages"])
                               if item["metadata"]["page_number"] == page_dict["metadata"]["page_number"]), None)

            base_list[est_i]["pages"][page_i]["data"]["values"] = average_values(
                base_list[est_i]["pages"][page_i]["data"]["values"], page_dict["data"]["values"], averaged_count)

            logging.debug("Averaged page %s with %d elements", page_dict["metadata"]["page_number"],
                          len(page_dict["data"]["values"]))
    return base_list


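# Illustration (assumed dict layout, inferred from the lookups above): each
# estimator carries a name and numbered pages of values, e.g.
#
#   base = [{"name": "fluence", "pages": [
#       {"metadata": {"page_number": 0}, "data": {"values": [1.0, 2.0]}}]}]
#   new = [{"name": "fluence", "pages": [
#       {"metadata": {"page_number": 0}, "data": {"values": [3.0, 4.0]}}]}]
#   average_estimators(base, new, averaged_count=1)  # values -> [2.0, 3.0]
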
def read_file(event: threading.Event,
              filepath: Path,
              simulation_id: int,
              task_id: int,
              update_key: str,
              timeout_wait_for_file: int = 20,
              timeout_wait_for_line: int = 5 * 60,
              next_backend_update_time: int = 2,
              logging_level: int = logging.WARNING):
    """Monitors log file of certain task, when new line with message matching regex appears, sends update to backend"""
    logging.getLogger(__name__).setLevel(logging_level)
    logfile = None
    update_time = 0
    logging.info("Started monitoring, simulation id: %d, task id: %d", simulation_id, task_id)
    # if the logfile is not created in the first X seconds, it is probably an error
    for i in range(timeout_wait_for_file):  # maximum attempts, each attempt is one second
        if event.is_set():
            return
        try:
            logging.debug("Trying to open file %s, attempt %d/%d", filepath, i, timeout_wait_for_file)
            logfile = open(filepath)  # skipcq: PTC-W6004
            break
        except FileNotFoundError:
            time.sleep(1)

    # if logfile was not created within the timeout, task is marked as failed
    if logfile is None:
        logging.error("Log file for task %d not found", task_id)
        up_dict = {"task_state": EntityState.FAILED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
        send_task_update(simulation_id, task_id, update_key, up_dict)
        return
    logging.debug("Log file for task %d found", task_id)

    # create generator which waits for new lines in log file
    # if no new line appears in timeout_wait_for_line seconds, generator stops
    loglines = log_generator(logfile, event, timeout=timeout_wait_for_line)
    requested_primaries = 0
    logging.info("Parsing log file for task %d started", task_id)
    simulated_primaries = 0
    utc_now = datetime.utcnow()  # initialized here so the final update works even if no line is ever read
    for line in loglines:
        if event.is_set():
            return
        utc_now = datetime.utcnow()
        logging.debug("Parsing line: %s", line.rstrip())
        if re.search(RUN_MATCH, line):
            logging.debug("Found RUN_MATCH in line: %s for file: %s and task: %d ", line.rstrip(), filepath, task_id)
            splitted = line.split()
            try:
                simulated_primaries = int(splitted[3])
            except (IndexError, ValueError):
                logging.error("Cannot parse number of simulated primaries in line: %s", line.rstrip())
            if (utc_now.timestamp() - update_time < next_backend_update_time  # do not send update too often
                    and requested_primaries >= simulated_primaries):
                logging.debug("Skipping update for task %d", task_id)
                continue
            update_time = utc_now.timestamp()
            estimated_seconds = 0
            try:
                estimated_seconds = int(splitted[9]) + int(splitted[7]) * 60 + int(splitted[5]) * 3600
            except (IndexError, ValueError):
                logging.error("Cannot parse estimated time in line: %s", line.rstrip())
            up_dict = {"simulated_primaries": simulated_primaries, "estimated_time": estimated_seconds}
            logging.debug("Sending update for task %d, simulated primaries %d", task_id, simulated_primaries)
            send_task_update(simulation_id, task_id, update_key, up_dict)

        elif re.search(REQUESTED_MATCH, line):
            logging.debug("Found REQUESTED_MATCH in line: %s for file: %s and task: %d ", line, filepath, task_id)
            # found a line with requested primaries, update database
            # task is in RUNNING state
            splitted = line.split(": ")
            requested_primaries = int(splitted[1])
            up_dict = {
                "simulated_primaries": 0,
                "start_time": utc_now.isoformat(sep=" "),
                "task_state": EntityState.RUNNING.value
            }
            logging.debug("Sending update for task %d", task_id)
            send_task_update(simulation_id, task_id, update_key, up_dict)

        elif re.search(TIMEOUT_MATCH, line):
            logging.error("Simulation watcher %d timed out", task_id)
            up_dict = {"task_state": EntityState.FAILED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
            send_task_update(simulation_id, task_id, update_key, up_dict)
            return

        elif re.search(COMPLETE_MATCH, line):
            logging.debug("Found COMPLETE_MATCH in line: %s for file: %s and task: %d ", line, filepath, task_id)
            break

    logging.info("Parsing log file for task %d finished", task_id)
    up_dict = {
        "simulated_primaries": simulated_primaries,
        "end_time": utc_now.isoformat(sep=" "),
        "task_state": EntityState.COMPLETED.value
    }
    logging.info("Sending final update for task %d, simulated primaries %d", task_id, simulated_primaries)
    send_task_update(simulation_id, task_id, update_key, up_dict)


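# Illustration (not part of the module): RUN_MATCH lines are whitespace-split
# above, with the simulated-primary count at token index 3 and the estimated
# time at indices 5 (hours), 7 (minutes) and 9 (seconds). A hypothetical line
# of that shape (the real pattern lives in yaptide.batch.watcher):
#
#   "Primary particle no. 1000 ETR: 0 hours 2 minutes 30 seconds"
#   # tokens[3] -> 1000 primaries; 0 h, 2 min, 30 s -> estimated_seconds == 150
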
def read_fluka_file(event: threading.Event,
                    dirpath: Path,
                    simulation_id: int,
                    task_id: int,
                    update_key: str,
                    timeout_wait_for_file_s: int = 20,
                    timeout_wait_for_line_s: int = 5 * 60,
                    next_backend_update_time_s: int = 2,
                    logging_level: int = logging.WARNING):
    """Monitors log file of fluka task"""
    logging.getLogger(__name__).setLevel(logging_level)
    logfile = None
    logging.info("Started monitoring, simulation id: %d, task id: %d", simulation_id, task_id)

    # if the logfile is not created in the first X seconds, it is probably an error
    # continuation of awful glob path hack
    def get_first_matching_file() -> Optional[Path]:
        """Returns first matching file."""
        path = next(dirpath.glob("fluka_*/*001.out"), None)
        return path.resolve() if path else None

    for _ in range(timeout_wait_for_file_s):  # maximum attempts, each attempt is one second
        if event.is_set():
            return
        try:
            optional_file = get_first_matching_file()
            if not optional_file:
                time.sleep(1)
                continue
            logfile = open(optional_file)  # skipcq: PTC-W6004
            break
        except FileNotFoundError:
            time.sleep(1)

    # if logfile was not created within the timeout, task is marked as failed
    if logfile is None:
        logging.error("Log file for task %d not found", task_id)
        up_dict = {"task_state": EntityState.FAILED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
        send_task_update(simulation_id, task_id, update_key, up_dict)
        return
    logging.debug("Log file for task %d found", task_id)

    # create generator which waits for new lines in log file
    # if no new line appears in timeout_wait_for_line_s seconds, generator stops
    loglines = log_generator(logfile, event, timeout=timeout_wait_for_line_s)
    logging.info("Parsing log file for task %d started", task_id)
    read_fluka_out_file(event,
                        loglines,
                        next_backend_update_time=next_backend_update_time_s,
                        details=TaskDetails(simulation_id, task_id, update_key),
                        verbose=logging_level <= logging.INFO)


def read_file_offline(filepath: Path) -> tuple[int, int]:
    """Reads log file and returns number of simulated and requested primaries"""
    simulated_primaries = 0
    requested_primaries = 0
    try:
        with open(filepath, 'r') as f:
            for line in f:
                logging.debug("Parsing line: %s", line.rstrip())
                if re.search(RUN_MATCH, line):
                    logging.debug("Found RUN_MATCH in line: %s for file: %s", line.rstrip(), filepath)
                    splitted = line.split()
                    simulated_primaries = int(splitted[3])
                elif re.search(REQUESTED_MATCH, line):
                    logging.debug("Found REQUESTED_MATCH in line: %s for file: %s", line.rstrip(), filepath)
                    splitted = line.split(": ")
                    requested_primaries = int(splitted[1])
    except FileNotFoundError:
        logging.error("Log file %s not found", filepath)
    return simulated_primaries, requested_primaries
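

# Illustration (not part of the module): the offline variant is handy once a
# run has finished, e.g. with a hypothetical log path:
#
#   done, wanted = read_file_offline(Path("/sim/task_3/run.log"))
#   progress = done / wanted if wanted else 0.0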