Coverage for yaptide/routes/keycloak_routes.py: 41%
91 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-22 07:31 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-22 07:31 +0000
1import logging
2import os
3from pathlib import Path
5from flask import request
6from flask_restful import Resource
7import json
8import jwt
9import requests
11from yaptide.persistence.db_methods import (add_object_to_db, fetch_keycloak_user_by_username, make_commit_to_db)
12from yaptide.persistence.models import KeycloakUserModel
13from yaptide.routes.utils.response_templates import (error_internal_response, yaptide_response)
14from yaptide.routes.utils.tokens import encode_auth_token
15from werkzeug.exceptions import Forbidden, Unauthorized
17ROOT_DIR = Path(__file__).parent.resolve()
def _fetch_keycloak_jwks(certs_url: str, timeout: float):
    """Download the JSON Web Key Set published under ``certs_url``.

    Any transport failure, HTTP error status or non-JSON body is reported as
    ``Forbidden("Service is not available")`` so callers see one failure mode.
    """
    try:
        res = requests.get(certs_url, timeout=timeout)
        res.raise_for_status()
        return res.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors raised by older `requests` releases
        logging.error("Unable to connect to keycloak: %s", e)
        raise Forbidden(description="Service is not available") from e


def check_user_based_on_keycloak_token(token: str, username: str, request_timeout: float = 10.0) -> bool:
    """Check if the user can access the service based on a Keycloak-issued JWT.

    The token is first decoded without signature verification to read its claims,
    then the realm public keys are downloaded from Keycloak and the signature
    is verified with the key whose ``kid`` matches the token header.

    Args:
        token: raw JWT (without the ``Bearer `` prefix).
        username: used only in log and error messages.
        request_timeout: seconds to wait for the Keycloak certs endpoint.

    Returns:
        True if the user has access (every failure path raises instead).

    Raises:
        Unauthorized: when no token is provided.
        Forbidden: when Keycloak env variables are not set, the token does not list
            ``PLG_YAPTIDE_ACCESS``, the token is expired or invalid, or Keycloak
            cannot be queried.
    """
    if not token:
        logging.error("No token provided")
        raise Unauthorized(description="No token provided")

    keycloak_base_url = os.environ.get('KEYCLOAK_BASE_URL', '')
    keycloak_realm = os.environ.get('KEYCLOAK_REALM', '')
    if not keycloak_base_url or not keycloak_realm:
        logging.error("Keycloak env variables not set")
        raise Forbidden(description="Service is not available")
    keycloak_full_url = f"{keycloak_base_url}/auth/realms/{keycloak_realm}/protocol/openid-connect/certs"

    try:
        # first lets try to decode token without verifying signature
        unverified_claims = jwt.decode(token, options={"verify_signature": False})

        # check if token gives access to our service (a null claim is treated as an empty list)
        if "PLG_YAPTIDE_ACCESS" not in (unverified_claims.get("plgridAccessServices") or []):
            logging.error("User %s has no access to Yaptide service", username)
            raise Forbidden(description=f"User {username} has no access to our service")

        # ask keycloak for public keys and pick the one matching the token's kid
        kid = jwt.get_unverified_header(token).get('kid')
        jwks = _fetch_keycloak_jwks(keycloak_full_url, request_timeout)
        published_keys = jwks.get('keys', []) if isinstance(jwks, dict) else []
        matching_jwk = next((jwk for jwk in published_keys if jwk.get('kid') == kid), None)
        if matching_jwk is None:
            logging.error("Invalid token: no keycloak public key with kid %s", kid)
            raise Forbidden(description="Invalid token")
        key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(matching_jwk))

        # now we can verify signature of the token
        # NOTE(review): the expected audience is read from the unverified token itself,
        # so the audience check cannot reject a token issued for another client - confirm
        # whether a configured audience should be used instead.
        jwt.decode(token,
                   key=key,
                   audience=unverified_claims.get("aud"),
                   algorithms=['RS256'],
                   options={"verify_signature": True})

        return True

    except jwt.ExpiredSignatureError as e:
        logging.error("Signature expired: %s", e)
        raise Forbidden(description="Signature expired") from e
    except jwt.InvalidTokenError as e:
        logging.error("Invalid token: %s", e)
        raise Forbidden(description="Invalid token") from e
class AuthKeycloak(Resource):
    """Class responsible for user log in"""

    # seconds to wait for the cert auth service before giving up on SSH certs
    _CERT_AUTH_TIMEOUT = 10

    @staticmethod
    def post():
        """Log in a user identified by a Keycloak token.

        Expects a JSON body containing ``username`` and the Keycloak token in the
        ``Authorization`` header. On success responds with code 202, the access token
        expiry in milliseconds and an ``access_token`` cookie. Responds with code 400
        when the body is empty, is not a JSON object or lacks required keys.
        ``check_user_based_on_keycloak_token`` is called to validate the token and
        its exceptions propagate to the caller.
        """
        payload_dict = request.get_json(force=True)
        if not payload_dict:
            return yaptide_response(message="No JSON in body", code=400)
        if not isinstance(payload_dict, dict):
            # e.g. a JSON list - it has no keys to inspect
            return yaptide_response(message="JSON payload must be an object", code=400)

        required_keys = {"username"}
        diff = required_keys.difference(payload_dict.keys())
        if diff:
            return yaptide_response(message=f"Missing keys in JSON payload: {diff}", code=400)

        username = payload_dict["username"]
        logging.debug("Authenticating for user: %s", username)
        keycloak_token: str = request.headers.get('Authorization', '')

        # check if user has access to our service, if not throw an exception here
        check_user_based_on_keycloak_token(keycloak_token.replace('Bearer ', ''), username)

        cert_auth_url = os.environ.get('CERT_AUTH_URL', '')
        res_json: dict = {}
        if cert_auth_url:
            # ask cert auth service for cert and private key; fetching certs is best-effort,
            # so a failing request is logged and the login continues without them
            try:
                with requests.Session() as session:
                    res: requests.Response = session.get(cert_auth_url,
                                                         headers={'Authorization': keycloak_token},
                                                         timeout=AuthKeycloak._CERT_AUTH_TIMEOUT)
                    logging.debug("auth cert service response code: %d", res.status_code)
                    if res.status_code == 200:
                        res_json = res.json()
                    else:
                        logging.warning("failed to get SSH certs, inspect CERT_AUTH_URL")
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers JSON decoding errors raised by older `requests` releases
                logging.warning("failed to get SSH certs, inspect CERT_AUTH_URL: %s", e)
        else:
            logging.info("Skip fetching SSH certs as CERT_AUTH_URL not set")

        # check if user exists in our database, if not create new user
        user = fetch_keycloak_user_by_username(username=username)
        if not user:
            # user not existing, adding user together with cert and private key
            user = KeycloakUserModel(username=username, cert=res_json.get("cert"), private_key=res_json.get("private"))
            add_object_to_db(user)
        else:
            # user existing, updating cert and private key
            # (both become None when no certs were obtained above)
            user.cert = res_json.get("cert")
            user.private_key = res_json.get("private")
            make_commit_to_db()

        try:
            # prepare our own tokens
            access_token, access_exp = encode_auth_token(user_id=user.id, is_keycloak=True)

            resp = yaptide_response(message='Successfully logged in',
                                    code=202,
                                    content={
                                        'access_exp': int(access_exp.timestamp() * 1000),
                                    })
            resp.set_cookie('access_token', access_token, httponly=True, samesite='Lax', expires=access_exp)
            return resp
        except Exception:  # skipcq: PYL-W0703
            # keep the generic response for the client but leave a trace for operators
            logging.exception("Failed to prepare access token for user: %s", username)
            return error_internal_response()

    @staticmethod
    def delete():
        """Log the user out by deleting the ``access_token`` cookie; responds with code 200"""
        resp = yaptide_response(message='User logged out', code=200)
        resp.delete_cookie('access_token')
        return resp