Coverage for yaptide/routes/keycloak_routes.py: 41%

91 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-07-01 12:55 +0000

1import logging 

2import os 

3from pathlib import Path 

4 

5from flask import request 

6from flask_restful import Resource 

7import json 

8import jwt 

9import requests 

10 

11from yaptide.persistence.db_methods import (add_object_to_db, fetch_keycloak_user_by_username, make_commit_to_db) 

12from yaptide.persistence.models import KeycloakUserModel 

13from yaptide.routes.utils.response_templates import (error_internal_response, yaptide_response) 

14from yaptide.routes.utils.tokens import encode_auth_token 

15from werkzeug.exceptions import Forbidden, Unauthorized 

16 

# Absolute directory containing this module; ``resolve()`` is applied to the
# parent directory, so symlinks in the directory path are followed.
ROOT_DIR = Path(__file__).parent.resolve()

18 

19 

def check_user_based_on_keycloak_token(token: str, username: str, *, timeout: float = 10.0) -> bool:
    """Check whether a Keycloak-issued token grants access to the service.

    The token is first decoded without signature verification so its claims
    can be inspected, then its signature is verified against the realm's
    public key selected by the ``kid`` found in the token header.

    Args:
        token: Encoded JWT to validate.
        username: Used only in log and error messages.
        timeout: Seconds to wait for Keycloak's certificate (JWKS) endpoint.

    Returns:
        True when the token lists ``PLG_YAPTIDE_ACCESS`` among its
        ``plgridAccessServices`` and its RS256 signature verifies.

    Raises:
        Unauthorized: ``token`` is empty.
        Forbidden: Keycloak env variables are unset, the token lacks access,
            is expired or invalid, or Keycloak's keys cannot be obtained.
    """
    if not token:
        logging.error("No token provided")
        raise Unauthorized(description="No token provided")
    keycloak_base_url = os.environ.get('KEYCLOAK_BASE_URL', '')
    keycloak_realm = os.environ.get('KEYCLOAK_REALM', '')
    if not keycloak_base_url or not keycloak_realm:
        logging.error("Keycloak env variables not set")
        raise Forbidden(description="Service is not available")
    keycloak_full_url = f"{keycloak_base_url}/auth/realms/{keycloak_realm}/protocol/openid-connect/certs"

    # NOTE(review): `username` is never compared with any claim of the token,
    # so a valid token is accepted for whatever username the caller supplies.
    # Confirm whether that is intended.
    try:
        # Read the claims without verifying the signature yet; nothing read
        # here is trusted until the verified decode below succeeds.
        unverified_claims = jwt.decode(token, options={"verify_signature": False})
        if "PLG_YAPTIDE_ACCESS" not in unverified_claims.get("plgridAccessServices", []):
            logging.error("User %s has no access to Yaptide service", username)
            raise Forbidden(description=f"User {username} has no access to our service")

        # Reject tokens missing the fields needed for verification up front,
        # instead of failing later with an unhandled KeyError (HTTP 500).
        kid = jwt.get_unverified_header(token).get("kid")
        audience = unverified_claims.get("aud")
        if not kid or audience is None:
            logging.error("Invalid token: %s", "missing 'kid' header or 'aud' claim")
            raise Forbidden(description="Invalid token")

        # Ask Keycloak for its public keys. Any transport, HTTP-status or
        # JSON problem means we cannot verify anything right now.
        try:
            res = requests.get(keycloak_full_url, timeout=timeout)
            res.raise_for_status()
            jwks = res.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            logging.error("Unable to connect to keycloak: %s", e)
            raise Forbidden(description="Service is not available") from e

        # Parse only the key that signed this token, so unrelated keys of
        # other types in the realm cannot break verification.
        jwk_list = (jwks.get("keys") or []) if isinstance(jwks, dict) else []
        signing_jwk = next((jwk for jwk in jwk_list if isinstance(jwk, dict) and jwk.get("kid") == kid), None)
        if signing_jwk is None:
            logging.error("Invalid token: %s", f"no Keycloak public key with kid {kid}")
            raise Forbidden(description="Invalid token")
        key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(signing_jwk))

        # Now verify the signature (and expiry/audience) of the token.
        jwt.decode(token,
                   key=key,
                   audience=audience,
                   algorithms=['RS256'],
                   options={"verify_signature": True})

        return True

    except jwt.ExpiredSignatureError as e:
        logging.error("Signature expired: %s", e)
        raise Forbidden(description="Signature expired") from e
    except jwt.InvalidTokenError as e:
        logging.error("Invalid token: %s", e)
        raise Forbidden(description="Invalid token") from e
    except jwt.InvalidKeyError as e:
        # The matching JWK published by Keycloak could not be loaded as an RSA key.
        logging.error("Invalid Keycloak public key: %s", e)
        raise Forbidden(description="Service is not available") from e

69 

70 

class AuthKeycloak(Resource):
    """Log-in and log-out endpoints for users authenticated with a Keycloak token."""

    @staticmethod
    def _fetch_ssh_credentials(cert_auth_url: str, keycloak_token: str, timeout: float = 10.0) -> dict:
        """Best-effort fetch of the user's SSH cert and private key.

        Returns the decoded JSON object from the cert auth service, or an
        empty dict when the service is unreachable, answers with a status
        other than 200, or returns something that is not a JSON object.
        """
        try:
            # requests.get opens and closes its own session, so nothing is leaked.
            res = requests.get(cert_auth_url, headers={'Authorization': keycloak_token}, timeout=timeout)
        except requests.exceptions.RequestException as e:
            logging.warning("failed to get SSH certs, inspect CERT_AUTH_URL: %s", e)
            return {}

        logging.debug("auth cert service response code: %d", res.status_code)
        if res.status_code != 200:
            logging.warning("failed to get SSH certs, inspect CERT_AUTH_URL")
            return {}

        try:
            body = res.json()
        except ValueError as e:
            logging.warning("auth cert service returned invalid JSON: %s", e)
            return {}
        return body if isinstance(body, dict) else {}

    @staticmethod
    def post():
        """Log the user in.

        Expects a JSON object with a ``username`` key and the Keycloak token
        in the ``Authorization`` header. On success the response carries
        ``access_exp`` in its content and sets the ``access_token`` cookie.
        Token validation errors propagate as the HTTP exceptions raised by
        ``check_user_based_on_keycloak_token``.
        """
        payload_dict = request.get_json(force=True)
        if not payload_dict:
            return yaptide_response(message="No JSON in body", code=400)
        if not isinstance(payload_dict, dict):
            # e.g. a JSON list: valid JSON, but it has no keys to inspect
            return yaptide_response(message="JSON payload must be an object", code=400)

        required_keys = {"username"}
        diff = required_keys.difference(payload_dict)
        if diff:
            return yaptide_response(message=f"Missing keys in JSON payload: {diff}", code=400)

        username = payload_dict["username"]
        logging.debug("Authenticating for user: %s", username)
        keycloak_token: str = request.headers.get('Authorization', '')

        # check if user has access to our service, if not throw an exception here
        check_user_based_on_keycloak_token(keycloak_token.replace('Bearer ', ''), username)

        cert_auth_url = os.environ.get('CERT_AUTH_URL', '')
        res_json: dict = {}
        if cert_auth_url:
            # ask cert auth service for cert and private key
            res_json = AuthKeycloak._fetch_ssh_credentials(cert_auth_url, keycloak_token)
        else:
            logging.info("Skip fetching SSH certs as CERT_AUTH_URL not set")

        # check if user exists in our database, if not create new user
        user = fetch_keycloak_user_by_username(username=username)
        if not user:
            # user not existing, adding user together with cert and private key
            user = KeycloakUserModel(username=username, cert=res_json.get("cert"), private_key=res_json.get("private"))
            add_object_to_db(user)
        else:
            # user existing, updating cert and private key; when the fetch
            # above yielded nothing these are overwritten with None
            user.cert = res_json.get("cert")
            user.private_key = res_json.get("private")
            make_commit_to_db()

        try:
            # prepare our own tokens
            access_token, access_exp = encode_auth_token(user_id=user.id, is_keycloak=True)

            resp = yaptide_response(message='Successfully logged in',
                                    code=202,
                                    content={
                                        # expiry converted from seconds to milliseconds
                                        'access_exp': int(access_exp.timestamp() * 1000),
                                    })
            resp.set_cookie('access_token', access_token, httponly=True, samesite='Lax', expires=access_exp)
            return resp
        except Exception:  # skipcq: PYL-W0703
            # Keep the generic 500 response, but leave a trace of what failed.
            logging.exception("Failed to create access token for user: %s", username)
            return error_internal_response()

    @staticmethod
    def delete():
        """Log the user out by deleting the ``access_token`` cookie."""
        resp = yaptide_response(message='User logged out', code=200)
        resp.delete_cookie('access_token')
        return resp