Coverage for yaptide/routes/utils/tokens.py: 78%

41 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-04 00:31 +0000

1from datetime import datetime, timedelta 

2from secrets import token_hex 

3from typing import Union 

4 

5import jwt 

6 

7SECRET_KEY_TOKEN = token_hex(256) 

8SECRET_KEY_TOKEN_REFRESH = token_hex(256) 

9_Refresh_Token_Expiration_Time = 120 # minutes 

10_Access_Token_Expiration_Time = 10 # minutes 

11_Keycloak_Token_Expiration_Time = 30 # minutes 

12_Simulation_Token_Expiration_time = 10080 # minutes 

13 

14 

15def encode_auth_token(user_id: int, 

16 is_refresh: bool = False, 

17 is_keycloak: bool = False) -> tuple[Union[str, Exception], datetime]: # skipcq: FLK-E101 

18 """Function encoding the token""" 

19 if is_refresh: 

20 secret = SECRET_KEY_TOKEN_REFRESH 

21 exp_time_minutes = _Refresh_Token_Expiration_Time 

22 else: 

23 secret = SECRET_KEY_TOKEN 

24 exp_time_minutes = _Keycloak_Token_Expiration_Time if is_keycloak else _Access_Token_Expiration_Time 

25 

26 exp = datetime.utcnow() + timedelta(minutes=exp_time_minutes) 

27 

28 try: 

29 # For a description of the payload fields, take look 

30 # at JSON Web Token RFC https://datatracker.ietf.org/doc/html/rfc7519 

31 payload = { 

32 'exp': exp, # Token Expiration Time 

33 'iat': datetime.utcnow(), # Issued At Time 

34 'sub': str(user_id) # Subject 

35 } 

36 return jwt.encode(payload, secret, algorithm='HS256'), exp 

37 except Exception as e: # skipcq: PYL-W0703 

38 return e, exp 

39 

40 

41def encode_simulation_auth_token(simulation_id: int): 

42 """Function that encodes JWT token for simulation 'update_key'""" 

43 secret = SECRET_KEY_TOKEN 

44 exp = datetime.utcnow() + timedelta(minutes=_Simulation_Token_Expiration_time) 

45 try: 

46 payload = { 

47 'exp': exp, # Token Expiration Time 

48 'iat': datetime.utcnow(), # Issued At Time 

49 'simulation_id': str(simulation_id) # Subject 

50 } 

51 return jwt.encode(payload, secret, algorithm='HS256') 

52 except Exception as e: # skipcq: PYL-W0703 

53 return e, exp 

54 

55 

56def decode_auth_token(token: str, is_refresh: bool = False, payload_key_to_return="sub") -> Union[int, str]: 

57 """Function decoding the token""" 

58 if is_refresh: 

59 secret = SECRET_KEY_TOKEN_REFRESH 

60 else: 

61 secret = SECRET_KEY_TOKEN 

62 

63 try: 

64 payload = jwt.decode(token, secret, algorithms=['HS256']) 

65 return int(payload[payload_key_to_return]) 

66 except jwt.ExpiredSignatureError: 

67 return 'Signature expired.' 

68 except jwt.InvalidTokenError: 

69 return 'Invalid token.'