Skip to content

keycloak_routes

routes.keycloak_routes

ROOT_DIR module-attribute

ROOT_DIR = resolve()

AuthKeycloak

Bases: Resource

Class responsible for user log in

Source code in yaptide/routes/keycloak_routes.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class AuthKeycloak(Resource):
    """Resource handling log in and log out of users authenticated by Keycloak"""

    @staticmethod
    def post():
        """Log the user in based on a Keycloak token and return the login status.

        Expects a JSON body with a ``username`` key and the Keycloak token in the
        ``Authorization`` header. When ``CERT_AUTH_URL`` is set, the SSH cert and
        private key are fetched from that service and stored with the user.
        On success the response carries ``access_exp`` (expiration of our access
        token in milliseconds) and sets the ``access_token`` cookie.
        """
        payload_dict = request.get_json(force=True)
        if not payload_dict:
            return yaptide_response(message="No JSON in body", code=400)
        # valid JSON that is not an object (e.g. a list or a string) has no keys to inspect
        if not isinstance(payload_dict, dict):
            return yaptide_response(message="JSON payload must be an object", code=400)

        required_keys = {"username"}
        diff = required_keys.difference(payload_dict)
        if diff:
            return yaptide_response(message=f"Missing keys in JSON payload: {diff}", code=400)

        username = payload_dict["username"]
        logging.debug("Authenticating for user: %s", username)
        keycloak_token: str = request.headers.get('Authorization', '')

        # check if user has access to our service, if not throw an exception here
        check_user_based_on_keycloak_token(keycloak_token.replace('Bearer ', ''), username)

        cert_auth_url = os.environ.get('CERT_AUTH_URL', '')
        res_json: dict = {}
        if cert_auth_url:
            # ask cert auth service for cert and private key;
            # the context manager closes the session once the response is read
            with requests.Session() as session:
                res: requests.Response = session.get(cert_auth_url, headers={'Authorization': keycloak_token})
                logging.debug("auth cert service response code: %d", res.status_code)
                if res.status_code == 200:
                    res_json = res.json()
                else:
                    logging.warning("failed to get SSH certs, inspect CERT_AUTH_URL")
        else:
            logging.info("Skip fetching SSH certs as CERT_AUTH_URL not set")

        # check if user exists in our database, if not create new user
        user = fetch_keycloak_user_by_username(username=username)
        if not user:
            # user not existing, adding user together with cert and private key
            user = KeycloakUserModel(username=username, cert=res_json.get("cert"), private_key=res_json.get("private"))

            add_object_to_db(user)
        else:
            # user existing, updating cert and private key
            # (both become None when no certs were fetched above)
            user.cert = res_json.get("cert")
            user.private_key = res_json.get("private")
            make_commit_to_db()

        try:
            # prepare our own tokens
            access_token, access_exp = encode_auth_token(user_id=user.id, is_keycloak=True)

            resp = yaptide_response(message='Successfully logged in',
                                    code=202,
                                    content={
                                        'access_exp': int(access_exp.timestamp() * 1000),
                                    })
            resp.set_cookie('access_token', access_token, httponly=True, samesite='Lax', expires=access_exp)
            return resp
        except Exception:  # skipcq: PYL-W0703
            # keep the traceback in the logs, the client only gets a generic error
            logging.exception("Failed to prepare login response for user: %s", username)
            return error_internal_response()

    @staticmethod
    def delete():
        """Log the user out by deleting the ``access_token`` cookie"""
        resp = yaptide_response(message='User logged out', code=200)
        resp.delete_cookie('access_token')
        return resp

delete staticmethod

delete()

Method returning status of logging out

Source code in yaptide/routes/keycloak_routes.py
134
135
136
137
138
139
@staticmethod
def delete():
    """Log the user out by deleting the ``access_token`` cookie"""
    logout_response = yaptide_response(message='User logged out', code=200)
    # removing the cookie is what actually ends the session on the client side
    logout_response.delete_cookie('access_token')
    return logout_response

post staticmethod

post()

Method returning status of logging in (and token if it was successful)

Source code in yaptide/routes/keycloak_routes.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
@staticmethod
def post():
    """Log the user in based on a Keycloak token and return the login status.

    Expects a JSON body with a ``username`` key and the Keycloak token in the
    ``Authorization`` header. When ``CERT_AUTH_URL`` is set, the SSH cert and
    private key are fetched from that service and stored with the user.
    On success the response carries ``access_exp`` (expiration of our access
    token in milliseconds) and sets the ``access_token`` cookie.
    """
    payload_dict = request.get_json(force=True)
    if not payload_dict:
        return yaptide_response(message="No JSON in body", code=400)
    # valid JSON that is not an object (e.g. a list or a string) has no keys to inspect
    if not isinstance(payload_dict, dict):
        return yaptide_response(message="JSON payload must be an object", code=400)

    required_keys = {"username"}
    diff = required_keys.difference(payload_dict)
    if diff:
        return yaptide_response(message=f"Missing keys in JSON payload: {diff}", code=400)

    username = payload_dict["username"]
    logging.debug("Authenticating for user: %s", username)
    keycloak_token: str = request.headers.get('Authorization', '')

    # check if user has access to our service, if not throw an exception here
    check_user_based_on_keycloak_token(keycloak_token.replace('Bearer ', ''), username)

    cert_auth_url = os.environ.get('CERT_AUTH_URL', '')
    res_json: dict = {}
    if cert_auth_url:
        # ask cert auth service for cert and private key;
        # the context manager closes the session once the response is read
        with requests.Session() as session:
            res: requests.Response = session.get(cert_auth_url, headers={'Authorization': keycloak_token})
            logging.debug("auth cert service response code: %d", res.status_code)
            if res.status_code == 200:
                res_json = res.json()
            else:
                logging.warning("failed to get SSH certs, inspect CERT_AUTH_URL")
    else:
        logging.info("Skip fetching SSH certs as CERT_AUTH_URL not set")

    # check if user exists in our database, if not create new user
    user = fetch_keycloak_user_by_username(username=username)
    if not user:
        # user not existing, adding user together with cert and private key
        user = KeycloakUserModel(username=username, cert=res_json.get("cert"), private_key=res_json.get("private"))

        add_object_to_db(user)
    else:
        # user existing, updating cert and private key
        # (both become None when no certs were fetched above)
        user.cert = res_json.get("cert")
        user.private_key = res_json.get("private")
        make_commit_to_db()

    try:
        # prepare our own tokens
        access_token, access_exp = encode_auth_token(user_id=user.id, is_keycloak=True)

        resp = yaptide_response(message='Successfully logged in',
                                code=202,
                                content={
                                    'access_exp': int(access_exp.timestamp() * 1000),
                                })
        resp.set_cookie('access_token', access_token, httponly=True, samesite='Lax', expires=access_exp)
        return resp
    except Exception:  # skipcq: PYL-W0703
        # keep the traceback in the logs, the client only gets a generic error
        logging.exception("Failed to prepare login response for user: %s", username)
        return error_internal_response()

check_user_based_on_keycloak_token

check_user_based_on_keycloak_token(token, username)

Checks if user can access the service, returns True if user has access

Source code in yaptide/routes/keycloak_routes.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def check_user_based_on_keycloak_token(token: str, username: str) -> bool:
    """Check if the user can access the service, return True if the user has access.

    The token is first decoded without verification to check the
    ``plgridAccessServices`` claim, then its signature is verified against the
    public key published by Keycloak for the token's ``kid``.

    Args:
        token: encoded Keycloak JWT (without the ``Bearer `` prefix).
        username: user name, used only in log and error messages.

    Returns:
        True when the token grants access and its signature is valid.

    Raises:
        Unauthorized: when no token is provided.
        Forbidden: when Keycloak is not configured or not reachable, the user
            has no access to the service, or the token is expired or invalid.
    """
    if not token:
        logging.error("No token provided")
        raise Unauthorized(description="No token provided")
    keycloak_base_url = os.environ.get('KEYCLOAK_BASE_URL', '')
    keycloak_realm = os.environ.get('KEYCLOAK_REALM', '')
    if not keycloak_base_url or not keycloak_realm:
        logging.error("Keycloak env variables not set")
        raise Forbidden(description="Service is not available")
    keycloak_full_url = f"{keycloak_base_url}/auth/realms/{keycloak_realm}/protocol/openid-connect/certs"
    # seconds; without a timeout an unresponsive keycloak would block the request forever
    keycloak_timeout = 10
    try:
        # first lets try to decode token without verifying signature
        unverified_encoded_token = jwt.decode(token, options={"verify_signature": False})
        # check if token gives access to our service
        if "PLG_YAPTIDE_ACCESS" not in unverified_encoded_token.get("plgridAccessServices", []):
            logging.error("User %s has no access to Yaptide service", username)
            raise Forbidden(description=f"User {username} has no access to our service")

        # a token without audience cannot be verified below, reject it explicitly
        if "aud" not in unverified_encoded_token:
            logging.error("Invalid token: missing 'aud' claim")
            raise Forbidden(description="Invalid token")

        # ask keycloak for public keys
        res = requests.get(keycloak_full_url, timeout=keycloak_timeout)
        jwks = res.json()

        # pick only the key matching the token's kid, so that unrelated
        # entries of the key set cannot break the verification
        token_kid = jwt.get_unverified_header(token).get('kid')
        matching_jwk = next((jwk for jwk in jwks.get('keys', []) if jwk.get('kid') == token_kid), None)
        if matching_jwk is None:
            logging.error("No keycloak public key matches token kid: %s", token_kid)
            raise Forbidden(description="Invalid token")
        key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(matching_jwk))

        # now we can verify signature of the token
        # NOTE(review): the audience is read from the token itself, so this does not
        # restrict accepted audiences - confirm whether an expected value should be configured
        jwt.decode(token,
                   key=key,
                   audience=unverified_encoded_token["aud"],
                   algorithms=['RS256'],
                   options={"verify_signature": True})

        return True

    except jwt.ExpiredSignatureError as e:
        logging.error("Signature expired: %s", e)
        raise Forbidden(description="Signature expired") from e
    except (jwt.InvalidTokenError, jwt.InvalidKeyError) as e:
        logging.error("Invalid token: %s", e)
        raise Forbidden(description="Invalid token") from e
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
        logging.error("Unable to connect to keycloak: %s", e)
        raise Forbidden(description="Service is not available") from e