Coverage for yaptide/celery/utils/progress/fluka_monitor.py: 34%

90 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-04 00:31 +0000

1from dataclasses import dataclass 

2from datetime import datetime, timezone 

3import logging 

4import re 

5import threading 

6from typing import Iterator, Optional, Tuple 

7from yaptide.batch.watcher import TIMEOUT_MATCH 

8 

9from yaptide.celery.utils.requests import send_task_update 

10from yaptide.utils.enums import EntityState 

11 

12# templates for regex matching output from `<simulation>_<no>.out` file 

13S_OK_OUT_INIT = "Total time used for initialization:" 

14S_OK_OUT_START = "1NUMBER OF BEAM" 

15S_OK_OUT_IN_PROGRESS = " NEXT SEEDS:" 

16S_OK_OUT_COLLECTED = " All cases handled by Feeder" 

17S_OK_OUT_FIN_PRE_CHECK = "End of FLUKA" 

18S_OK_OUT_FIN_PATTERN = re.compile(r"^ \* ======(?:( )*?)End of FLUKA [\w\-.]* run (?:( )*?) ====== \*") 

19 

20logger = logging.getLogger(__name__) 

21 

22 

23def parse_progress_remaining_line(line: str) -> Optional[tuple[int, int]]: 

24 """Function parsing the line with progress remaining information. 

25 

26 Args: 

27 line (str): line to be parsed 

28 Returns: 

29 Tuple[int, int]: tuple with two integers representing the progress and remaining. 

30 If the line cannot be parsed None is returned. 

31 """ 

32 parts = line.split() 

33 # expecting 6 sections which are int or floats 

34 if len(parts) != 6: 

35 return None 

36 try: 

37 [float(x) for x in parts] 

38 except ValueError: 

39 return None 

40 return (int(parts[0]), int(parts[1])) 

41 

42 

43@dataclass 

44class TaskDetails: 

45 """Class holding details about the task.""" 

46 

47 simulation_id: int 

48 task_id: str 

49 update_key: str 

50 

51 

52def time_now_utc() -> datetime: 

53 """Function returning current time in UTC timezone.""" 

54 # because datetime.utcnow() is deprecated 

55 return datetime.now(timezone.utc) 

56 

57 

58def utc_without_offset(utc: datetime) -> str: 

59 """Function returning current time in UTC timezone.""" 

60 return utc.strftime('%Y-%m-%d %H:%M:%S.%f') 

61 

62 

63@dataclass 

64class ProgressDetails: 

65 """Class holding details about the progress.""" 

66 

67 utc_now: datetime 

68 requested_primaries: int = 0 

69 last_update_timestamp: int = 0 

70 

71 

72def check_progress(line: str, next_backend_update_time: int, details: TaskDetails, 

73 progress_details: ProgressDetails) -> bool: 

74 """Function checking if the line contains progress information and sending update if needed. 

75 

76 Function returns True if the line contained progress information, False otherwise. 

77 """ 

78 res = parse_progress_remaining_line(line) 

79 if res: 

80 progress, remainder = res 

81 logger.debug("Found progress remaining line with progress: %s, remaining: %s", progress, remainder) 

82 if not progress_details.requested_primaries: 

83 progress_details.requested_primaries = progress + remainder 

84 up_dict = { 

85 "simulated_primaries": progress, 

86 "requested_primaries": progress_details.requested_primaries, 

87 "start_time": utc_without_offset(progress_details.utc_now), 

88 "task_state": EntityState.RUNNING.value 

89 } 

90 send_task_update(details.simulation_id, details.task_id, details.update_key, up_dict) 

91 else: 

92 if (progress_details.utc_now.timestamp() - progress_details.last_update_timestamp < 

93 next_backend_update_time # do not send update too often 

94 and progress_details.requested_primaries > progress): 

95 return True 

96 progress_details.last_update_timestamp = progress_details.utc_now.timestamp() 

97 up_dict = { 

98 "simulated_primaries": progress, 

99 } 

100 send_task_update(details.simulation_id, details.task_id, details.update_key, up_dict) 

101 return True 

102 return False 

103 

104 

105def read_fluka_out_file(event: threading.Event, 

106 line_iterator: Iterator[str], 

107 next_backend_update_time: int, 

108 details: TaskDetails, 

109 verbose: bool = False) -> None: 

110 """Function reading the fluka output file and reporting progress to the backend.""" 

111 in_progress = False 

112 progress_details = ProgressDetails(utc_now=time_now_utc()) 

113 for line in line_iterator: 

114 if event.is_set(): 

115 return 

116 progress_details.utc_now = time_now_utc() 

117 if in_progress: 

118 if check_progress(line=line, 

119 next_backend_update_time=next_backend_update_time, 

120 details=details, 

121 progress_details=progress_details): 

122 continue 

123 if line.startswith(S_OK_OUT_COLLECTED): 

124 in_progress = False 

125 if verbose: 

126 logger.debug("Found end of simulation calculation line") 

127 continue 

128 else: 

129 if line.startswith(S_OK_OUT_IN_PROGRESS): 

130 in_progress = True 

131 if verbose: 

132 logger.debug("Found progress line") 

133 continue 

134 if line.startswith(S_OK_OUT_START): 

135 logger.debug("Found start of the simulation") 

136 continue 

137 if S_OK_OUT_FIN_PRE_CHECK in line and re.match(S_OK_OUT_FIN_PATTERN, line): 

138 logger.debug("Found end of the simulation") 

139 break 

140 # handle generator timeout 

141 if re.search(TIMEOUT_MATCH, line): 

142 logging.error("Simulation watcher %s timed out", details.task_id) 

143 up_dict = {"task_state": EntityState.FAILED.value, "end_time": time_now_utc().isoformat(sep=" ")} 

144 send_task_update(details.simulation_id, details.task_id, details.update_key, up_dict) 

145 return 

146 

147 logging.info("Parsing log file for task %s finished", details.task_id) 

148 up_dict = { 

149 "simulated_primaries": progress_details.requested_primaries, 

150 "end_time": utc_without_offset(progress_details.utc_now), 

151 "task_state": EntityState.COMPLETED.value 

152 } 

153 logging.info("Sending final update for task %d, simulated primaries %d", details.task_id, 

154 progress_details.requested_primaries) 

155 send_task_update(details.simulation_id, details.task_id, details.update_key, up_dict)