Coverage for yaptide/admin/simulator_storage.py: 19%

268 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-01-20 14:27 +0000

1import platform 

2import shutil 

3import tarfile 

4import tempfile 

5import zipfile 

6from base64 import urlsafe_b64encode 

7from enum import IntEnum, auto 

8from pathlib import Path 

9 

10import boto3 

11import click 

12import cryptography 

13import requests 

14from botocore.config import Config 

15from botocore.exceptions import (ClientError, EndpointConnectionError, NoCredentialsError) 

16from cryptography.fernet import Fernet 

17from cryptography.hazmat.primitives import hashes 

18from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC 

19 

20 

21class SimulatorType(IntEnum): 

22 """Simulator types""" 

23 

24 shieldhit = auto() 

25 fluka = auto() 

26 topas = auto() 

27 

28 

29def extract_shieldhit_from_tar_gz(archive_path: Path, unpacking_directory: Path, member_name: str, 

30 destination_dir: Path): 

31 """Extracts a single file from a tar.gz archive""" 

32 with tarfile.open(archive_path, "r:gz") as tar: 

33 # print all members 

34 for member in tar.getmembers(): 

35 if Path(member.name).name == member_name and Path(member.name).parent.name == 'bin': 

36 click.echo(f"Extracting {member.name}") 

37 tar.extract(member, unpacking_directory) 

38 # move to installation path 

39 local_file_path = unpacking_directory / member.name 

40 click.echo(f"Moving {local_file_path} to {destination_dir}") 

41 shutil.move(local_file_path, destination_dir / member_name) 

42 

43 

44def extract_shieldhit_from_zip(archive_path: Path, unpacking_dir: Path, member_name: str, destination_dir: Path): 

45 """Extracts a single file from a zip archive""" 

46 with zipfile.ZipFile(archive_path) as zip_handle: 

47 # print all members 

48 for member in zip_handle.infolist(): 

49 click.echo(f"Member: {member.filename}") 

50 if Path(member.filename).name == member_name: 

51 click.echo(f"Extracting {member.filename}") 

52 zip_handle.extract(member, unpacking_dir) 

53 # move to installation path 

54 local_file_path = Path(unpacking_dir) / member.filename 

55 destination_file_path = destination_dir / member_name 

56 click.echo(f"Moving {local_file_path} to {destination_file_path}") 

57 # move file from temporary directory to installation path using shutils 

58 if not destination_file_path.exists(): 

59 shutil.move(local_file_path, destination_file_path) 

60 

61 

62def download_shieldhit_demo_version(destination_dir: Path) -> bool: 

63 """Download shieldhit demo version from shieldhit.org""" 

64 demo_version_url = 'https://shieldhit.org/download/DEMO/shield_hit12a_x86_64_demo_gfortran_v1.1.0.tar.gz' 

65 # check if working on Windows 

66 if platform.system() == 'Windows': 

67 demo_version_url = 'https://shieldhit.org/download/DEMO/shield_hit12a_win64_demo_v1.1.0.zip' 

68 

69 # create temporary directory and download 

70 # Create a temporary file to store the downloaded binary data 

71 with tempfile.TemporaryDirectory() as tmpdir_name: 

72 click.echo(f"Downloading from {demo_version_url} to {tmpdir_name}") 

73 headers = {'User-Agent': 'Mozilla/5.0 (Windows NT x.y; rv:10.0) Gecko/20100101 Firefox/10.0'} 

74 response = requests.get(demo_version_url, headers=headers) 

75 temp_file_archive = Path(tmpdir_name) / Path(demo_version_url).name 

76 with open(temp_file_archive, 'wb') as file_handle: 

77 file_handle.write(response.content) 

78 click.echo(f"Saved to {temp_file_archive} with size {temp_file_archive.stat().st_size} bytes") 

79 

80 # extract 

81 click.echo(f"Extracting {temp_file_archive} to {destination_dir}") 

82 destination_dir.mkdir(parents=True, exist_ok=True) 

83 if temp_file_archive.suffix == '.gz': 

84 extract_shieldhit_from_tar_gz(temp_file_archive, 

85 Path(tmpdir_name), 

86 'shieldhit', 

87 destination_dir=destination_dir) 

88 elif temp_file_archive.suffix == '.zip': 

89 extract_shieldhit_from_zip(temp_file_archive, 

90 Path(tmpdir_name), 

91 'shieldhit.exe', 

92 destination_dir=destination_dir) 

93 return True 

94 

95 

96def check_if_s3_connection_is_working(s3_client: boto3.client) -> bool: 

97 """Check if connection to S3 is possible""" 

98 try: 

99 s3_client.list_buckets() 

100 except NoCredentialsError as e: 

101 click.echo(f"No credentials found. Check your access key and secret key. {e}", err=True) 

102 return False 

103 except EndpointConnectionError as e: 

104 click.echo(f"Could not connect to the specified endpoint. {e}", err=True) 

105 return False 

106 except ClientError as e: 

107 click.echo(f"An error occurred while connecting to S3: {e.response['Error']['Message']}", err=True) 

108 return False 

109 return True 

110 

111 

112def download_shieldhit_from_s3( 

113 destination_dir: Path, 

114 endpoint: str, 

115 access_key: str, 

116 secret_key: str, 

117 password: str, 

118 salt: str, 

119 bucket: str, 

120 key: str, 

121 decrypt: bool = True, 

122) -> bool: 

123 """Download SHIELD-HIT12A from S3 bucket""" 

124 s3_client = boto3.client("s3", 

125 aws_access_key_id=access_key, 

126 aws_secret_access_key=secret_key, 

127 endpoint_url=endpoint) 

128 

129 if not validate_connection_data(bucket=bucket, key=key, s3_client=s3_client): 

130 return False 

131 

132 if not destination_dir.exists(): 

133 destination_dir.mkdir(parents=True, exist_ok=True) 

134 

135 destination_file_path = destination_dir / 'shieldhit' 

136 # append '.exe' to file name if working on Windows 

137 if platform.system() == 'Windows': 

138 destination_file_path = destination_dir / 'shieldhit.exe' 

139 

140 download_and_decrypt_status = download_file(key=key, 

141 bucket=bucket, 

142 s3_client=s3_client, 

143 decrypt=decrypt, 

144 password=password, 

145 salt=salt, 

146 destination_file_path=destination_file_path) 

147 

148 if not download_and_decrypt_status: 

149 return False 

150 

151 return True 

152 

153 

154def download_shieldhit_from_s3_or_from_website( 

155 destination_dir: Path, 

156 endpoint: str, 

157 access_key: str, 

158 secret_key: str, 

159 password: str, 

160 salt: str, 

161 bucket: str, 

162 key: str, 

163 decrypt: bool = True, 

164): 

165 """Download SHIELD-HIT12A from S3 bucket, if not available download demo version from shieldhit.org website""" 

166 download_ok = download_shieldhit_from_s3(destination_dir=destination_dir, 

167 endpoint=endpoint, 

168 access_key=access_key, 

169 secret_key=secret_key, 

170 password=password, 

171 salt=salt, 

172 bucket=bucket, 

173 key=key, 

174 decrypt=decrypt) 

175 if download_ok: 

176 click.echo('SHIELD-HIT12A downloaded from S3') 

177 else: 

178 click.echo('SHIELD-HIT12A download failed, trying to download demo version from shieldhit.org website') 

179 demo_download_ok = download_shieldhit_demo_version(destination_dir=destination_dir) 

180 if demo_download_ok: 

181 click.echo('SHIELD-HIT12A demo version downloaded from shieldhit.org website') 

182 else: 

183 click.echo('SHIELD-HIT12A demo version download failed') 

184 

185 

186# skipcq: PY-R1000 

187def download_topas_from_s3(download_dir: Path, endpoint: str, access_key: str, secret_key: str, bucket: str, key: str, 

188 version: str, geant4_bucket: str) -> bool: 

189 """Download TOPAS from S3 bucket""" 

190 s3_client = boto3.client("s3", 

191 aws_access_key_id=access_key, 

192 aws_secret_access_key=secret_key, 

193 endpoint_url=endpoint) 

194 

195 if not validate_connection_data(bucket, key, s3_client): 

196 return False 

197 

198 # Download TOPAS tar 

199 topas_temp_file = tempfile.NamedTemporaryFile() 

200 try: 

201 response = s3_client.list_object_versions( 

202 Bucket=bucket, 

203 Prefix=key, 

204 ) 

205 topas_file_downloaded = False 

206 for curr_version in response["Versions"]: 

207 version_id = curr_version["VersionId"] 

208 

209 tags = s3_client.get_object_tagging( 

210 Bucket=bucket, 

211 Key=key, 

212 VersionId=version_id, 

213 ) 

214 for tag in tags["TagSet"]: 

215 if tag["Key"] == "version" and tag["Value"] == version: 

216 click.echo(f"Downloading {key}, version {version} from {bucket} to {topas_temp_file.name}") 

217 s3_client.download_fileobj(Bucket=bucket, 

218 Key=key, 

219 Fileobj=topas_temp_file, 

220 ExtraArgs={"VersionId": version_id}) 

221 topas_file_downloaded = True 

222 if not topas_file_downloaded: 

223 click.echo(f"Could not find TOPAS version {version} in bucket {bucket}, file {key}", err=True) 

224 return False 

225 

226 except ClientError as e: 

227 click.echo("Failed to download TOPAS from S3 with error: ", e.response["Error"]["Message"]) 

228 return False 

229 

230 # Download GEANT4 tar files 

231 geant4_temp_files = [] 

232 

233 objects = s3_client.list_objects_v2(Bucket=geant4_bucket) 

234 

235 try: 

236 for obj in objects['Contents']: 

237 key = obj['Key'] 

238 response = s3_client.list_object_versions( 

239 Bucket=geant4_bucket, 

240 Prefix=key, 

241 ) 

242 for curr_version in response["Versions"]: 

243 version_id = curr_version["VersionId"] 

244 tags = s3_client.get_object_tagging( 

245 Bucket=geant4_bucket, 

246 Key=key, 

247 VersionId=version_id, 

248 ) 

249 for tag in tags["TagSet"]: 

250 if tag["Key"] == "topas_versions": 

251 topas_versions = tag["Value"].split(",") 

252 topas_versions = [version.strip() for version in topas_versions] 

253 if version in topas_versions: 

254 temp_file = tempfile.NamedTemporaryFile() 

255 click.echo(f"""Downloading {key} for TOPAS version {version} 

256 from {bucket} to {temp_file.name}""") 

257 s3_client.download_fileobj(Bucket=geant4_bucket, 

258 Key=key, 

259 Fileobj=temp_file, 

260 ExtraArgs={"VersionId": version_id}) 

261 geant4_temp_files.append(temp_file) 

262 

263 except ClientError as e: 

264 click.echo("Failed to download Geant4 data from S3 with error: ", e.response["Error"]["Message"]) 

265 return False 

266 

267 topas_temp_file.seek(0) 

268 topas_file_contents = tarfile.TarFile(fileobj=topas_temp_file) 

269 click.echo(f"Unpacking {topas_temp_file.name} to {download_dir}") 

270 topas_file_contents.extractall(path=download_dir) 

271 topas_extracted_path = download_dir / "topas" / "bin" / "topas" 

272 topas_extracted_path.chmod(0o700) 

273 click.echo(f"Installed TOPAS into {download_dir}") 

274 

275 geant4_files_path = download_dir / "geant4_files_path" 

276 if not geant4_files_path.exists(): 

277 try: 

278 geant4_files_path.mkdir() 

279 except OSError as e: 

280 click.echo(f"Could not create directory {geant4_files_path}: {e}", err=True) 

281 return False 

282 for file in geant4_temp_files: 

283 file.seek(0) 

284 file_contents = tarfile.TarFile(fileobj=file) 

285 click.echo(f"Unpacking {file.name} to {geant4_files_path}") 

286 file_contents.extractall(path=geant4_files_path) 

287 click.echo(f"Installed Geant4 files into {geant4_files_path}") 

288 return True 

289 

290 

291def extract_fluka_from_tar_gz(archive_path: Path, unpacking_directory: Path, destination_dir: Path) -> bool: 

292 """Extracts a single directory from a tar.gz archive""" 

293 with tarfile.open(archive_path, "r:gz") as tar: 

294 tar.extractall(path=unpacking_directory) 

295 content = list(unpacking_directory.iterdir()) 

296 if len(content) == 1: 

297 shutil.copytree(str(content[0]), str(destination_dir / 'fluka'), dirs_exist_ok=True) 

298 return True 

299 if len(content) > 1: 

300 shutil.copytree(str(unpacking_directory), str(destination_dir / 'fluka'), dirs_exist_ok=True) 

301 return True 

302 return False 

303 

304 

305def download_fluka_from_s3(download_dir: Path, endpoint: str, access_key: str, secret_key: str, bucket: str, 

306 password: str, salt: str, key: str) -> bool: 

307 """Download (and decrypt) Fluka from S3 bucket""" 

308 s3_client = boto3.client("s3", 

309 aws_access_key_id=access_key, 

310 aws_secret_access_key=secret_key, 

311 endpoint_url=endpoint) 

312 

313 if not validate_connection_data(bucket, key, s3_client): 

314 return False 

315 

316 with tempfile.TemporaryDirectory() as tmpdir_name: 

317 tmp_dir = Path(tmpdir_name).resolve() 

318 tmp_archive = tmp_dir / 'fluka.tgz' 

319 tmp_dir_path = tmp_dir / 'fluka' 

320 download_and_decrypt_status = download_file(key=key, 

321 bucket=bucket, 

322 s3_client=s3_client, 

323 decrypt=True, 

324 password=password, 

325 salt=salt, 

326 destination_file_path=tmp_archive) 

327 if not download_and_decrypt_status: 

328 return False 

329 download_and_decrypt_status = extract_fluka_from_tar_gz(archive_path=tmp_archive, 

330 unpacking_directory=tmp_dir_path, 

331 destination_dir=download_dir) 

332 

333 return download_and_decrypt_status 

334 

335 

336def upload_file_to_s3(bucket: str, 

337 file_path: Path, 

338 endpoint: str, 

339 access_key: str, 

340 secret_key: str, 

341 encrypt: bool = False, 

342 encryption_password: str = '', 

343 encryption_salt: str = '') -> bool: 

344 """Upload file to S3 bucket""" 

345 # Create S3 client with disabled flexible checksums to avoid XAmzContentSHA256Mismatch errors 

346 # This is needed for S3-compatible endpoints that don't support aws-chunked encoding 

347 s3_client = boto3.client( 

348 "s3", 

349 aws_access_key_id=access_key, 

350 aws_secret_access_key=secret_key, 

351 endpoint_url=endpoint, 

352 config=Config( 

353 request_checksum_calculation='when_required', 

354 ), 

355 ) 

356 if not check_if_s3_connection_is_working(s3_client): 

357 click.echo("S3 connection failed", err=True) 

358 return False 

359 

360 # Check if bucket exists and create if not 

361 if bucket not in [bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]]: 

362 click.echo(f"Bucket {bucket} does not exist. Creating.") 

363 s3_client.create_bucket(Bucket=bucket) 

364 

365 # Encrypt file 

366 file_contents = file_path.read_bytes() 

367 if encrypt: 

368 click.echo(f"Encrypting file {file_path}") 

369 file_contents = encrypt_file(file_path, encryption_password, encryption_salt) 

370 try: 

371 # Upload encrypted file to S3 bucket 

372 click.echo(f"Uploading file {file_path}") 

373 s3_client.put_object( 

374 Body=file_contents, 

375 Bucket=bucket, 

376 Key=file_path.name 

377 ) 

378 return True 

379 except ClientError as e: 

380 error_message = e.response.get("Error", {}).get("Message", str(e)) 

381 click.echo(f"Upload failed with error: {error_message}") 

382 return False 

383 

384 

385def encrypt_file(file_path: Path, password: str, salt: str) -> bytes: 

386 """Encrypts a file using Fernet""" 

387 encryption_key = derive_key(password, salt) 

388 # skipcq: PTC-W6004 

389 bytes_from_file = file_path.read_bytes() 

390 fernet = Fernet(encryption_key) 

391 encrypted = fernet.encrypt(bytes_from_file) 

392 return encrypted 

393 

394 

395def decrypt_file(file_path: Path, password: str, salt: str) -> bytes: 

396 """Decrypts a file using Fernet""" 

397 encryption_key = derive_key(password, salt) 

398 # skipcq: PTC-W6004 

399 bytes_from_file = file_path.read_bytes() 

400 fernet = Fernet(encryption_key) 

401 try: 

402 decrypted = fernet.decrypt(bytes_from_file) 

403 except cryptography.fernet.InvalidToken: 

404 click.echo("Decryption failed - invalid token (password+salt)", err=True) 

405 return b'' 

406 return decrypted 

407 

408 

409def validate_connection_data(bucket: str, key: str, s3_client) -> bool: 

410 """Validate S3 connection""" 

411 if not check_if_s3_connection_is_working(s3_client): 

412 click.echo("S3 connection failed", err=True) 

413 return False 

414 

415 # Check if bucket name is valid 

416 if not bucket: 

417 click.echo("Bucket name is empty", err=True) 

418 return False 

419 

420 # Check if key is valid 

421 if not key: 

422 click.echo("Key is empty", err=True) 

423 return False 

424 

425 # Check if bucket exists 

426 try: 

427 s3_client.head_bucket(Bucket=bucket) 

428 except ClientError as e: 

429 click.echo(f"Problem accessing bucket named {bucket}: {e}", err=True) 

430 return False 

431 

432 # Check if key exists 

433 try: 

434 s3_client.head_object(Bucket=bucket, Key=key) 

435 except ClientError as e: 

436 click.echo(f"Problem accessing key named {key} in bucket {bucket}: {e}", err=True) 

437 return False 

438 

439 return True 

440 

441 

442def download_file(key: str, 

443 bucket: str, 

444 s3_client, 

445 destination_file_path: Path, 

446 decrypt: bool = False, 

447 password: str = '', 

448 salt: str = ''): 

449 """Handle download with encryption""" 

450 try: 

451 with tempfile.NamedTemporaryFile() as temp_file: 

452 click.echo(f"Downloading {key} from {bucket} to {temp_file.name}") 

453 s3_client.download_fileobj(Bucket=bucket, Key=key, Fileobj=temp_file) 

454 

455 if decrypt: 

456 click.echo("Decrypting downloaded file") 

457 if not password or not salt: 

458 click.echo("Password or salt not set", err=True) 

459 return False 

460 bytes_from_decrypted_file = decrypt_file(file_path=Path(temp_file.name), password=password, salt=salt) 

461 if not bytes_from_decrypted_file: 

462 click.echo("Decryption failed", err=True) 

463 return False 

464 

465 Path(destination_file_path).parent.mkdir(parents=True, exist_ok=True) 

466 Path(destination_file_path).write_bytes(bytes_from_decrypted_file) 

467 else: 

468 click.echo(f"Copying {temp_file.name} to {destination_file_path}") 

469 shutil.copy2(temp_file.name, destination_file_path) 

470 except ClientError as e: 

471 click.echo(f"S3 download failed with client error: {e}", err=True) 

472 return False 

473 

474 destination_file_path.chmod(0o700) 

475 return True 

476 

477 

478def derive_key(password: str, salt: str) -> bytes: 

479 """Derives a key from the password and salt""" 

480 kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt.encode(), iterations=480_000) 

481 key = urlsafe_b64encode(kdf.derive(password.encode())) 

482 return key