Coverage for yaptide/utils/sim_utils.py: 82%

130 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-22 07:31 +0000

1import copy 

2import json 

3import logging 

4import re 

5from enum import Enum, auto 

6from pathlib import Path 

7 

8from pymchelper.estimator import Estimator 

9from pymchelper.writers.json import JsonWriter 

10from pymchelper.flair.Input import Card 

11from converter.api import (get_parser_from_str, run_parser) 

12 

# Regular expression for the NSTAT keyword as it appears in SHIELD-HIT12A beam.dat content:
# the literal "NSTAT" followed by up to two whitespace-separated runs of digits.
# Every part after the keyword is optional, so a bare "NSTAT" anywhere in a line also matches.
NSTAT_MATCH = r"NSTAT\s*\d*\s*\d*"

14 

15 

def estimators_to_list(estimators_dict: dict, dir_path: Path) -> list[dict]:
    """
    Convert simulation estimators into JSON-compatible dictionaries (to be consumed by UI)

    Every estimator is written with `JsonWriter` to a path built from `dir_path` and its key,
    then loaded back from the file reported by the writer; the key is stored under "name".
    Note that for empty input a `{"message": "No estimators"}` dictionary is returned
    instead of a list.
    """
    if not estimators_dict:
        return {"message": "No estimators"}

    # one JSON-compatible dictionary per estimator, in the iteration order of estimators_dict
    serialized = []
    estimator: Estimator
    for name, estimator in estimators_dict.items():
        json_writer = JsonWriter(str(dir_path / name), None)
        json_writer.write(estimator)

        # the writer decides the final filename, so read back from the path it reports
        with open(json_writer.filename, "r") as stream:
            entry = json.load(stream)
        entry["name"] = name
        serialized.append(entry)

    return serialized

37 

38 

class JSON_TYPE(Enum):
    """Kinds of JSON payload distinguished by this module"""

    # payload without the "input_files" key
    Editor = auto()
    # payload carrying the "input_files" key
    Files = auto()

44 

45 

def get_json_type(payload_dict: dict) -> JSON_TYPE:
    """Classify the payload: Files when it has the "input_files" key, Editor otherwise"""
    return JSON_TYPE.Files if "input_files" in payload_dict else JSON_TYPE.Editor

51 

52 

def convert_editor_dict_to_files_dict(editor_dict: dict, parser_type: str) -> dict:
    """
    Convert editor project data to dictionary with filenames and contents

    The converter parser is selected by `parser_type` and run on `editor_dict`;
    the result of `run_parser` is returned unchanged.
    """
    return run_parser(parser=get_parser_from_str(parser_type), input_data=editor_dict)

61 

62 

def check_and_convert_payload_to_files_dict(payload_dict: dict) -> dict:
    """
    Convert an Editor type payload to dictionary with filenames and contents

    The "input_json" entry is converted using the parser named by the "sim_type" entry.
    For any other payload type a warning is logged and an empty dictionary is returned.
    """
    payload_type = get_json_type(payload_dict)
    if payload_type != JSON_TYPE.Editor:
        logging.warning("Project of %s used, conversion works only for Editor projects", payload_type)
        return {}
    return convert_editor_dict_to_files_dict(editor_dict=payload_dict["input_json"],
                                             parser_type=payload_dict["sim_type"])

76 

77 

def adjust_primaries_in_editor_dict(payload_editor_dict: dict, ntasks: int = None) -> tuple[dict, int]:
    """
    Split the number of primaries of an editor project between tasks

    When `ntasks` is given it takes precedence over `payload_editor_dict['ntasks']`
    (a warning is logged); otherwise the value from the payload is used.
    Returns a deep copy of `payload_editor_dict['input_json']` with
    `beam.numberOfParticles` floor-divided by the number of tasks,
    together with the original (total) number of particles.
    The payload passed in is not modified.
    """
    if ntasks is not None:
        logging.warning("ntasks value was specified as %d and will be overwritten", ntasks)
    else:
        ntasks = payload_editor_dict['ntasks']

    adjusted_json = copy.deepcopy(payload_editor_dict['input_json'])
    beam_settings = adjusted_json['beam']
    total_primaries = beam_settings['numberOfParticles']
    beam_settings['numberOfParticles'] = total_primaries // ntasks
    return adjusted_json, total_primaries

93 

94 

def adjust_primaries_in_files_dict(payload_files_dict: dict, ntasks: int = None) -> tuple[dict, int]:
    """
    Split the number of primaries of a files project between tasks

    When `ntasks` is given it takes precedence over `payload_files_dict['ntasks']`
    (a warning is logged); otherwise the value from the payload is used.
    The simulator is guessed from the filenames in `payload_files_dict['input_files']`:
    'beam.dat' selects the SHIELD-HIT12A handler, any '*.inp' file selects the FLUKA handler.
    When neither is present `({}, 0)` is returned.
    """
    if ntasks is not None:
        logging.warning("ntasks value was specified as %d and will be overwritten", ntasks)
    else:
        ntasks = payload_files_dict['ntasks']

    filenames = payload_files_dict['input_files']
    # input file type is recognised by filename only;
    # a more robust way would require a lot of refactoring to pass sim_type
    if 'beam.dat' in filenames:
        return adjust_primaries_for_shieldhit_files(payload_files_dict=payload_files_dict, ntasks=ntasks)
    if any(name.endswith(".inp") for name in filenames):
        return adjust_primaries_for_fluka_files(payload_files_dict=payload_files_dict, ntasks=ntasks)
    return {}, 0

114 

115 

def adjust_primaries_for_shieldhit_files(payload_files_dict: dict, ntasks: int = None) -> tuple[dict, int]:
    """
    Adjusts number of primaries in beam.dat file for SHIELD-HIT12A

    Works on a deep copy of `payload_files_dict['input_files']`; only the content
    of 'beam.dat' is changed and nothing is written to disk.
    The total number of primaries is read from the first line starting with the NSTAT keyword
    and floor-divided by `ntasks`. Only NSTAT lines carrying that same value are rewritten;
    lines that merely mention NSTAT elsewhere (e.g. a commented-out card) are left untouched.

    Returns the adjusted files dictionary and the total number of primaries,
    or the unchanged copy and 0 when no NSTAT line is present.
    """
    files_dict = copy.deepcopy(payload_files_dict['input_files'])
    all_beam_lines: list[str] = files_dict['beam.dat'].split('\n')
    nstat_line_indices = [
        index for index, line in enumerate(all_beam_lines) if line.lstrip().startswith('NSTAT')
    ]
    beam_lines_count = len(nstat_line_indices)
    if beam_lines_count != 1:
        logging.warning("Found unexpected number of lines with NSTAT keyword: %d", beam_lines_count)
    if beam_lines_count < 1:
        return files_dict, 0

    number_of_all_primaries: str = all_beam_lines[nstat_line_indices[0]].split()[1]
    primaries_per_task = str(int(number_of_all_primaries) // ntasks)
    for index in nstat_line_indices:
        tokens = all_beam_lines[index].split()
        # skip NSTAT lines holding a different value: a substring replacement there could corrupt them
        if len(tokens) < 2 or tokens[1] != number_of_all_primaries:
            continue
        # it is important to specify 3rd argument as 1,
        # because otherwise values further in line might be changed too
        all_beam_lines[index] = all_beam_lines[index].replace(number_of_all_primaries, primaries_per_task, 1)
    files_dict['beam.dat'] = '\n'.join(all_beam_lines)
    return files_dict, int(number_of_all_primaries)

139 

140 

def adjust_primaries_for_fluka_files(payload_files_dict: dict, ntasks: int = None) -> tuple[dict, int]:
    """
    Adjusts number of primaries in *.inp file for FLUKA

    Works on a deep copy of `payload_files_dict['input_files']`; only the content
    of the first '*.inp' file found is changed.
    The total number of primaries is read from the first line starting with START,
    floor-divided by `ntasks`, and that line is replaced with a freshly built START card.
    The new card carries only the per-task number of primaries as WHAT(1);
    other fields of the original START line are not copied.

    Returns the adjusted files dictionary and the total number of primaries.
    Returns `({}, 0)` when there is no '*.inp' file and the unchanged copy with 0
    when the input file has no START card.
    """
    files_dict = copy.deepcopy(payload_files_dict['input_files'])
    input_file = next((file for file in files_dict if file.endswith(".inp")), None)
    if not input_file:
        return {}, 0

    all_input_lines: list[str] = files_dict[input_file].split('\n')
    # locate the first START card once, it is used both for reading and for replacing
    start_index = next(
        (index for index, line in enumerate(all_input_lines) if line.lstrip().startswith('START')), None)
    if start_index is None:
        # without a START card there is nothing to adjust; mirror the missing-NSTAT handling for SHIELD-HIT12A
        logging.warning("No START card found in %s, number of primaries was not adjusted", input_file)
        return files_dict, 0

    number_of_all_primaries = all_input_lines[start_index].split()[1]
    # the value may be written in floating point notation, hence conversion via float
    parsed_number_of_all_primaries = int(float(number_of_all_primaries))
    primaries_per_task = parsed_number_of_all_primaries // ntasks
    logging.warning("Number of primaries per task: %d", primaries_per_task)

    logging.warning("Replacing START card with new value")
    card = Card(tag="START")
    card.setWhat(1, str(primaries_per_task))
    all_input_lines[start_index] = str(card)
    files_dict[input_file] = '\n'.join(all_input_lines)
    return files_dict, parsed_number_of_all_primaries

167 

168 

def files_dict_with_adjusted_primaries(payload_dict: dict, ntasks: int = None) -> tuple[dict, int]:
    """
    Build input files with the number of primaries split between tasks

    When `ntasks` is given it takes precedence over the value stored in `payload_dict`.
    Files payloads are adjusted directly in their file contents. Editor payloads are
    adjusted in a copy of the payload, which is then converted to input files.
    Returns dict with input files and full number of requested primaries.
    """
    payload_type = get_json_type(payload_dict)

    if payload_type == JSON_TYPE.Editor:
        # work on a copy so that the payload passed in keeps its original "input_json"
        adjusted_payload = copy.deepcopy(payload_dict)
        adjusted_json, total_primaries = adjust_primaries_in_editor_dict(payload_editor_dict=payload_dict,
                                                                         ntasks=ntasks)
        adjusted_payload["input_json"] = adjusted_json
        return check_and_convert_payload_to_files_dict(adjusted_payload), total_primaries

    if payload_type == JSON_TYPE.Files:
        adjusted_files, total_primaries = adjust_primaries_in_files_dict(payload_files_dict=payload_dict,
                                                                         ntasks=ntasks)
        return adjusted_files, total_primaries

    return {}, 0

187 

188 

def write_simulation_input_files(files_dict: dict, output_dir: Path) -> None:
    """Write each entry of `files_dict` (filename -> text content) as a file inside `output_dir`"""
    for name in files_dict:
        target = output_dir / name
        # newline='\n' disables newline translation, so line endings are written exactly as given
        with open(target, "w", newline='\n') as stream:  # skipcq: PTC-W6004
            stream.write(files_dict[name])

194 

195 

def simulation_logfiles(path: Path) -> dict:
    """
    Collect simulation logfiles found under `path`

    Files matching "run_*/shieldhit_*log" are read; the result maps the bare filename
    (without the run directory) to the file content, or to "No file" when the file
    can no longer be opened.
    """
    logs = {}
    for log_path in path.glob("run_*/shieldhit_*log"):
        try:
            content = log_path.read_text()
        except FileNotFoundError:
            # the file was listed by glob but is gone by the time it is read
            content = "No file"
        logs[log_path.name] = content
    return logs

206 

207 

def simulation_input_files(path: Path) -> dict:
    """
    Read simulation input files from `path` into a dictionary (filename -> content)

    The files info.json, geo.dat, detect.dat, beam.dat and mat.dat are read in this order.
    Reading stops at the first missing file: the entries read so far are kept
    and an extra "info" entry with value "No input present" is added.
    """
    expected_files = ("info.json", "geo.dat", "detect.dat", "beam.dat", "mat.dat")
    contents = {}
    for filename in expected_files:
        try:
            contents[filename] = (path / filename).read_text()
        except FileNotFoundError:
            contents["info"] = "No input present"
            # files listed after the missing one are deliberately not attempted
            break
    return contents