Skip to content

fluka_monitor

celery.utils.progress.fluka_monitor

S_OK_OUT_COLLECTED module-attribute

S_OK_OUT_COLLECTED = ' All cases handled by Feeder'

S_OK_OUT_FIN_PATTERN module-attribute

S_OK_OUT_FIN_PATTERN = compile(
    "^ \\* ======(?:( )*?)End of FLUKA [\\w\\-.]* run (?:( )*?) ====== \\*"
)

S_OK_OUT_FIN_PRE_CHECK module-attribute

S_OK_OUT_FIN_PRE_CHECK = 'End of FLUKA'

S_OK_OUT_INIT module-attribute

S_OK_OUT_INIT = 'Total time used for initialization:'

S_OK_OUT_IN_PROGRESS module-attribute

S_OK_OUT_IN_PROGRESS = ' NEXT SEEDS:'

S_OK_OUT_START module-attribute

S_OK_OUT_START = '1NUMBER OF BEAM'

logger module-attribute

logger = getLogger(__name__)

ProgressDetails dataclass

Class holding details about the progress.

Source code in yaptide/celery/utils/progress/fluka_monitor.py
63
64
65
66
67
68
69
@dataclass
class ProgressDetails:
    """Class holding details about the progress."""

    utc_now: datetime
    requested_primaries: int = 0
    last_update_timestamp: int = 0

last_update_timestamp class-attribute instance-attribute

last_update_timestamp = 0

requested_primaries class-attribute instance-attribute

requested_primaries = 0

utc_now instance-attribute

utc_now

__init__

__init__(
    utc_now, requested_primaries=0, last_update_timestamp=0
)

TaskDetails dataclass

Class holding details about the task.

Source code in yaptide/celery/utils/progress/fluka_monitor.py
43
44
45
46
47
48
49
@dataclass
class TaskDetails:
    """Class holding details about the task."""

    simulation_id: int
    task_id: str
    update_key: str

simulation_id instance-attribute

simulation_id

task_id instance-attribute

task_id

update_key instance-attribute

update_key

__init__

__init__(simulation_id, task_id, update_key)

check_progress

check_progress(
    line,
    next_backend_update_time,
    details,
    progress_details,
)

Function checking if the line contains progress information and sending update if needed.

Function returns True if the line contained progress information, False otherwise.

Source code in yaptide/celery/utils/progress/fluka_monitor.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def check_progress(line: str, next_backend_update_time: int, details: TaskDetails,
                   progress_details: ProgressDetails) -> bool:
    """Function checking if the line contains progress information and sending update if needed.

    Function returns True if the line contained progress information, False otherwise.
    """
    res = parse_progress_remaining_line(line)
    if res:
        progress, remainder = res
        logger.debug("Found progress remaining line with progress: %s, remaining: %s", progress, remainder)
        if not progress_details.requested_primaries:
            progress_details.requested_primaries = progress + remainder
            up_dict = {
                "simulated_primaries": progress,
                "requested_primaries": progress_details.requested_primaries,
                "start_time": utc_without_offset(progress_details.utc_now),
                "task_state": EntityState.RUNNING.value
            }
            send_task_update(details.simulation_id, details.task_id, details.update_key, up_dict)
        else:
            if (progress_details.utc_now.timestamp() - progress_details.last_update_timestamp <
                    next_backend_update_time  # do not send update too often
                    and progress_details.requested_primaries > progress):
                return True
            progress_details.last_update_timestamp = progress_details.utc_now.timestamp()
            up_dict = {
                "simulated_primaries": progress,
            }
            send_task_update(details.simulation_id, details.task_id, details.update_key, up_dict)
        return True
    return False

parse_progress_remaining_line

parse_progress_remaining_line(line)

Function parsing the line with progress remaining information.

Parameters:

Name Type Description Default
line str

line to be parsed

required

Returns: Tuple[int, int]: tuple with two integers representing the progress and remaining. If the line cannot be parsed None is returned.

Source code in yaptide/celery/utils/progress/fluka_monitor.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def parse_progress_remaining_line(line: str) -> Optional[tuple[int, int]]:
    """Function parsing the line with progress remaining information.

    Args:
        line (str): line to be parsed
    Returns:
        Tuple[int, int]: tuple with two integers representing the progress and remaining.
        If the line cannot be parsed None is returned.
    """
    parts = line.split()
    # expecting 6 sections which are int or floats
    if len(parts) != 6:
        return None
    try:
        [float(x) for x in parts]
    except ValueError:
        return None
    return (int(parts[0]), int(parts[1]))

read_fluka_out_file

read_fluka_out_file(
    event,
    line_iterator,
    next_backend_update_time,
    details,
    verbose=False,
)

Function reading the fluka output file and reporting progress to the backend.

Source code in yaptide/celery/utils/progress/fluka_monitor.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def read_fluka_out_file(event: threading.Event,
                        line_iterator: Iterator[str],
                        next_backend_update_time: int,
                        details: TaskDetails,
                        verbose: bool = False) -> None:
    """Function reading the fluka output file and reporting progress to the backend."""
    in_progress = False
    progress_details = ProgressDetails(utc_now=time_now_utc())
    for line in line_iterator:
        if event.is_set():
            return
        progress_details.utc_now = time_now_utc()
        if in_progress:
            if check_progress(line=line,
                              next_backend_update_time=next_backend_update_time,
                              details=details,
                              progress_details=progress_details):
                continue
            if line.startswith(S_OK_OUT_COLLECTED):
                in_progress = False
                if verbose:
                    logger.debug("Found end of simulation calculation line")
                continue
        else:
            if line.startswith(S_OK_OUT_IN_PROGRESS):
                in_progress = True
                if verbose:
                    logger.debug("Found progress line")
                continue
            if line.startswith(S_OK_OUT_START):
                logger.debug("Found start of the simulation")
                continue
            if S_OK_OUT_FIN_PRE_CHECK in line and re.match(S_OK_OUT_FIN_PATTERN, line):
                logger.debug("Found end of the simulation")
                break
        # handle generator timeout
        if re.search(TIMEOUT_MATCH, line):
            logging.error("Simulation watcher %s timed out", details.task_id)
            up_dict = {"task_state": EntityState.FAILED.value, "end_time": time_now_utc().isoformat(sep=" ")}
            send_task_update(details.simulation_id, details.task_id, details.update_key, up_dict)
            return

    logging.info("Parsing log file for task %s finished", details.task_id)
    up_dict = {
        "simulated_primaries": progress_details.requested_primaries,
        "end_time": utc_without_offset(progress_details.utc_now),
        "task_state": EntityState.COMPLETED.value
    }
    logging.info("Sending final update for task %d, simulated primaries %d", details.task_id,
                 progress_details.requested_primaries)
    send_task_update(details.simulation_id, details.task_id, details.update_key, up_dict)

time_now_utc

time_now_utc()

Function returning current time in UTC timezone.

Source code in yaptide/celery/utils/progress/fluka_monitor.py
52
53
54
55
def time_now_utc() -> datetime:
    """Function returning current time in UTC timezone."""
    # because datetime.utcnow() is deprecated
    return datetime.now(timezone.utc)

utc_without_offset

utc_without_offset(utc)

Function returning current time in UTC timezone.

Source code in yaptide/celery/utils/progress/fluka_monitor.py
58
59
60
def utc_without_offset(utc: datetime) -> str:
    """Function returning current time in UTC timezone."""
    return utc.strftime('%Y-%m-%d %H:%M:%S.%f')