Coverage for yaptide/celery/utils/pymc.py: 59%
226 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
1import logging
2import os
3import re
4import subprocess
5import tempfile
6import threading
7import time
8from datetime import datetime
9from pathlib import Path
10from typing import List, Optional, Protocol
12from pymchelper.executor.options import SimulationSettings, SimulatorType
13from pymchelper.input_output import frompattern
14from pymchelper.executor.runner import Runner
16from yaptide.batch.watcher import (COMPLETE_MATCH, REQUESTED_MATCH, RUN_MATCH, TIMEOUT_MATCH, log_generator)
17from yaptide.celery.utils.progress.fluka_monitor import TaskDetails, read_fluka_out_file
18from yaptide.celery.utils.requests import send_task_update
19from yaptide.utils.enums import EntityState
def get_tmp_dir() -> Path:
    """Resolve the directory that should be used for temporary files.

    The location reported by the ``tempfile`` module is the starting point.
    The TMPDIR, TEMP and TMP environment variables are then consulted in that
    order and every non-empty one overrides the current choice, so the last
    non-empty variable wins.
    """
    chosen_dir = tempfile.gettempdir()
    logging.debug("1. tempfile.gettempdir() is: %s", chosen_dir)

    # each entry pairs an environment variable with the message used to report its value
    overrides = (
        ("TMPDIR", "1. TMPDIR is: %s"),
        ("TEMP", "2. TEMP is: %s"),
        ("TMP", "3. TMP is: %s"),
    )
    for variable, report_format in overrides:
        logging.info(report_format, os.environ.get(variable, "not set"))
        candidate = os.environ.get(variable)
        if candidate:
            chosen_dir = candidate

    return Path(chosen_dir)
def command_to_run_shieldhit(dir_path: Path, task_id: int) -> list[str]:
    """Build the argument list that launches SHIELD-HIT12A for one task.

    The command is derived from the string form of a pymchelper
    ``SimulationSettings`` object, split on whitespace, with ``dir_path``
    appended as the final argument. ``task_id`` is used as the seed of the
    random number generator.
    """
    shieldhit_settings = SimulationSettings(
        input_path=dir_path,  # skipcq: PYL-W0612 # usefull
        simulator_type=SimulatorType.shieldhit,
        simulator_exec_path=None,  # not given explicitly, the executable is guessed from PATH
        cmdline_opts="")  # no extra options for now, -q could be used in the future
    shieldhit_settings.set_rng_seed(task_id)
    return [*str(shieldhit_settings).split(), str(dir_path)]
def get_shieldhit_estimators(dir_path: Path) -> dict:
    """Collect estimators from the ``*.bdo`` files found directly in ``dir_path``.

    Returns a dict keyed by each estimator's ``file_corename``. When the
    directory holds no ``*.bdo`` file an error is logged and an empty dict is
    returned without invoking the reader.
    """
    bdo_files = list(dir_path.glob("*.bdo"))
    if not bdo_files:
        logging.error("No *.bdo files found in %s", dir_path)
        return {}

    logging.debug("Found %d *.bdo files in %s", len(bdo_files), dir_path)
    collected = {}
    for estimator in frompattern(pattern=str(dir_path / "*.bdo")):
        logging.debug("Appending estimator for %s", estimator.file_corename)
        collected[estimator.file_corename] = estimator
    return collected
def command_to_run_fluka(dir_path: Path, task_id: int) -> list[str]:
    """Build the argument list that launches FLUKA for one task.

    The first ``*.inp`` file found in ``dir_path`` is used as the input file.
    Before the command is built, the random seed stored in that file is
    overwritten with ``task_id``.

    Args:
        dir_path: directory expected to contain the FLUKA ``*.inp`` input file.
        task_id: task number, used as the random number generator seed.

    Returns:
        The command split into a list of arguments, with the input file path last.

    Raises:
        FileNotFoundError: when ``dir_path`` contains no ``*.inp`` file.
    """
    input_file = next(dir_path.glob("*.inp"), None)
    if input_file is None:
        # this should never happen, so it is reported as an error rather than a debug message
        logging.error("failed to generate fluka command. No *.inp file found in %s", dir_path)
        raise FileNotFoundError("Input file not found")

    # option M with value 1 makes FLUKA execute only one simulation cycle, the default is 5
    settings = SimulationSettings(input_path=str(input_file), simulator_type=SimulatorType.fluka, cmdline_opts="-M 1")
    update_rng_seed_in_fluka_file(input_file, task_id)
    command_as_list = str(settings).split()
    command_as_list.append(str(input_file))
    return command_as_list
def update_rng_seed_in_fluka_file(input_file: Path, task_id: int) -> None:
    """Store ``task_id`` as the random seed inside the given FLUKA input file."""

    class SeedUpdater(Protocol):
        """Call signature of the pymchelper routine that rewrites the seed.

        The routine is a name-mangled (private) member of Runner, so this
        protocol only exists to give the borrowed callable a type.
        """

        def __call__(self, file_path: str, rng_seed: int) -> None:
            """Write ``rng_seed`` into the FLUKA input file at ``file_path``."""

    seed_updater: SeedUpdater = Runner._Runner__update_fluka_input_file  # pylint: disable=W0212
    seed_updater(str(input_file.resolve()), task_id)
def execute_simulation_subprocess(dir_path: Path, command_as_list: list[str]) -> tuple[bool, str, str]:
    """Run the simulation command in ``dir_path`` and capture its output.

    Args:
        dir_path: working directory for the subprocess.
        command_as_list: executable followed by its arguments.

    Returns:
        A tuple ``(success, stdout, stderr)``. ``success`` is True only when
        the command exits with status 0. When the command exits with a
        non-zero status, the output it produced before failing is still
        returned. When the command could not be run at all (for example the
        executable is missing), both strings are empty.
    """
    try:
        completed_process = subprocess.run(command_as_list,
                                           check=True,
                                           cwd=str(dir_path),
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE,
                                           text=True)
    except subprocess.CalledProcessError as e:
        # non-zero exit status: keep whatever the process printed so the caller can report it
        logging.error("Command Error: %s\nSTD OUT: %s\nExecuted Command: %s", e.stderr, e.stdout,
                      " ".join(command_as_list))
        return False, e.stdout or "", e.stderr or ""
    except Exception as e:  # skipcq: PYL-W0703
        # deliberate catch-all: a failure to start the process must not crash the caller
        logging.error("Exception while running simulation: %s", e)
        return False, "", ""

    logging.info("simulation subprocess with return code %d finished", completed_process.returncode)
    command_stdout = completed_process.stdout
    command_stderr = completed_process.stderr
    logging.info("Command Output:\n%s", command_stdout)
    logging.info("Command Error Output:\n%s", command_stderr)
    return True, command_stdout, command_stderr
def get_fluka_estimators(dir_path: Path) -> dict:
    """Collect estimators from the ``*_fort.*`` files found directly in ``dir_path``.

    Every page of each estimator gets its ``page_number`` set to its position
    within the estimator. Returns a dict keyed by ``file_corename``; when no
    matching file exists an error is logged and an empty dict is returned
    without invoking the reader.
    """
    fort_files = list(dir_path.glob("*_fort.*"))
    if not fort_files:
        logging.error("No *_fort.* files found in %s", dir_path)
        return {}

    logging.debug("Found %d *_fort.* files in %s", len(fort_files), dir_path)
    collected = {}
    for estimator in frompattern(pattern=str(dir_path / "*_fort.*")):
        logging.debug("Appending estimator for %s", estimator.file_corename)
        for position, page in enumerate(estimator.pages):
            page.page_number = position
        collected[estimator.file_corename] = estimator
    return collected
def average_values(base_values: List[float], new_values: List[float], count: int) -> List[float]:
    """Fold one more sample into an element-wise running average.

    Each element of ``base_values`` is weighted by ``count``, the matching
    element of ``new_values`` is added and the total is divided by
    ``count + 1``. The result is as long as the shorter of the two inputs.
    """
    samples_after_merge = count + 1
    return [
        sum((running_mean * count, sample)) / samples_after_merge
        for running_mean, sample in zip(base_values, new_values)
    ]
def _matching_index(candidates: list, hint: int, is_match, description: str) -> int:
    """Return the index of the first candidate accepted by ``is_match``, trying ``hint`` first.

    Raises:
        ValueError: when no candidate matches.
    """
    # fast path: both lists usually share the same ordering
    if 0 <= hint < len(candidates) and is_match(candidates[hint]):
        return hint
    for index, candidate in enumerate(candidates):
        if is_match(candidate):
            return index
    raise ValueError(f"Cannot average {description}: no matching entry in the base list")


def average_estimators(base_list: list[dict], list_to_add: list[dict], averaged_count: int) -> list:
    """Merge the values of ``list_to_add`` into the running averages kept in ``base_list``.

    Estimators are matched by their ``"name"`` and pages by
    ``["metadata"]["page_number"]``. The position within the list is only
    used as a first guess, so the two lists may be ordered differently.

    Args:
        base_list: estimators holding the current averages; modified in place.
        list_to_add: estimators of the new result to fold into the averages.
        averaged_count: number of results already averaged into ``base_list``.

    Returns:
        ``base_list`` itself, after the update.

    Raises:
        ValueError: when an estimator or page of ``list_to_add`` has no
            counterpart in ``base_list``.
    """
    logging.debug("Averaging estimators - already averaged: %d", averaged_count)
    for est_hint, estimator_dict in enumerate(list_to_add):
        name = estimator_dict["name"]
        est_i = _matching_index(base_list, est_hint, lambda item, wanted=name: item["name"] == wanted,
                                f"estimator {name!r}")
        logging.debug("Averaging estimator %s", name)

        base_pages = base_list[est_i]["pages"]
        for page_hint, page_dict in enumerate(estimator_dict["pages"]):
            page_number = page_dict["metadata"]["page_number"]
            page_i = _matching_index(base_pages, page_hint,
                                     lambda item, wanted=page_number: item["metadata"]["page_number"] == wanted,
                                     f"page {page_number!r} of estimator {name!r}")

            base_pages[page_i]["data"]["values"] = average_values(base_pages[page_i]["data"]["values"],
                                                                  page_dict["data"]["values"], averaged_count)

            logging.debug("Averaged page %s with %d elements", page_number, len(page_dict["data"]["values"]))
    return base_list
def read_file(event: threading.Event,
              filepath: Path,
              simulation_id: int,
              task_id: str,
              update_key: str,
              timeout_wait_for_file: int = 20,
              timeout_wait_for_line: int = 5 * 60,
              next_backend_update_time: int = 2,
              logging_level: int = logging.WARNING):
    """Monitor the log file of a task and forward progress updates to the backend.

    The function first waits for ``filepath`` to appear, then follows it line
    by line. Lines matching RUN_MATCH produce progress updates, a line matching
    REQUESTED_MATCH marks the task as RUNNING, TIMEOUT_MATCH marks it as FAILED
    and COMPLETE_MATCH ends the monitoring. When the line loop ends without a
    failure, a final COMPLETED update is sent.

    Args:
        event: when set, monitoring stops without sending further updates for
            the line being processed.
        filepath: path of the log file to follow.
        simulation_id: identifier passed to ``send_task_update``.
        task_id: identifier passed to ``send_task_update``; logged with ``%s``
            so that both integer and string identifiers are rendered.
        update_key: key passed to ``send_task_update``.
        timeout_wait_for_file: number of one-second attempts to open the file.
        timeout_wait_for_line: timeout handed to ``log_generator``.
        next_backend_update_time: minimal number of seconds between two
            progress updates (ignored once the simulated primaries exceed the
            requested ones).
        logging_level: level set on this module's named logger.
    """
    logging.getLogger(__name__).setLevel(logging_level)
    logging.info("Started monitoring, simulation id: %s, task id: %s", simulation_id, task_id)

    # if the logfile is not created in the first X seconds, it is probably an error
    logfile = None
    for attempt in range(timeout_wait_for_file):  # maximum attempts, each attempt is one second
        if event.is_set():
            return
        try:
            logging.debug("Trying to open file %s, attempt %d/%d", filepath, attempt, timeout_wait_for_file)
            logfile = open(filepath)  # skipcq: PTC-W6004
            break
        except FileNotFoundError:
            time.sleep(1)

    # if the logfile never showed up, the task is marked as failed
    if logfile is None:
        logging.error("Log file for task %s not found", task_id)
        up_dict = {"task_state": EntityState.FAILED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
        send_task_update(simulation_id, task_id, update_key, up_dict)
        return
    logging.debug("Log file for task %s found", task_id)

    update_time = 0
    requested_primaries = 0
    simulated_primaries = 0
    # initialised up front so the final update has a timestamp even when no line was read
    utc_now = datetime.utcnow()

    # the context manager guarantees the log file is closed on every exit path
    with logfile:
        # generator which waits for new lines in the log file
        # if no new line appears in timeout_wait_for_line seconds, the generator stops
        loglines = log_generator(logfile, event, timeout=timeout_wait_for_line)
        logging.info("Parsing log file for task %s started", task_id)
        for line in loglines:
            if event.is_set():
                return
            utc_now = datetime.utcnow()
            logging.debug("Parsing line: %s", line.rstrip())

            if re.search(RUN_MATCH, line):
                logging.debug("Found RUN_MATCH in line: %s for file: %s and task: %s ", line.rstrip(), filepath,
                              task_id)
                splitted = line.split()
                try:
                    simulated_primaries = int(splitted[3])
                except (IndexError, ValueError):
                    logging.error("Cannot parse number of simulated primaries in line: %s", line.rstrip())
                if (utc_now.timestamp() - update_time < next_backend_update_time  # do not send update too often
                        and requested_primaries >= simulated_primaries):
                    logging.debug("Skipping update for task %s", task_id)
                    continue
                update_time = utc_now.timestamp()
                estimated_seconds = 0
                try:
                    # fields 5, 7 and 9 hold hours, minutes and seconds of the estimated time
                    estimated_seconds = int(splitted[9]) + int(splitted[7]) * 60 + int(splitted[5]) * 3600
                except (IndexError, ValueError):
                    logging.error("Cannot parse estimated time in line: %s", line.rstrip())
                up_dict = {"simulated_primaries": simulated_primaries, "estimated_time": estimated_seconds}
                logging.debug("Sending update for task %s, simulated primaries %d", task_id, simulated_primaries)
                send_task_update(simulation_id, task_id, update_key, up_dict)

            elif re.search(REQUESTED_MATCH, line):
                logging.debug("Found REQUESTED_MATCH in line: %s for file: %s and task: %s ", line, filepath,
                              task_id)
                # found a line with requested primaries, update database
                # task is in RUNNING state
                try:
                    requested_primaries = int(line.split(": ")[1])
                except (IndexError, ValueError):
                    # a malformed line must not kill the monitoring, same as for RUN_MATCH lines
                    logging.error("Cannot parse number of requested primaries in line: %s", line.rstrip())
                    continue
                up_dict = {
                    "simulated_primaries": 0,
                    "requested_primaries": requested_primaries,
                    "start_time": utc_now.isoformat(sep=" "),
                    "task_state": EntityState.RUNNING.value
                }
                logging.debug("Sending update for task %s, requested primaries %d", task_id, requested_primaries)
                send_task_update(simulation_id, task_id, update_key, up_dict)

            elif re.search(TIMEOUT_MATCH, line):
                logging.error("Simulation watcher %s timed out", task_id)
                up_dict = {"task_state": EntityState.FAILED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
                send_task_update(simulation_id, task_id, update_key, up_dict)
                return

            elif re.search(COMPLETE_MATCH, line):
                logging.debug("Found COMPLETE_MATCH in line: %s for file: %s and task: %s ", line, filepath, task_id)
                break

    logging.info("Parsing log file for task %s finished", task_id)
    up_dict = {
        "simulated_primaries": requested_primaries,
        "end_time": utc_now.isoformat(sep=" "),
        "task_state": EntityState.COMPLETED.value
    }
    logging.info("Sending final update for task %s, simulated primaries %d", task_id, simulated_primaries)
    send_task_update(simulation_id, task_id, update_key, up_dict)
def read_fluka_file(event: threading.Event,
                    dirpath: Path,
                    simulation_id: int,
                    task_id: int,
                    update_key: str,
                    timeout_wait_for_file_s: int = 20,
                    timeout_wait_for_line_s: int = 5 * 60,
                    next_backend_update_time_s: int = 2,
                    logging_level: int = logging.WARNING):
    """Monitor the log file of a FLUKA task.

    Waits for a file matching ``fluka_*/*001.out`` to appear under ``dirpath``,
    then hands its lines to ``read_fluka_out_file``. If no such file can be
    opened within the allowed attempts, the task is reported as FAILED.

    Args:
        event: when set while waiting for the file, monitoring stops; it is
            also passed on to ``log_generator`` and ``read_fluka_out_file``.
        dirpath: directory searched for the FLUKA output file.
        simulation_id: identifier passed on in the task details and updates.
        task_id: identifier passed on in the task details and updates.
        update_key: key passed on in the task details and updates.
        timeout_wait_for_file_s: number of one-second attempts to find and open the file.
        timeout_wait_for_line_s: timeout handed to ``log_generator``.
        next_backend_update_time_s: passed to ``read_fluka_out_file`` as
            ``next_backend_update_time``.
        logging_level: level set on this module's named logger; levels of
            INFO and below enable the verbose mode of ``read_fluka_out_file``.
    """
    logging.getLogger(__name__).setLevel(logging_level)
    logging.info("Started monitoring, simulation id: %d, task id: %d", simulation_id, task_id)

    # continuation of awful glob path hack
    def get_first_matching_file() -> Optional[Path]:
        """Returns first matching file."""
        path = next(dirpath.glob("fluka_*/*001.out"), None)
        return path.resolve() if path else None

    # if the logfile is not created in the first X seconds, it is probably an error
    logfile = None
    for _ in range(timeout_wait_for_file_s):  # maximum attempts, each attempt is one second
        if event.is_set():
            return
        try:
            optional_file = get_first_matching_file()
            if not optional_file:
                time.sleep(1)
                continue
            logfile = open(optional_file)  # skipcq: PTC-W6004
            break
        except FileNotFoundError:
            time.sleep(1)

    # if the logfile never showed up, the task is marked as failed
    if logfile is None:
        logging.error("Log file for task %d not found", task_id)
        up_dict = {"task_state": EntityState.FAILED.value, "end_time": datetime.utcnow().isoformat(sep=" ")}
        send_task_update(simulation_id, task_id, update_key, up_dict)
        return
    logging.debug("Log file for task %d found", task_id)

    # the context manager guarantees the log file is closed once parsing ends
    # NOTE(review): assumes read_fluka_out_file consumes loglines before returning - confirm
    with logfile:
        # generator which waits for new lines in the log file
        # if no new line appears in timeout_wait_for_line_s seconds, the generator stops
        loglines = log_generator(logfile, event, timeout=timeout_wait_for_line_s)
        logging.info("Parsing log file for task %d started", task_id)
        read_fluka_out_file(event,
                            loglines,
                            next_backend_update_time=next_backend_update_time_s,
                            details=TaskDetails(simulation_id, task_id, update_key),
                            verbose=logging_level <= logging.INFO)
def read_file_offline(filepath: Path) -> tuple[int, int]:
    """Read a finished log file and extract the primaries counters.

    The whole file is scanned; the last parsable RUN_MATCH line provides the
    number of simulated primaries and the last parsable REQUESTED_MATCH line
    the number of requested primaries. Lines that match but cannot be parsed
    are logged and skipped instead of aborting the scan.

    Args:
        filepath: path of the log file to read.

    Returns:
        A tuple ``(simulated_primaries, requested_primaries)``; a counter is 0
        when it could not be found. A missing file is logged and yields ``(0, 0)``.
    """
    simulated_primaries = 0
    requested_primaries = 0
    try:
        with open(filepath, 'r') as f:
            for line in f:
                logging.debug("Parsing line: %s", line.rstrip())
                if re.search(RUN_MATCH, line):
                    logging.debug("Found RUN_MATCH in line: %s for file: %s", line.rstrip(), filepath)
                    try:
                        simulated_primaries = int(line.split()[3])
                    except (IndexError, ValueError):
                        logging.error("Cannot parse number of simulated primaries in line: %s", line.rstrip())
                elif re.search(REQUESTED_MATCH, line):
                    logging.debug("Found REQUESTED_MATCH in line: %s for file: %s", line.rstrip(), filepath)
                    try:
                        requested_primaries = int(line.split(": ")[1])
                    except (IndexError, ValueError):
                        logging.error("Cannot parse number of requested primaries in line: %s", line.rstrip())
    except FileNotFoundError:
        logging.error("Log file %s not found", filepath)
    return simulated_primaries, requested_primaries