Coverage for yaptide/utils/sim_utils.py: 82% (130 statements)

import copy
import json
import logging
import re
from enum import Enum, auto
from pathlib import Path

from pymchelper.estimator import Estimator
from pymchelper.writers.json import JsonWriter
from pymchelper.flair.Input import Card

from converter.api import (get_parser_from_str, run_parser)

NSTAT_MATCH = r"NSTAT\s*\d*\s*\d*"


def estimators_to_list(estimators_dict: dict, dir_path: Path) -> list[dict]:
    """Convert simulation output to JSON dictionary representation (to be consumed by UI)"""
    if not estimators_dict:
        return {"message": "No estimators"}

    # result_estimators is a list of dictionaries, later converted to json
    # to provide a readable API response for the frontend
    # keys in estimators_dict are estimator names, values are the estimator objects
    result_estimators = []
    estimator: Estimator
    for estimator_key, estimator in estimators_dict.items():
        filepath = dir_path / estimator_key
        writer = JsonWriter(str(filepath), None)
        writer.write(estimator)

        with open(writer.filename, "r") as json_file:
            est_dict = json.load(json_file)
            est_dict["name"] = estimator_key
            result_estimators.append(est_dict)

    return result_estimators
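
# --- Usage sketch (illustrative, not part of the module) ---------------------
# Assuming `estimators` is a dict of pymchelper Estimator objects already loaded
# from simulation output and `work_dir` is a Path, the call below writes one JSON
# file per estimator into `work_dir` and returns a list of dicts, each tagged
# with its estimator name:
#
#     result = estimators_to_list(estimators_dict=estimators, dir_path=work_dir)
#     # result -> [{"name": "fluence", ...}, {"name": "dose", ...}]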


class JSON_TYPE(Enum):
    """Class defining custom JSON types"""

    Editor = auto()
    Files = auto()


def get_json_type(payload_dict: dict) -> JSON_TYPE:
    """Returns type of provided JSON"""
    if "input_files" in payload_dict:
        return JSON_TYPE.Files
    return JSON_TYPE.Editor
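
# --- Usage sketch (illustrative, not part of the module) ---------------------
# A payload that ships ready-made input files is detected by its "input_files"
# key; anything else is treated as an editor project:
#
#     get_json_type({"input_files": {"beam.dat": "..."}})            # -> JSON_TYPE.Files
#     get_json_type({"input_json": {...}, "sim_type": "shieldhit"})  # -> JSON_TYPE.Editor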


def convert_editor_dict_to_files_dict(editor_dict: dict, parser_type: str) -> dict:
    """
    Convert editor project data to a dictionary with filenames as keys and file contents as values,
    using the converter matching the given simulator type (`parser_type`)
    """
    conv_parser = get_parser_from_str(parser_type)
    files_dict = run_parser(parser=conv_parser, input_data=editor_dict)
    return files_dict


def check_and_convert_payload_to_files_dict(payload_dict: dict) -> dict:
    """
    Convert payload data to a dictionary with filenames and contents for Editor type projects,
    otherwise return an empty dictionary
    """
    files_dict = {}
    json_type = get_json_type(payload_dict)
    if json_type == JSON_TYPE.Editor:
        files_dict = convert_editor_dict_to_files_dict(editor_dict=payload_dict["input_json"],
                                                       parser_type=payload_dict["sim_type"])
    else:
        logging.warning("Project of type %s used, conversion works only for Editor projects", json_type)
    return files_dict
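
# --- Usage sketch (illustrative, not part of the module) ---------------------
# For an editor project the converter is invoked and a files dictionary comes
# back; for a files project the function only logs a warning and returns {}:
#
#     check_and_convert_payload_to_files_dict({"input_files": {"beam.dat": "..."}})  # -> {}
#     check_and_convert_payload_to_files_dict(
#         {"input_json": editor_project, "sim_type": "shieldhit"})  # -> {"beam.dat": "...", ...}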


def adjust_primaries_in_editor_dict(payload_editor_dict: dict, ntasks: int = None) -> tuple[dict, int]:
    """
    Replaces the number of primaries in `payload_editor_dict`.
    If the `ntasks` parameter is provided, it takes precedence
    over the value stored in `payload_editor_dict`
    """
    if ntasks is None:
        ntasks = payload_editor_dict['ntasks']
    else:
        logging.warning("ntasks value was specified as %d and will override the value from the payload", ntasks)

    editor_dict = copy.deepcopy(payload_editor_dict['input_json'])
    number_of_all_primaries = editor_dict['beam']['numberOfParticles']
    editor_dict['beam']['numberOfParticles'] //= ntasks
    return editor_dict, number_of_all_primaries
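
# --- Usage sketch (illustrative, not part of the module) ---------------------
# Splitting 1000 requested primaries across 4 tasks leaves 250 per task in the
# returned editor dict, while the total is reported back unchanged:
#
#     payload = {"ntasks": 4, "input_json": {"beam": {"numberOfParticles": 1000}}}
#     per_task_dict, total = adjust_primaries_in_editor_dict(payload)
#     # per_task_dict["beam"]["numberOfParticles"] == 250, total == 1000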


def adjust_primaries_in_files_dict(payload_files_dict: dict, ntasks: int = None) -> tuple[dict, int]:
    """
    Replaces the number of primaries in `payload_files_dict`.
    If the `ntasks` parameter is provided, it takes precedence
    over the value stored in `payload_files_dict`
    """
    if ntasks is None:
        ntasks = payload_files_dict['ntasks']
    else:
        logging.warning("ntasks value was specified as %d and will override the value from the payload", ntasks)

    input_files = payload_files_dict['input_files']
    # determining the input file type by filename
    # should be done in a more robust way, which would require a lot of refactoring to pass sim_type
    if 'beam.dat' in input_files:
        return adjust_primaries_for_shieldhit_files(payload_files_dict=payload_files_dict, ntasks=ntasks)
    if next((file for file in input_files if file.endswith(".inp")), None):
        return adjust_primaries_for_fluka_files(payload_files_dict=payload_files_dict, ntasks=ntasks)
    return {}, 0
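
# --- Usage sketch (illustrative, not part of the module) ---------------------
# Dispatch is driven purely by filenames: a 'beam.dat' entry routes the payload
# to the SHIELD-HIT12A handler, any '*.inp' entry to the FLUKA handler, and an
# unrecognised set of files yields ({}, 0):
#
#     adjust_primaries_in_files_dict({"ntasks": 2, "input_files": {"readme.txt": ""}})  # -> ({}, 0)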


def adjust_primaries_for_shieldhit_files(payload_files_dict: dict, ntasks: int = None) -> tuple[dict, int]:
    """Adjusts number of primaries in beam.dat file for SHIELD-HIT12A"""
    files_dict = copy.deepcopy(payload_files_dict['input_files'])
    all_beam_lines: list[str] = files_dict['beam.dat'].split('\n')
    all_beam_lines_with_nstat = [line for line in all_beam_lines if line.lstrip().startswith('NSTAT')]
    beam_lines_count = len(all_beam_lines_with_nstat)
    if beam_lines_count != 1:
        logging.warning("Found unexpected number of lines with NSTAT keyword: %d", beam_lines_count)
    if beam_lines_count < 1:
        return files_dict, 0
    number_of_all_primaries: str = all_beam_lines_with_nstat[0].split()[1]
    primaries_per_task = str(int(number_of_all_primaries) // ntasks)
    for i in range(len(all_beam_lines)):
        if re.search(NSTAT_MATCH, all_beam_lines[i]):
            # the line below replaces only the first occurrence of the nstat value
            # it is important to pass 1 as the 3rd argument of str.replace,
            # because otherwise values further in the line might be changed too
            all_beam_lines[i] = all_beam_lines[i].replace(number_of_all_primaries, primaries_per_task, 1)
    files_dict['beam.dat'] = '\n'.join(all_beam_lines)
    # number_of_tasks = payload_files_dict['ntasks'] -> to be implemented in UI
    # here we manipulate the content of files_dict['beam.dat'] to adjust the number of primaries
    # only the in-memory content is changed, there is no need to write the file to disk
    return files_dict, int(number_of_all_primaries)
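
# --- Usage sketch (illustrative, not part of the module) ---------------------
# Given a beam.dat containing an "NSTAT 10000 0" line, splitting across 5 tasks
# rewrites only that first value and reports the original total:
#
#     payload = {"input_files": {"beam.dat": "RNDSEED 89736501\nNSTAT 10000 0"}}
#     files, total = adjust_primaries_for_shieldhit_files(payload, ntasks=5)
#     # files["beam.dat"] contains "NSTAT 2000 0", total == 10000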


def adjust_primaries_for_fluka_files(payload_files_dict: dict, ntasks: int = None) -> tuple[dict, int]:
    """Adjusts number of primaries in *.inp file for FLUKA"""
    files_dict = copy.deepcopy(payload_files_dict['input_files'])
    input_file = next((file for file in files_dict if file.endswith(".inp")), None)
    if not input_file:
        return {}, 0

    # read the number of primaries from the FLUKA input file
    all_input_lines: list[str] = files_dict[input_file].split('\n')
    # get the value from the START card
    start_card = next((line for line in all_input_lines if line.lstrip().startswith('START')), None)
    number_of_all_primaries = start_card.split()[1]
    parsed_number_of_all_primaries = int(float(number_of_all_primaries))
    primaries_per_task = parsed_number_of_all_primaries // ntasks
    logging.warning("Number of primaries per task: %d", primaries_per_task)
    for i in range(len(all_input_lines)):
        # replace the first START card found
        if all_input_lines[i].lstrip().startswith('START'):
            logging.warning("Replacing START card with new value")
            card = Card(tag="START")
            card.setWhat(1, str(primaries_per_task))
            start_card = str(card)
            all_input_lines[i] = start_card
            break
    files_dict[input_file] = '\n'.join(all_input_lines)
    return files_dict, parsed_number_of_all_primaries
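
# --- Usage sketch (illustrative, not part of the module) ---------------------
# For a FLUKA input whose START card requests 1e4 primaries, splitting across
# 4 tasks rewrites the card (via pymchelper's flair Card helper) to 2500 and
# returns 10000 as the total; the exact fixed-format layout of the rewritten
# card is left to the Card class. The filename below is made up for the example:
#
#     payload = {"input_files": {"run.inp": "TITLE\nproton beam\nSTART 10000.0\nSTOP"}}
#     files, total = adjust_primaries_for_fluka_files(payload, ntasks=4)
#     # total == 10000, files["run.inp"] has a START card with WHAT(1) == 2500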


def files_dict_with_adjusted_primaries(payload_dict: dict, ntasks: int = None) -> tuple[dict, int]:
    """
    Replaces the number of primaries in `payload_dict`.
    If the `ntasks` parameter is provided, it takes precedence
    over the value stored in `payload_dict`.
    Returns a dict with input files and the full number of requested primaries
    """
    json_type = get_json_type(payload_dict)
    if json_type == JSON_TYPE.Editor:
        new_payload_dict = copy.deepcopy(payload_dict)
        new_payload_dict["input_json"], number_of_all_primaries = adjust_primaries_in_editor_dict(
            payload_editor_dict=payload_dict, ntasks=ntasks)
        return check_and_convert_payload_to_files_dict(new_payload_dict), number_of_all_primaries
    if json_type == JSON_TYPE.Files:
        files_dict, number_of_all_primaries = adjust_primaries_in_files_dict(payload_files_dict=payload_dict,
                                                                             ntasks=ntasks)
        return files_dict, number_of_all_primaries
    return {}, 0
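
# --- Usage sketch (illustrative, not part of the module) ---------------------
# This is the high-level entry point: it detects the payload type, divides the
# primaries among `ntasks` workers and hands back ready-to-write input files
# plus the total number of requested primaries, e.g.:
#
#     files, total = files_dict_with_adjusted_primaries(payload_dict, ntasks=8)
#     write_simulation_input_files(files, output_dir=Path("/tmp/sim"))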


def write_simulation_input_files(files_dict: dict, output_dir: Path) -> None:
    """Save files from provided dict (filenames as keys and content as values) into the provided directory"""
    for filename, file_contents in files_dict.items():
        with open(output_dir / filename, "w", newline='\n') as writer:  # skipcq: PTC-W6004
            writer.write(file_contents)
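
# --- Usage sketch (illustrative, not part of the module) ---------------------
# Writing a files dict into a temporary directory; newline='\n' keeps Unix line
# endings even on Windows hosts:
#
#     import tempfile
#     with tempfile.TemporaryDirectory() as tmp:
#         write_simulation_input_files({"beam.dat": "NSTAT 1000 0\n"}, Path(tmp))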


def simulation_logfiles(path: Path) -> dict:
    """Function returning a dictionary with simulation logfile names as keys and their content as values"""
    result = {}
    for log in path.glob("run_*/shieldhit_*log"):
        try:
            with open(log, "r") as reader:  # skipcq: PTC-W6004
                result[log.name] = reader.read()
        except FileNotFoundError:
            result[log.name] = "No file"
    return result
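
# --- Usage sketch (illustrative, not part of the module) ---------------------
# The glob pattern only picks up SHIELD-HIT12A worker logs laid out as
# <path>/run_<N>/shieldhit_*log; for such a layout (filenames assumed here) the
# call returns e.g.:
#
#     simulation_logfiles(Path("/tmp/sim"))
#     # -> {"shieldhit_0001.log": "...", "shieldhit_0002.log": "..."}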


def simulation_input_files(path: Path) -> dict:
    """Function returning a dictionary with simulation input filenames as keys and their content as values"""
    result = {}
    try:
        for filename in ["info.json", "geo.dat", "detect.dat", "beam.dat", "mat.dat"]:
            file = path / filename
            with open(file, "r") as reader:
                result[filename] = reader.read()
    except FileNotFoundError:
        result["info"] = "No input present"
    return result