Coverage for yaptide/routes/auth_routes.py: 79%

75 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-07-01 12:55 +0000

1import logging 

2 

3from flask import request 

4from flask_restful import Resource 

5from marshmallow import Schema, ValidationError, fields 

6 

7from yaptide.persistence.db_methods import (add_object_to_db, 

8 fetch_yaptide_user_by_username) 

9from yaptide.persistence.models import YaptideUserModel 

10from yaptide.routes.utils.decorators import requires_auth 

11from yaptide.routes.utils.response_templates import ( # skipcq: FLK-E101 

12 error_internal_response, error_validation_response, yaptide_response) 

13from yaptide.routes.utils.tokens import encode_auth_token 

14 

15 

16class AuthRegister(Resource): 

17 """Class responsible for user registration""" 

18 

19 class APIParametersSchema(Schema): 

20 """Class specifies API parameters""" 

21 

22 username = fields.String() 

23 password = fields.String() 

24 

25 @staticmethod 

26 def put(): 

27 """Method returning status of registration""" 

28 try: 

29 json_data: dict = AuthRegister.APIParametersSchema().load(request.get_json(force=True)) 

30 except ValidationError: 

31 return error_validation_response() 

32 

33 user = fetch_yaptide_user_by_username(username=json_data.get('username')) 

34 

35 if not user: 

36 try: 

37 user = YaptideUserModel(username=json_data.get('username')) 

38 user.set_password(json_data.get('password')) 

39 

40 add_object_to_db(user) 

41 

42 return yaptide_response(message='User created', code=201) 

43 

44 except Exception as e: # skipcq: PYL-W0703 

45 logging.error("%s", e) 

46 return error_internal_response() 

47 else: 

48 return yaptide_response(message='User existing', code=403) 

49 

50 

51class AuthLogIn(Resource): 

52 """Class responsible for user log in""" 

53 

54 @staticmethod 

55 def post(): 

56 """Method returning status of logging in (and token if it was successful)""" 

57 payload_dict: dict = request.get_json(force=True) 

58 if not payload_dict: 

59 return yaptide_response(message="No JSON in body", code=400) 

60 

61 required_keys = {"username", "password"} 

62 

63 if required_keys != required_keys.intersection(set(payload_dict.keys())): 

64 diff = required_keys.difference(set(payload_dict.keys())) 

65 return yaptide_response(message=f"Missing keys in JSON payload: {diff}", code=400) 

66 

67 try: 

68 user: YaptideUserModel = fetch_yaptide_user_by_username(username=payload_dict['username']) 

69 if not user: 

70 return yaptide_response(message='Invalid login or password', code=401) 

71 

72 if not user.check_password(password=payload_dict['password']): 

73 return yaptide_response(message='Invalid login or password', code=401) 

74 

75 access_token, access_exp = encode_auth_token(user_id=user.id, is_refresh=False) 

76 refresh_token, refresh_exp = encode_auth_token(user_id=user.id, is_refresh=True) 

77 

78 resp = yaptide_response( 

79 message='Successfully logged in', 

80 code=202, 

81 content={ 

82 'access_exp': int(access_exp.timestamp()*1000), 

83 'refresh_exp': int(refresh_exp.timestamp()*1000), 

84 } 

85 ) 

86 resp.set_cookie('access_token', access_token, httponly=True, samesite='Lax', expires=access_exp) 

87 resp.set_cookie('refresh_token', refresh_token, httponly=True, samesite='Lax', expires=refresh_exp) 

88 return resp 

89 except Exception as e: # skipcq: PYL-W0703 

90 logging.error("%s", e) 

91 return error_internal_response() 

92 

93 

94class AuthRefresh(Resource): 

95 """Class responsible for refreshing user""" 

96 

97 @staticmethod 

98 @requires_auth(is_refresh=True) 

99 def get(user: YaptideUserModel): 

100 """Method refreshing token""" 

101 access_token, access_exp = encode_auth_token(user_id=user.id, is_refresh=False) 

102 resp = yaptide_response( 

103 message='User refreshed', 

104 code=200, 

105 content={'access_exp': int(access_exp.timestamp()*1000)} 

106 ) 

107 resp.set_cookie('access_token', access_token, httponly=True, samesite='Lax', expires=access_exp) 

108 return resp 

109 

110 

111class AuthStatus(Resource): 

112 """Class responsible for returning user status""" 

113 

114 @staticmethod 

115 @requires_auth() 

116 def get(user: YaptideUserModel): 

117 """Method returning user's status""" 

118 return yaptide_response( 

119 message='User status', 

120 code=200, 

121 content={'username': user.username} 

122 ) 

123 

124 

125class AuthLogOut(Resource): 

126 """Class responsible for user log out""" 

127 

128 @staticmethod 

129 def delete(): 

130 """Method logging the user out""" 

131 resp = yaptide_response(message='User logged out', code=200) 

132 resp.delete_cookie('access_token') 

133 resp.delete_cookie('refresh_token') 

134 return resp