Coverage for yaptide/celery/utils/progress/fluka_monitor.py: 34%
90 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-22 07:31 +0000
1from dataclasses import dataclass
2from datetime import datetime, timezone
3import logging
4import re
5import threading
6from typing import Iterator, Optional, Tuple
7from yaptide.batch.watcher import TIMEOUT_MATCH
9from yaptide.celery.utils.requests import send_task_update
10from yaptide.utils.enums import EntityState
# templates for regex matching output from `<simulation>_<no>.out` file
# Marker strings emitted by FLUKA into its `.out` file; they are matched with
# str.startswith / substring tests in read_fluka_out_file below.
# NOTE(review): S_OK_OUT_INIT is not referenced in this module's visible code —
# presumably kept for external use or future matching; confirm before removing.
S_OK_OUT_INIT = "Total time used for initialization:"
# first line of the run summary — marks the start of the simulation
S_OK_OUT_START = "1NUMBER OF BEAM"
# seed line printed before each progress table — switches parsing into "in progress" mode
S_OK_OUT_IN_PROGRESS = " NEXT SEEDS:"
# printed when all primaries have been dispatched — ends "in progress" mode
S_OK_OUT_COLLECTED = " All cases handled by Feeder"
# cheap substring pre-check used before the full end-of-run regex below
S_OK_OUT_FIN_PRE_CHECK = "End of FLUKA"
# full end-of-run banner, e.g. " * ====== End of FLUKA ... run ====== *"
S_OK_OUT_FIN_PATTERN = re.compile(r"^ \* ======(?:( )*?)End of FLUKA [\w\-.]* run (?:( )*?) ====== \*")

# module-level logger, as recommended by the logging docs
logger = logging.getLogger(__name__)
def parse_progress_remaining_line(line: str) -> Optional[tuple[int, int]]:
    """Parse a FLUKA progress table line into (progress, remaining).

    Args:
        line (str): line to be parsed
    Returns:
        Optional[tuple[int, int]]: tuple with two integers representing the
        progress and remaining primaries.
        If the line cannot be parsed None is returned.
    """
    parts = line.split()
    # expecting 6 sections which are int or floats
    if len(parts) != 6:
        return None
    try:
        # all 6 fields must be numeric ...
        [float(x) for x in parts]
        # ... and the first two (progress, remaining) must be integers.
        # Keeping the int() conversions inside the try fixes a bug where a
        # line like "1.5 2 3 4 5 6" passed the float check but then raised
        # ValueError instead of returning None.
        return (int(parts[0]), int(parts[1]))
    except ValueError:
        return None
@dataclass
class TaskDetails:
    """Class holding details about the task."""

    # id of the parent simulation record in the backend
    simulation_id: int
    # identifier of this task within the simulation
    task_id: str
    # presumably a secret key authorizing updates for this task —
    # confirm against send_task_update's contract
    update_key: str
def time_now_utc() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    # datetime.utcnow() is deprecated (and returns a naive datetime),
    # so ask for "now" with an explicit UTC timezone instead.
    return datetime.now(tz=timezone.utc)
def utc_without_offset(utc: datetime) -> str:
    """Format the given UTC datetime as 'YYYY-MM-DD HH:MM:SS.ffffff' (no offset suffix).

    The original docstring ("returning current time in UTC timezone") was a
    copy-paste error: this function formats the datetime it is given.
    """
    return utc.strftime('%Y-%m-%d %H:%M:%S.%f')
@dataclass
class ProgressDetails:
    """Class holding details about the progress."""

    # timestamp of the most recently read line (refreshed by the reader loop)
    utc_now: datetime
    # total primaries requested; 0 until the first progress line is parsed
    requested_primaries: int = 0
    # unix timestamp (seconds) of the last update sent to the backend;
    # datetime.timestamp() returns a float, so the annotation is float
    # (it was previously mis-annotated as int)
    last_update_timestamp: float = 0.0
def check_progress(line: str, next_backend_update_time: int, details: TaskDetails,
                   progress_details: ProgressDetails) -> bool:
    """Check whether the line carries progress information, updating the backend if so.

    Returns True when the line was a progress line (even if the update was
    throttled and not sent), False otherwise.
    """
    parsed = parse_progress_remaining_line(line)
    if not parsed:
        return False

    simulated, remaining = parsed
    logger.debug("Found progress remaining line with progress: %s, remaining: %s", simulated, remaining)

    if not progress_details.requested_primaries:
        # first progress line seen: total requested = done + still remaining
        progress_details.requested_primaries = simulated + remaining
        send_task_update(
            details.simulation_id, details.task_id, details.update_key, {
                "simulated_primaries": simulated,
                "requested_primaries": progress_details.requested_primaries,
                "start_time": utc_without_offset(progress_details.utc_now),
                "task_state": EntityState.RUNNING.value
            })
        return True

    elapsed = progress_details.utc_now.timestamp() - progress_details.last_update_timestamp
    # throttle: skip this update if the previous one was sent too recently,
    # unless this looks like the final report (nothing left to simulate)
    if elapsed < next_backend_update_time and progress_details.requested_primaries > simulated:
        return True

    progress_details.last_update_timestamp = progress_details.utc_now.timestamp()
    send_task_update(details.simulation_id, details.task_id, details.update_key, {
        "simulated_primaries": simulated,
    })
    return True
def read_fluka_out_file(event: threading.Event,
                        line_iterator: Iterator[str],
                        next_backend_update_time: int,
                        details: TaskDetails,
                        verbose: bool = False) -> None:
    """Read the FLUKA output file line by line and report progress to the backend.

    Args:
        event: cooperative cancellation flag — parsing stops when it is set.
        line_iterator: iterator yielding lines of the `<simulation>_<no>.out` file.
        next_backend_update_time: minimum number of seconds between progress updates.
        details: ids and update key identifying the monitored task.
        verbose: when True, log extra debug information about matched lines.
    """
    in_progress = False
    progress_details = ProgressDetails(utc_now=time_now_utc())
    for line in line_iterator:
        if event.is_set():
            return
        progress_details.utc_now = time_now_utc()
        if in_progress:
            if check_progress(line=line,
                              next_backend_update_time=next_backend_update_time,
                              details=details,
                              progress_details=progress_details):
                continue
            if line.startswith(S_OK_OUT_COLLECTED):
                in_progress = False
                if verbose:
                    logger.debug("Found end of simulation calculation line")
                continue
        else:
            if line.startswith(S_OK_OUT_IN_PROGRESS):
                in_progress = True
                if verbose:
                    logger.debug("Found progress line")
                continue
            if line.startswith(S_OK_OUT_START):
                logger.debug("Found start of the simulation")
                continue
            # cheap substring pre-check before the more expensive regex match
            if S_OK_OUT_FIN_PRE_CHECK in line and S_OK_OUT_FIN_PATTERN.match(line):
                logger.debug("Found end of the simulation")
                break
        # handle generator timeout
        if re.search(TIMEOUT_MATCH, line):
            # use the module logger (not the root `logging` module) for consistency
            logger.error("Simulation watcher %s timed out", details.task_id)
            # NOTE(review): this end_time format (isoformat with offset) differs from
            # utc_without_offset() used for COMPLETED below — confirm the backend accepts both
            up_dict = {"task_state": EntityState.FAILED.value, "end_time": time_now_utc().isoformat(sep=" ")}
            send_task_update(details.simulation_id, details.task_id, details.update_key, up_dict)
            return

    logger.info("Parsing log file for task %s finished", details.task_id)
    up_dict = {
        "simulated_primaries": progress_details.requested_primaries,
        "end_time": utc_without_offset(progress_details.utc_now),
        "task_state": EntityState.COMPLETED.value
    }
    # task_id is a str, so %s is required here — the original %d raised a
    # logging format error at runtime and the message was never emitted
    logger.info("Sending final update for task %s, simulated primaries %d", details.task_id,
                progress_details.requested_primaries)
    send_task_update(details.simulation_id, details.task_id, details.update_key, up_dict)