Coverage for yaptide/admin/simulator_storage.py: 19%
268 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-01-20 14:27 +0000
1import platform
2import shutil
3import tarfile
4import tempfile
5import zipfile
6from base64 import urlsafe_b64encode
7from enum import IntEnum, auto
8from pathlib import Path
10import boto3
11import click
12import cryptography
13import requests
14from botocore.config import Config
15from botocore.exceptions import (ClientError, EndpointConnectionError, NoCredentialsError)
16from cryptography.fernet import Fernet
17from cryptography.hazmat.primitives import hashes
18from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
class SimulatorType(IntEnum):
    """Enumeration of the particle-transport simulators supported by yaptide."""

    shieldhit = auto()  # SHIELD-HIT12A
    fluka = auto()  # FLUKA
    topas = auto()  # TOPAS
def extract_shieldhit_from_tar_gz(archive_path: Path, unpacking_directory: Path, member_name: str,
                                  destination_dir: Path):
    """
    Extract a single binary from a tar.gz archive.

    Walks the archive looking for entries whose basename is ``member_name``
    and whose parent directory is named ``bin``; each match is unpacked into
    ``unpacking_directory`` and then moved to ``destination_dir``.
    """
    with tarfile.open(archive_path, "r:gz") as tar_handle:
        for entry in tar_handle.getmembers():
            entry_path = Path(entry.name)
            # only the requested binary living under a 'bin' directory is interesting
            if entry_path.name != member_name or entry_path.parent.name != 'bin':
                continue
            click.echo(f"Extracting {entry.name}")
            tar_handle.extract(entry, unpacking_directory)
            # relocate the unpacked file to its final installation path
            extracted_file = unpacking_directory / entry.name
            click.echo(f"Moving {extracted_file} to {destination_dir}")
            shutil.move(extracted_file, destination_dir / member_name)
def extract_shieldhit_from_zip(archive_path: Path, unpacking_dir: Path, member_name: str, destination_dir: Path):
    """
    Extract a single binary from a zip archive.

    Every archive member is echoed; the one whose basename equals
    ``member_name`` is unpacked into ``unpacking_dir`` and then moved into
    ``destination_dir`` (unless a file with that name already exists there).
    """
    with zipfile.ZipFile(archive_path) as archive:
        for entry in archive.infolist():
            click.echo(f"Member: {entry.filename}")
            if Path(entry.filename).name != member_name:
                continue
            click.echo(f"Extracting {entry.filename}")
            archive.extract(entry, unpacking_dir)
            unpacked_file = Path(unpacking_dir) / entry.filename
            target_file = destination_dir / member_name
            click.echo(f"Moving {unpacked_file} to {target_file}")
            # don't clobber an existing installation
            if not target_file.exists():
                shutil.move(unpacked_file, target_file)
def download_shieldhit_demo_version(destination_dir: Path) -> bool:
    """
    Download the SHIELD-HIT12A demo from shieldhit.org and install it.

    Picks the Windows zip or the Linux tar.gz depending on the platform,
    downloads it into a temporary directory and extracts the simulator binary
    into ``destination_dir``.

    :param destination_dir: directory where the simulator binary is installed
    :return: True on success, False when the download failed
    """
    demo_version_url = 'https://shieldhit.org/download/DEMO/shield_hit12a_x86_64_demo_gfortran_v1.1.0.tar.gz'
    if platform.system() == 'Windows':
        demo_version_url = 'https://shieldhit.org/download/DEMO/shield_hit12a_win64_demo_v1.1.0.zip'

    # create temporary directory, download the archive there and unpack it
    with tempfile.TemporaryDirectory() as tmpdir_name:
        click.echo(f"Downloading from {demo_version_url} to {tmpdir_name}")
        # some servers reject requests without a browser-like User-Agent
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT x.y; rv:10.0) Gecko/20100101 Firefox/10.0'}
        try:
            # bounded timeout so a stalled server cannot hang the installer;
            # raise_for_status prevents writing an HTML error page as the archive
            response = requests.get(demo_version_url, headers=headers, timeout=60)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            click.echo(f"Download from {demo_version_url} failed: {e}", err=True)
            return False
        temp_file_archive = Path(tmpdir_name) / Path(demo_version_url).name
        temp_file_archive.write_bytes(response.content)
        click.echo(f"Saved to {temp_file_archive} with size {temp_file_archive.stat().st_size} bytes")

        # extract
        click.echo(f"Extracting {temp_file_archive} to {destination_dir}")
        destination_dir.mkdir(parents=True, exist_ok=True)
        if temp_file_archive.suffix == '.gz':
            extract_shieldhit_from_tar_gz(temp_file_archive,
                                          Path(tmpdir_name),
                                          'shieldhit',
                                          destination_dir=destination_dir)
        elif temp_file_archive.suffix == '.zip':
            extract_shieldhit_from_zip(temp_file_archive,
                                       Path(tmpdir_name),
                                       'shieldhit.exe',
                                       destination_dir=destination_dir)
    return True
def check_if_s3_connection_is_working(s3_client: boto3.client) -> bool:
    """Return True when the client can reach S3, probing via ``list_buckets``."""
    try:
        s3_client.list_buckets()
    except NoCredentialsError as e:
        click.echo(f"No credentials found. Check your access key and secret key. {e}", err=True)
    except EndpointConnectionError as e:
        click.echo(f"Could not connect to the specified endpoint. {e}", err=True)
    except ClientError as e:
        click.echo(f"An error occurred while connecting to S3: {e.response['Error']['Message']}", err=True)
    else:
        return True
    return False
def download_shieldhit_from_s3(
    destination_dir: Path,
    endpoint: str,
    access_key: str,
    secret_key: str,
    password: str,
    salt: str,
    bucket: str,
    key: str,
    decrypt: bool = True,
) -> bool:
    """Download (and optionally decrypt) SHIELD-HIT12A from an S3 bucket."""
    s3_client = boto3.client("s3",
                             aws_access_key_id=access_key,
                             aws_secret_access_key=secret_key,
                             endpoint_url=endpoint)

    if not validate_connection_data(bucket=bucket, key=key, s3_client=s3_client):
        return False

    if not destination_dir.exists():
        destination_dir.mkdir(parents=True, exist_ok=True)

    # the installed binary gets a platform-appropriate name
    binary_name = 'shieldhit.exe' if platform.system() == 'Windows' else 'shieldhit'
    destination_file_path = destination_dir / binary_name

    return download_file(key=key,
                         bucket=bucket,
                         s3_client=s3_client,
                         decrypt=decrypt,
                         password=password,
                         salt=salt,
                         destination_file_path=destination_file_path)
def download_shieldhit_from_s3_or_from_website(
    destination_dir: Path,
    endpoint: str,
    access_key: str,
    secret_key: str,
    password: str,
    salt: str,
    bucket: str,
    key: str,
    decrypt: bool = True,
):
    """Install SHIELD-HIT12A from S3, falling back to the public demo build from shieldhit.org."""
    if download_shieldhit_from_s3(destination_dir=destination_dir,
                                  endpoint=endpoint,
                                  access_key=access_key,
                                  secret_key=secret_key,
                                  password=password,
                                  salt=salt,
                                  bucket=bucket,
                                  key=key,
                                  decrypt=decrypt):
        click.echo('SHIELD-HIT12A downloaded from S3')
        return
    # S3 failed -> fall back to the freely available demo build
    click.echo('SHIELD-HIT12A download failed, trying to download demo version from shieldhit.org website')
    if download_shieldhit_demo_version(destination_dir=destination_dir):
        click.echo('SHIELD-HIT12A demo version downloaded from shieldhit.org website')
    else:
        click.echo('SHIELD-HIT12A demo version download failed')
# skipcq: PY-R1000
def download_topas_from_s3(download_dir: Path, endpoint: str, access_key: str, secret_key: str, bucket: str, key: str,
                           version: str, geant4_bucket: str) -> bool:
    """
    Download TOPAS (and its Geant4 data files) from S3 and unpack them.

    The object version of ``bucket``/``key`` tagged ``version == <version>`` is
    fetched; Geant4 archives from ``geant4_bucket`` are fetched when their
    ``topas_versions`` tag contains the requested TOPAS version. Archives are
    unpacked into ``download_dir`` and the topas binary is made executable.

    :return: True on success, False on any download/unpack failure
    """
    s3_client = boto3.client("s3",
                             aws_access_key_id=access_key,
                             aws_secret_access_key=secret_key,
                             endpoint_url=endpoint)

    if not validate_connection_data(bucket, key, s3_client):
        return False

    # Download TOPAS tar
    topas_temp_file = tempfile.NamedTemporaryFile()
    try:
        response = s3_client.list_object_versions(
            Bucket=bucket,
            Prefix=key,
        )
        topas_file_downloaded = False
        for curr_version in response["Versions"]:
            version_id = curr_version["VersionId"]

            tags = s3_client.get_object_tagging(
                Bucket=bucket,
                Key=key,
                VersionId=version_id,
            )
            for tag in tags["TagSet"]:
                if tag["Key"] == "version" and tag["Value"] == version:
                    click.echo(f"Downloading {key}, version {version} from {bucket} to {topas_temp_file.name}")
                    s3_client.download_fileobj(Bucket=bucket,
                                               Key=key,
                                               Fileobj=topas_temp_file,
                                               ExtraArgs={"VersionId": version_id})
                    topas_file_downloaded = True
        if not topas_file_downloaded:
            click.echo(f"Could not find TOPAS version {version} in bucket {bucket}, file {key}", err=True)
            return False

    except ClientError as e:
        # fixed: the message used to be passed as click.echo's ``file`` argument,
        # which raises at runtime instead of printing
        click.echo(f"Failed to download TOPAS from S3 with error: {e.response['Error']['Message']}", err=True)
        return False

    # Download GEANT4 tar files
    geant4_temp_files = []

    objects = s3_client.list_objects_v2(Bucket=geant4_bucket)

    try:
        for obj in objects['Contents']:
            # use a dedicated name; previously this clobbered the ``key`` parameter
            geant4_key = obj['Key']
            response = s3_client.list_object_versions(
                Bucket=geant4_bucket,
                Prefix=geant4_key,
            )
            for curr_version in response["Versions"]:
                version_id = curr_version["VersionId"]
                tags = s3_client.get_object_tagging(
                    Bucket=geant4_bucket,
                    Key=geant4_key,
                    VersionId=version_id,
                )
                for tag in tags["TagSet"]:
                    if tag["Key"] != "topas_versions":
                        continue
                    topas_versions = [v.strip() for v in tag["Value"].split(",")]
                    if version in topas_versions:
                        temp_file = tempfile.NamedTemporaryFile()
                        # fixed: log the Geant4 bucket (the one actually used), not the TOPAS bucket
                        click.echo(f"Downloading {geant4_key} for TOPAS version {version} "
                                   f"from {geant4_bucket} to {temp_file.name}")
                        s3_client.download_fileobj(Bucket=geant4_bucket,
                                                   Key=geant4_key,
                                                   Fileobj=temp_file,
                                                   ExtraArgs={"VersionId": version_id})
                        geant4_temp_files.append(temp_file)

    except ClientError as e:
        # fixed: same click.echo misuse as above
        click.echo(f"Failed to download Geant4 data from S3 with error: {e.response['Error']['Message']}", err=True)
        return False

    # Unpack TOPAS and mark the simulator binary as owner-executable
    topas_temp_file.seek(0)
    topas_file_contents = tarfile.TarFile(fileobj=topas_temp_file)
    click.echo(f"Unpacking {topas_temp_file.name} to {download_dir}")
    topas_file_contents.extractall(path=download_dir)
    topas_extracted_path = download_dir / "topas" / "bin" / "topas"
    topas_extracted_path.chmod(0o700)
    click.echo(f"Installed TOPAS into {download_dir}")

    geant4_files_path = download_dir / "geant4_files_path"
    if not geant4_files_path.exists():
        try:
            geant4_files_path.mkdir()
        except OSError as e:
            click.echo(f"Could not create directory {geant4_files_path}: {e}", err=True)
            return False
    for file in geant4_temp_files:
        file.seek(0)
        file_contents = tarfile.TarFile(fileobj=file)
        click.echo(f"Unpacking {file.name} to {geant4_files_path}")
        file_contents.extractall(path=geant4_files_path)
    click.echo(f"Installed Geant4 files into {geant4_files_path}")
    return True
def extract_fluka_from_tar_gz(archive_path: Path, unpacking_directory: Path, destination_dir: Path) -> bool:
    """
    Unpack a Fluka tar.gz archive and install it as ``destination_dir / 'fluka'``.

    A single top-level entry is treated as the Fluka root and copied directly;
    with multiple entries the whole unpacking directory is copied.
    Returns False for an empty archive.
    """
    with tarfile.open(archive_path, "r:gz") as archive:
        archive.extractall(path=unpacking_directory)
    unpacked_entries = list(unpacking_directory.iterdir())
    if not unpacked_entries:
        return False
    # one entry -> that entry is the fluka tree; otherwise take the whole directory
    source_tree = unpacked_entries[0] if len(unpacked_entries) == 1 else unpacking_directory
    shutil.copytree(str(source_tree), str(destination_dir / 'fluka'), dirs_exist_ok=True)
    return True
def download_fluka_from_s3(download_dir: Path, endpoint: str, access_key: str, secret_key: str, bucket: str,
                           password: str, salt: str, key: str) -> bool:
    """Fetch the encrypted Fluka archive from S3, decrypt it and install it into ``download_dir``."""
    s3_client = boto3.client("s3",
                             aws_access_key_id=access_key,
                             aws_secret_access_key=secret_key,
                             endpoint_url=endpoint)

    if not validate_connection_data(bucket, key, s3_client):
        return False

    with tempfile.TemporaryDirectory() as tmpdir_name:
        workdir = Path(tmpdir_name).resolve()
        archive_file = workdir / 'fluka.tgz'
        # download + decrypt into the temporary archive, then unpack it
        if not download_file(key=key,
                             bucket=bucket,
                             s3_client=s3_client,
                             decrypt=True,
                             password=password,
                             salt=salt,
                             destination_file_path=archive_file):
            return False
        return extract_fluka_from_tar_gz(archive_path=archive_file,
                                         unpacking_directory=workdir / 'fluka',
                                         destination_dir=download_dir)
def upload_file_to_s3(bucket: str,
                      file_path: Path,
                      endpoint: str,
                      access_key: str,
                      secret_key: str,
                      encrypt: bool = False,
                      encryption_password: str = '',
                      encryption_salt: str = '') -> bool:
    """
    Upload a file to an S3 bucket, optionally encrypting it first.

    The bucket is created when it does not exist; the object key is the file's
    basename.

    :return: True on success, False on connection or upload failure
    """
    # Create S3 client with disabled flexible checksums to avoid XAmzContentSHA256Mismatch errors
    # This is needed for S3-compatible endpoints that don't support aws-chunked encoding
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        endpoint_url=endpoint,
        config=Config(
            request_checksum_calculation='when_required',
        ),
    )
    if not check_if_s3_connection_is_working(s3_client):
        click.echo("S3 connection failed", err=True)
        return False

    # Check if bucket exists and create if not
    # (loop variable renamed so it no longer shadows the ``bucket`` parameter)
    existing_buckets = [entry["Name"] for entry in s3_client.list_buckets()["Buckets"]]
    if bucket not in existing_buckets:
        click.echo(f"Bucket {bucket} does not exist. Creating.")
        s3_client.create_bucket(Bucket=bucket)

    # Read the payload exactly once: encrypt_file() reads the file itself,
    # so the old unconditional read_bytes() was a redundant second read
    if encrypt:
        click.echo(f"Encrypting file {file_path}")
        file_contents = encrypt_file(file_path, encryption_password, encryption_salt)
    else:
        file_contents = file_path.read_bytes()
    try:
        # Upload (possibly encrypted) contents to the S3 bucket
        click.echo(f"Uploading file {file_path}")
        s3_client.put_object(
            Body=file_contents,
            Bucket=bucket,
            Key=file_path.name
        )
        return True
    except ClientError as e:
        error_message = e.response.get("Error", {}).get("Message", str(e))
        click.echo(f"Upload failed with error: {error_message}")
        return False
def encrypt_file(file_path: Path, password: str, salt: str) -> bytes:
    """Read ``file_path`` and return its contents encrypted with a Fernet key derived from password+salt."""
    # skipcq: PTC-W6004
    plaintext = file_path.read_bytes()
    cipher = Fernet(derive_key(password, salt))
    return cipher.encrypt(plaintext)
def decrypt_file(file_path: Path, password: str, salt: str) -> bytes:
    """Decrypt ``file_path`` with a Fernet key derived from password+salt; return b'' on a bad key."""
    # skipcq: PTC-W6004
    ciphertext = file_path.read_bytes()
    cipher = Fernet(derive_key(password, salt))
    try:
        return cipher.decrypt(ciphertext)
    except cryptography.fernet.InvalidToken:
        click.echo("Decryption failed - invalid token (password+salt)", err=True)
        return b''
def validate_connection_data(bucket: str, key: str, s3_client) -> bool:
    """Verify that S3 is reachable and that ``bucket`` and ``key`` both exist."""
    if not check_if_s3_connection_is_working(s3_client):
        click.echo("S3 connection failed", err=True)
        return False

    # empty names can never resolve to an object
    if not bucket:
        click.echo("Bucket name is empty", err=True)
        return False
    if not key:
        click.echo("Key is empty", err=True)
        return False

    # probe the bucket, then the object inside it
    try:
        s3_client.head_bucket(Bucket=bucket)
    except ClientError as e:
        click.echo(f"Problem accessing bucket named {bucket}: {e}", err=True)
        return False
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
    except ClientError as e:
        click.echo(f"Problem accessing key named {key} in bucket {bucket}: {e}", err=True)
        return False

    return True
def download_file(key: str,
                  bucket: str,
                  s3_client,
                  destination_file_path: Path,
                  decrypt: bool = False,
                  password: str = '',
                  salt: str = '') -> bool:
    """
    Download ``key`` from ``bucket`` into ``destination_file_path``.

    When ``decrypt`` is True the payload is decrypted with the Fernet key
    derived from ``password`` and ``salt`` before being written out. The
    installed file is made owner-executable (0o700).

    :return: True on success, False on client error, missing password/salt
             or decryption failure
    """
    try:
        with tempfile.NamedTemporaryFile() as temp_file:
            click.echo(f"Downloading {key} from {bucket} to {temp_file.name}")
            s3_client.download_fileobj(Bucket=bucket, Key=key, Fileobj=temp_file)
            # flush so reads of temp_file by name below see the full payload,
            # not just what escaped the Python-level buffer
            temp_file.flush()

            # ensure the target directory exists for BOTH branches below;
            # previously only the decrypting branch created it, so a plain
            # copy into a non-existent directory failed
            Path(destination_file_path).parent.mkdir(parents=True, exist_ok=True)
            if decrypt:
                click.echo("Decrypting downloaded file")
                if not password or not salt:
                    click.echo("Password or salt not set", err=True)
                    return False
                bytes_from_decrypted_file = decrypt_file(file_path=Path(temp_file.name), password=password, salt=salt)
                if not bytes_from_decrypted_file:
                    click.echo("Decryption failed", err=True)
                    return False
                Path(destination_file_path).write_bytes(bytes_from_decrypted_file)
            else:
                click.echo(f"Copying {temp_file.name} to {destination_file_path}")
                shutil.copy2(temp_file.name, destination_file_path)
    except ClientError as e:
        click.echo(f"S3 download failed with client error: {e}", err=True)
        return False

    # simulator binaries must be executable by the owner
    destination_file_path.chmod(0o700)
    return True
def derive_key(password: str, salt: str) -> bytes:
    """Derive a urlsafe-base64 Fernet key from ``password`` and ``salt`` via PBKDF2-HMAC-SHA256."""
    key_derivation = PBKDF2HMAC(algorithm=hashes.SHA256(),
                                length=32,
                                salt=salt.encode(),
                                iterations=480_000)
    raw_key = key_derivation.derive(password.encode())
    return urlsafe_b64encode(raw_key)