Coverage for yaptide/utils/sim_utils.py: 83%
132 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-07-01 12:55 +0000
import copy
import json
import logging
import re
import sys
from enum import Enum, auto
from pathlib import Path
from typing import Optional, Union

from pymchelper.estimator import Estimator
from pymchelper.flair.Input import Card
from pymchelper.writers.json import JsonWriter

# dirty hack needed to properly handle relative imports in the converter submodule
sys.path.append("yaptide/converter")
from ..converter.converter.api import (get_parser_from_str, run_parser) # skipcq: FLK-E402
# regex matching the NSTAT card in a SHIELD-HIT12A beam.dat file
# (keyword followed by up to two whitespace-separated integer fields)
NSTAT_MATCH = r"NSTAT\s*\d*\s*\d*"
def estimators_to_list(estimators_dict: dict, dir_path: Path) -> Union[list[dict], dict]:
    """
    Convert simulation output estimators to a list of JSON-ready dicts (consumed by UI).

    Each estimator is serialised to a JSON file inside `dir_path` via JsonWriter,
    then read back to obtain a plain dict, which is annotated with the estimator name.

    :param estimators_dict: mapping of estimator name -> Estimator object
    :param dir_path: directory where the intermediate JSON files are written
    :return: list of estimator dicts; for an empty input returns
        ``{"message": "No estimators"}`` (kept for backward compatibility
        with the API response format, hence the Union return type)
    """
    if not estimators_dict:
        return {"message": "No estimators"}

    result_estimators = []
    estimator: Estimator
    for estimator_key, estimator in estimators_dict.items():
        filepath = dir_path / estimator_key
        # JsonWriter persists the estimator to disk; we immediately read the
        # file back to get a readable dict representation for the frontend
        writer = JsonWriter(str(filepath), None)
        writer.write(estimator)

        # explicit encoding so the round-trip does not depend on the platform default
        with open(writer.filename, "r", encoding="utf-8") as json_file:
            est_dict = json.load(json_file)
            est_dict["name"] = estimator_key
            result_estimators.append(est_dict)

    return result_estimators
class JSON_TYPE(Enum):
    """Custom JSON payload types recognised by the backend."""

    Editor = 1  # project defined interactively via the UI editor
    Files = 2   # project supplied as ready-made simulator input files
def get_json_type(payload_dict: dict) -> JSON_TYPE:
    """Determine the payload type: Files when ready-made input files are attached, Editor otherwise."""
    return JSON_TYPE.Files if "input_files" in payload_dict else JSON_TYPE.Editor
def convert_editor_dict_to_files_dict(editor_dict: dict, parser_type: str) -> dict:
    """
    Run the converter on an Editor-type project and return a dict
    mapping generated input filenames to their contents.
    """
    parser = get_parser_from_str(parser_type)
    return run_parser(parser=parser, input_data=editor_dict)
def check_and_convert_payload_to_files_dict(payload_dict: dict) -> dict:
    """
    Convert payload data to a dict of filenames and contents for Editor-type
    projects; return an empty dict for any other project type.
    """
    json_kind = get_json_type(payload_dict)
    if json_kind != JSON_TYPE.Editor:
        # conversion is only defined for projects built in the UI editor
        logging.warning("Project of %s used, conversion works only for Editor projects", json_kind)
        return {}
    return convert_editor_dict_to_files_dict(editor_dict=payload_dict["input_json"],
                                             parser_type=payload_dict["sim_type"])
def adjust_primaries_in_editor_dict(payload_editor_dict: dict, ntasks: Optional[int] = None) -> tuple[dict, int]:
    """
    Split the number of primaries in the editor JSON evenly across tasks.

    :param payload_editor_dict: payload containing ``input_json`` (editor project)
        and ``ntasks``
    :param ntasks: number of tasks; if provided it takes precedence over
        ``payload_editor_dict['ntasks']``
    :return: tuple of (deep copy of the editor JSON with per-task primaries,
        total number of requested primaries)
    """
    if ntasks is None:
        ntasks = payload_editor_dict['ntasks']
    else:
        logging.warning("ntasks value was specified as %d and will be overwritten", ntasks)

    # deep copy so the caller's payload is never mutated
    editor_dict = copy.deepcopy(payload_editor_dict['input_json'])
    number_of_all_primaries = editor_dict['beam']['numberOfParticles']
    # integer division: each task gets an equal share, any remainder is dropped
    editor_dict['beam']['numberOfParticles'] //= ntasks
    return editor_dict, number_of_all_primaries
def adjust_primaries_in_files_dict(payload_files_dict: dict, ntasks: Optional[int] = None) -> tuple[dict, int]:
    """
    Split the number of primaries in simulator input files evenly across tasks.

    :param payload_files_dict: payload containing ``input_files`` (filename -> content)
        and ``ntasks``
    :param ntasks: number of tasks; if provided it takes precedence over
        ``payload_files_dict['ntasks']``
    :return: tuple of (adjusted files dict, total number of requested primaries);
        ``({}, 0)`` when the simulator type cannot be recognised
    """
    if ntasks is None:
        ntasks = payload_files_dict['ntasks']
    else:
        logging.warning("ntasks value was specified as %d and will be overwritten", ntasks)

    input_files = payload_files_dict['input_files']
    # determining input file type by filename
    # should be done in more robust way which will require a lot of refactoring to pass sim_type
    if 'beam.dat' in input_files:
        return adjust_primaries_for_shieldhit_files(payload_files_dict=payload_files_dict, ntasks=ntasks)
    if any(file.endswith(".inp") for file in input_files):
        return adjust_primaries_for_fluka_files(payload_files_dict=payload_files_dict, ntasks=ntasks)
    return {}, 0
def adjust_primaries_for_shieldhit_files(payload_files_dict: dict, ntasks: Optional[int] = None) -> tuple[dict, int]:
    """
    Adjust the number of primaries in the beam.dat file for SHIELD-HIT12A.

    Splits the NSTAT value evenly across `ntasks` tasks (integer division).

    :param payload_files_dict: payload containing ``input_files`` with a
        ``beam.dat`` entry, and ``ntasks``
    :param ntasks: number of tasks; falls back to ``payload_files_dict['ntasks']``
        when not provided
    :return: tuple of (adjusted files dict, total number of requested primaries);
        the second element is 0 when no NSTAT line was found
    """
    if ntasks is None:
        # fall back to payload value, consistent with the other adjust_* helpers
        ntasks = payload_files_dict['ntasks']
    files_dict = copy.deepcopy(payload_files_dict['input_files'])
    all_beam_lines: list[str] = files_dict['beam.dat'].split('\n')
    all_beam_lines_with_nstat = [line for line in all_beam_lines if line.lstrip().startswith('NSTAT')]
    beam_lines_count = len(all_beam_lines_with_nstat)
    if beam_lines_count != 1:
        logging.warning("Found unexpected number of lines with NSTAT keyword: %d", beam_lines_count)
    if beam_lines_count < 1:
        return files_dict, 0
    number_of_all_primaries: str = all_beam_lines_with_nstat[0].split()[1]
    primaries_per_task = str(int(number_of_all_primaries) // ntasks)
    for i, line in enumerate(all_beam_lines):
        if re.search(NSTAT_MATCH, line):
            # replace only the first occurrence of the NSTAT value (3rd argument = 1)
            # so that equal numbers appearing further in the line stay untouched
            all_beam_lines[i] = line.replace(number_of_all_primaries, primaries_per_task, 1)
    files_dict['beam.dat'] = '\n'.join(all_beam_lines)
    # content of files_dict['beam.dat'] is adjusted in memory only;
    # the caller is responsible for writing the files to disk
    return files_dict, int(number_of_all_primaries)
def adjust_primaries_for_fluka_files(payload_files_dict: dict, ntasks: Optional[int] = None) -> tuple[dict, int]:
    """
    Adjust the number of primaries in the *.inp file for FLUKA.

    Splits the START card value evenly across `ntasks` tasks (integer division).

    :param payload_files_dict: payload containing ``input_files`` with a ``*.inp``
        entry, and ``ntasks``
    :param ntasks: number of tasks; falls back to ``payload_files_dict['ntasks']``
        when not provided
    :return: tuple of (adjusted files dict, total number of requested primaries);
        ``({}, 0)`` when no *.inp file or no START card is present
    """
    if ntasks is None:
        # fall back to payload value, consistent with the other adjust_* helpers
        ntasks = payload_files_dict['ntasks']
    files_dict = copy.deepcopy(payload_files_dict['input_files'])
    input_file = next((file for file in files_dict if file.endswith(".inp")), None)
    if not input_file:
        return {}, 0

    # read number of primaries from fluka file
    all_input_lines: list[str] = files_dict[input_file].split('\n')
    # the total number of primaries is the first WHAT of the START card
    start_card = next((line for line in all_input_lines if line.lstrip().startswith('START')), None)
    if start_card is None:
        # guard against malformed input: previously this crashed with AttributeError
        logging.warning("No START card found in %s", input_file)
        return {}, 0
    number_of_all_primaries = start_card.split()[1]
    # FLUKA stores card WHATs as floats (e.g. 1.0E4), hence int(float(...))
    parsed_number_of_all_primaries = int(float(number_of_all_primaries))
    primaries_per_task = parsed_number_of_all_primaries // ntasks
    logging.warning("Number of primaries per task: %d", primaries_per_task)
    for i, line in enumerate(all_input_lines):
        # replace the first START card found and stop
        if line.lstrip().startswith('START'):
            logging.warning("Replacing START card with new value")
            card = Card(tag="START")
            card.setWhat(1, str(primaries_per_task))
            all_input_lines[i] = str(card)
            break
    files_dict[input_file] = '\n'.join(all_input_lines)
    return files_dict, parsed_number_of_all_primaries
def files_dict_with_adjusted_primaries(payload_dict: dict, ntasks: Optional[int] = None) -> tuple[dict, int]:
    """
    Return a dict with input files whose number of primaries is split across
    tasks, together with the full number of requested primaries.

    :param payload_dict: either an Editor-type payload (with ``input_json``)
        or a Files-type payload (with ``input_files``)
    :param ntasks: number of tasks; if provided it takes precedence over
        the value stored in ``payload_dict``
    :return: tuple of (files dict, total number of requested primaries);
        ``({}, 0)`` for unrecognised payload types
    """
    json_type = get_json_type(payload_dict)
    if json_type == JSON_TYPE.Editor:
        new_payload_dict = copy.deepcopy(payload_dict)
        # adjust the editor JSON first, then run the converter on the adjusted copy
        new_payload_dict["input_json"], number_of_all_primaries = adjust_primaries_in_editor_dict(
            payload_editor_dict=payload_dict, ntasks=ntasks)
        return check_and_convert_payload_to_files_dict(new_payload_dict), number_of_all_primaries
    if json_type == JSON_TYPE.Files:
        return adjust_primaries_in_files_dict(payload_files_dict=payload_dict, ntasks=ntasks)
    return {}, 0
def write_simulation_input_files(files_dict: dict, output_dir: Path) -> None:
    """
    Save files from provided dict (filenames as keys and content as values)
    into the provided directory.

    Files are written with UTF-8 encoding and Unix line endings so the
    generated simulator input is identical across platforms.
    """
    for filename, file_contents in files_dict.items():
        with open(output_dir / filename, "w", newline='\n', encoding="utf-8") as writer:  # skipcq: PTC-W6004
            writer.write(file_contents)
def simulation_logfiles(path: Path) -> dict:
    """Collect SHIELD-HIT12A log files from run_* subdirectories as {filename: content}."""
    logs = {}
    for logfile in path.glob("run_*/shieldhit_*log"):
        try:
            logs[logfile.name] = logfile.read_text()  # skipcq: PTC-W6004
        except FileNotFoundError:
            # the file disappeared between globbing and reading
            logs[logfile.name] = "No file"
    return logs
def simulation_input_files(path: Path) -> dict:
    """
    Return a dictionary with simulation input filenames as keys and their
    content as values; on the first missing file an ``info`` entry with
    "No input present" is added and reading stops.
    """
    contents = {}
    expected_files = ("info.json", "geo.dat", "detect.dat", "beam.dat", "mat.dat")
    try:
        for name in expected_files:
            contents[name] = (path / name).read_text()
    except FileNotFoundError:
        contents["info"] = "No input present"
    return contents