Coverage for yaptide/utils/sim_utils.py: 83%

132 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-07-01 12:55 +0000

1import copy 

2import json 

3import logging 

4import re 

5import sys 

6from enum import Enum, auto 

7from pathlib import Path 

8 

9from pymchelper.estimator import Estimator 

10from pymchelper.writers.json import JsonWriter 

11from pymchelper.flair.Input import Card 

12 

13# dirty hack needed to properly handle relative imports in the converter submodule 

14sys.path.append("yaptide/converter") 

15from ..converter.converter.api import (get_parser_from_str, run_parser) # skipcq: FLK-E402 

16 

17NSTAT_MATCH = r"NSTAT\s*\d*\s*\d*" 

18 

19 

20def estimators_to_list(estimators_dict: dict, dir_path: Path) -> list[dict]: 

21 """Convert simulation output to JSON dictionary representation (to be consumed by UI)""" 

22 if not estimators_dict: 

23 return {"message": "No estimators"} 

24 

25 # result_dict is a dictionary, which is later converted to json 

26 # to provide readable API response for fronted 

27 # keys in results_dict are estimator names, values are the estimator objects 

28 result_estimators = [] 

29 estimator: Estimator 

30 for estimator_key, estimator in estimators_dict.items(): 

31 filepath = dir_path / estimator_key 

32 writer = JsonWriter(str(filepath), None) 

33 writer.write(estimator) 

34 

35 with open(writer.filename, "r") as json_file: 

36 est_dict = json.load(json_file) 

37 est_dict["name"] = estimator_key 

38 result_estimators.append(est_dict) 

39 

40 return result_estimators 

41 

42 

43class JSON_TYPE(Enum): 

44 """Class defining custom JSON types""" 

45 

46 Editor = auto() 

47 Files = auto() 

48 

49 

50def get_json_type(payload_dict: dict) -> JSON_TYPE: 

51 """Returns type of provided JSON""" 

52 if "input_files" in payload_dict: 

53 return JSON_TYPE.Files 

54 return JSON_TYPE.Editor 

55 

56 

57def convert_editor_dict_to_files_dict(editor_dict: dict, parser_type: str) -> dict: 

58 """ 

59 Convert payload data to dictionary with filenames and contents for Editor type projects 

60 Otherwise return empty dictionary 

61 """ 

62 conv_parser = get_parser_from_str(parser_type) 

63 files_dict = run_parser(parser=conv_parser, input_data=editor_dict) 

64 return files_dict 

65 

66 

67def check_and_convert_payload_to_files_dict(payload_dict: dict) -> dict: 

68 """ 

69 Convert payload data to dictionary with filenames and contents for Editor type projects 

70 Otherwise return empty dictionary 

71 """ 

72 files_dict = {} 

73 json_type = get_json_type(payload_dict) 

74 if json_type == JSON_TYPE.Editor: 

75 files_dict = convert_editor_dict_to_files_dict(editor_dict=payload_dict["input_json"], 

76 parser_type=payload_dict["sim_type"]) 

77 else: 

78 logging.warning("Project of %s used, conversion works only for Editor projects", json_type) 

79 return files_dict 

80 

81 

82def adjust_primaries_in_editor_dict(payload_editor_dict: dict, ntasks: int = None) -> tuple[dict, int]: 

83 """ 

84 Replaces number of primaries in `payload_editor_dict` 

85 if `ntasks` parameter is provided, it is used over one 

86 provided in `payload_editor_dict` 

87 """ 

88 if ntasks is None: 

89 ntasks = payload_editor_dict['ntasks'] 

90 else: 

91 logging.warning("ntasks value was specified as %d and will be overwritten", ntasks) 

92 

93 editor_dict = copy.deepcopy(payload_editor_dict['input_json']) 

94 number_of_all_primaries = editor_dict['beam']['numberOfParticles'] 

95 editor_dict['beam']['numberOfParticles'] //= ntasks 

96 return editor_dict, number_of_all_primaries 

97 

98 

99def adjust_primaries_in_files_dict(payload_files_dict: dict, ntasks: int = None) -> tuple[dict, int]: 

100 """ 

101 Replaces number of primaries in `payload_files_dict` 

102 if `ntasks` parameter is provided, it is used over one 

103 provided in `payload_files_dict` 

104 """ 

105 if ntasks is None: 

106 ntasks = payload_files_dict['ntasks'] 

107 else: 

108 logging.warning("ntasks value was specified as %d and will be overwritten", ntasks) 

109 

110 input_files = payload_files_dict['input_files'] 

111 # determining input file type 

112 # should be done in more robust way which will require a lot of refactoring to pass sim_type 

113 if 'beam.dat' in input_files: 

114 return adjust_primaries_for_shieldhit_files(payload_files_dict=payload_files_dict, ntasks=ntasks) 

115 if next((file for file in input_files if file.endswith(".inp")), None): 

116 return adjust_primaries_for_fluka_files(payload_files_dict=payload_files_dict, ntasks=ntasks) 

117 return {}, 0 

118 

119 

120def adjust_primaries_for_shieldhit_files(payload_files_dict: dict, ntasks: int = None) -> tuple[dict, int]: 

121 """Adjusts number of primaries in beam.dat file for SHIELD-HIT12A""" 

122 files_dict = copy.deepcopy(payload_files_dict['input_files']) 

123 all_beam_lines: list[str] = files_dict['beam.dat'].split('\n') 

124 all_beam_lines_with_nstat = [line for line in all_beam_lines if line.lstrip().startswith('NSTAT')] 

125 beam_lines_count = len(all_beam_lines_with_nstat) 

126 if beam_lines_count != 1: 

127 logging.warning("Found unexpected number of lines with NSTAT keyword: %d", beam_lines_count) 

128 if beam_lines_count < 1: 

129 return files_dict, 0 

130 number_of_all_primaries: str = all_beam_lines_with_nstat[0].split()[1] 

131 primaries_per_task = str(int(number_of_all_primaries) // ntasks) 

132 for i in range(len(all_beam_lines)): 

133 if re.search(NSTAT_MATCH, all_beam_lines[i]): 

134 # line below replaces first found nstat value 

135 # it is important to specify 3rd argument as 1 

136 # because otherwise values further in line might be changed to 

137 all_beam_lines[i] = all_beam_lines[i].replace(number_of_all_primaries, primaries_per_task, 1) 

138 files_dict['beam.dat'] = '\n'.join(all_beam_lines) 

139 # number_of_tasks = payload_files_dict['ntasks'] -> to be implemented in UI 

140 # here we manipulate the files_dict['beam.dat'] file to adjust number of primaries 

141 # we manipulate content of the file, no need to write the file to disk 

142 return files_dict, int(number_of_all_primaries) 

143 

144 

145def adjust_primaries_for_fluka_files(payload_files_dict: dict, ntasks: int = None) -> tuple[dict, int]: 

146 """Adjusts number of primaries in *.inp file for FLUKA""" 

147 files_dict = copy.deepcopy(payload_files_dict['input_files']) 

148 input_file = next((file for file in files_dict if file.endswith(".inp")), None) 

149 if not input_file: 

150 return {}, 0 

151 

152 # read number of primaries from fluka file 

153 all_input_lines: list[str] = files_dict[input_file].split('\n') 

154 # get value from START card 

155 start_card = next((line for line in all_input_lines if line.lstrip().startswith('START')), None) 

156 number_of_all_primaries = start_card.split()[1] 

157 parsed_number_of_all_primaries = int(float(number_of_all_primaries)) 

158 primaries_per_task = parsed_number_of_all_primaries // ntasks 

159 logging.warning("Number of primaries per task: %d", primaries_per_task) 

160 for i in range(len(all_input_lines)): 

161 # replace first found card START 

162 if all_input_lines[i].lstrip().startswith('START'): 

163 logging.warning("Replacing START card with new value") 

164 card = Card(tag="START") 

165 card.setWhat(1, str(primaries_per_task)) 

166 start_card = str(card) 

167 all_input_lines[i] = start_card 

168 break 

169 files_dict[input_file] = '\n'.join(all_input_lines) 

170 return files_dict, parsed_number_of_all_primaries 

171 

172 

173def files_dict_with_adjusted_primaries(payload_dict: dict, ntasks: int = None) -> tuple[dict, int]: 

174 """ 

175 Replaces number of primaries in `payload_dict` 

176 if `ntasks` parameter is provided, it is used over one 

177 provided in `payload_dict` 

178 returns dict with input files and full number of requested primaries 

179 """ 

180 json_type = get_json_type(payload_dict) 

181 if json_type == JSON_TYPE.Editor: 

182 new_payload_dict = copy.deepcopy(payload_dict) 

183 new_payload_dict["input_json"], number_of_all_primaries = adjust_primaries_in_editor_dict( 

184 payload_editor_dict=payload_dict, ntasks=ntasks) 

185 return check_and_convert_payload_to_files_dict(new_payload_dict), number_of_all_primaries 

186 if json_type == JSON_TYPE.Files: 

187 files_dict, number_of_all_primaries = adjust_primaries_in_files_dict(payload_files_dict=payload_dict, 

188 ntasks=ntasks) 

189 return files_dict, number_of_all_primaries 

190 return {}, 0 

191 

192 

193def write_simulation_input_files(files_dict: dict, output_dir: Path) -> None: 

194 """Save files from provided dict (filenames as keys and content as values) into the provided directory""" 

195 for filename, file_contents in files_dict.items(): 

196 with open(output_dir / filename, "w", newline='\n') as writer: # skipcq: PTC-W6004 

197 writer.write(file_contents) 

198 

199 

200def simulation_logfiles(path: Path) -> dict: 

201 """Function returning simulation logfile""" 

202 result = {} 

203 for log in path.glob("run_*/shieldhit_*log"): 

204 try: 

205 with open(log, "r") as reader: # skipcq: PTC-W6004 

206 result[log.name] = reader.read() 

207 except FileNotFoundError: 

208 result[log.name] = "No file" 

209 return result 

210 

211 

212def simulation_input_files(path: Path) -> dict: 

213 """Function returning a dictionary with simulation input filenames as keys and their content as values""" 

214 result = {} 

215 try: 

216 for filename in ["info.json", "geo.dat", "detect.dat", "beam.dat", "mat.dat"]: 

217 file = path / filename 

218 with open(file, "r") as reader: 

219 result[filename] = reader.read() 

220 except FileNotFoundError: 

221 result["info"] = "No input present" 

222 return result