# This file is responsible for login, and for encoding, decoding and returning JWTs
import datetime
import time
import jwt
from Schemas import User
import datetime
from database import DBClient, DBName, UserCollection
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# Signing configuration shared by every token helper in this module.
# NOTE(review): the signing key is committed in source control, so anyone with
# read access to the repository can mint tokens this service accepts. It should
# be loaded from configuration/environment instead, and this value rotated.
secret = 'c427efa41a05507e7bd9637270ff5a20db6ad20f2dbb0cf8d5bbf5af4f89aea5'
# HS256 is symmetric: the same ``secret`` both signs and verifies tokens.
algorithm = 'HS256'
# Lifetimes of minted tokens. Both deltas are equal, so a refreshed token lives
# exactly as long as a freshly issued access token.
AccessTimeDelta = 600 # Seconds
RefreshTimeDelta = 600 # Seconds
# Generating Access Token
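def AccessToken(cred: dict):
    """Authenticate ``cred`` against the user collection and mint an access token.

    ``cred`` is the untrusted login body and must carry string values for
    ``'username'``, ``'password'`` and ``'host'``. ``'username'`` is matched
    against the ``username``, ``phoneno`` and ``email`` fields. The caller's
    ``host`` is recorded in ``Additional.Login`` on the user document; at most
    two hosts may be registered, so a third host is refused.

    Returns ``{'token': <jwt>}`` on success, ``{'Error': ...}`` when no user
    matches, and a 400 ``JSONResponse`` for the device limit and for every
    other failure. Exception text is no longer echoed to the client.
    """
    try:
        username = cred.get('username')
        password = cred.get('password')
        host = cred.get('host')
        # The body is arbitrary JSON. Only plain strings may reach the query:
        # a dict such as {'$ne': None} in either field would turn the equality
        # filter into an operator and match every user.
        if not all(isinstance(v, str) for v in (username, password, host)):
            return JSONResponse(content={"Error": "Invalid credentials"}, status_code=400)

        collection = DBClient[DBName][UserCollection]
        # NOTE(review): the filter compares the submitted password verbatim with
        # the stored field, i.e. passwords are stored and compared in clear text.
        # They should be stored as a password hash (bcrypt/argon2) and verified
        # in Python after fetching the user by identifier only.
        user = list(collection.find({'$and': [
            {'$or': [{'username': username}, {'phoneno': username}, {'email': username}]},
            {'password': password}]}))
        if not user:
            # Previously user[0] was read before this check and raised IndexError.
            return {"Error": "With given credential user doesn't exists"}
        record = user[0]

        # 'Additional' may be absent, empty, or not yet carry a 'Login' list.
        additional = record.get('Additional')
        if not isinstance(additional, dict):
            additional = {}
            record['Additional'] = additional
        logins = additional.get('Login')
        if not isinstance(logins, list):
            logins = [host]
            additional['Login'] = logins
            collection.update_one({'_id': record['_id']}, {'$set': record})
        # Register a new host only while fewer than two are known.
        if host not in logins and len(logins) < 2:
            logins.append(host)
            collection.update_one({'_id': record['_id']}, {'$set': record})

        if host in logins:
            now = datetime.datetime.now(tz=datetime.timezone.utc)
            encode = jwt.encode(
                {'username': record['username'],
                 'iat': now,
                 'exp': now + datetime.timedelta(seconds=AccessTimeDelta)},
                key=secret,
                algorithm=algorithm)
            return {'token': encode}
        # Two other hosts already hold this account; the original raised
        # MemoryError here and let the generic handler turn it into this 400.
        return JSONResponse(content={"Error": "You are login device limit over"}, status_code=400)
    except Exception:
        # Driver/library exception text can reveal internal details; keep the
        # client-facing message generic.
        return JSONResponse(content={"Error": "Authentication failed"}, status_code=400)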
# Refresh Access Token
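def RefreshToken(data):
    """Issue a fresh token for the user named in a still-valid token.

    ``data['token']`` is decoded with signature and expiry checks; on success a
    new token carrying the same ``username`` claim and a ``RefreshTimeDelta``
    lifetime is returned with status 200. An expired token yields 401, any
    other failure (bad signature, malformed body, missing key) yields 400.

    NOTE(review): the account is not re-checked against the database here, so
    a token can keep renewing itself indefinitely as long as each one is
    refreshed before it expires.
    """
    try:
        # ``algorithms`` takes a list; passing the bare name positionally only
        # worked because of substring matching inside the library.
        d = jwt.decode(data['token'], secret, algorithms=[algorithm])
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        encode = jwt.encode(
            {'username': d['username'],
             'iat': now,
             'exp': now + datetime.timedelta(seconds=RefreshTimeDelta)},
            key=secret, algorithm=algorithm)
        return JSONResponse(content={'token': encode}, status_code=200)
    except jwt.ExpiredSignatureError as e:
        return JSONResponse(content={"Error": str(e)}, status_code=401)
    except Exception as e:
        # Echoes the library's own exception message to the caller.
        return JSONResponse(content={"Error": str(e)}, status_code=400)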
# Verify Access Token
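def VerifyToken(data):
    """Validate ``data['token']`` and report the outcome as a ``JSONResponse``.

    Signature and ``exp`` are checked by ``jwt.decode`` (expiry verification
    is on by default). On success the request body is echoed back with status
    200; an expired token yields 401 and any other failure 400.
    """
    try:
        # The original passed the algorithm name positionally and an invalid
        # ``verify_exp=<token>`` keyword; the explicit list is what the library
        # expects and expiry is verified by default.
        jwt.decode(data['token'], secret, algorithms=[algorithm])
        # NOTE(review): the response reflects the caller's body unchanged; the
        # decoded claims are only used for the check.
        return JSONResponse(content=data, status_code=200)
    except jwt.ExpiredSignatureError as e:
        return JSONResponse(content={"Error": str(e)}, status_code=401)
    except Exception as e:
        # Echoes the library's own exception message to the caller.
        return JSONResponse(content={"Error": str(e)}, status_code=400)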
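def decodeJWT(token: str) -> "dict | None":
    """Decode and verify a bearer token, returning its claims or ``None``.

    ``jwt.decode`` verifies the signature with the module ``secret`` and the
    ``exp`` claim (expiry verification is on by default). Every failure -- bad
    signature, expired, malformed, unexpected algorithm, missing ``exp`` --
    maps to ``None``. Spaces are stripped from the token first, as before.

    The previous version returned ``str(exception)`` on failure: a truthy value
    that callers testing ``if payload:`` mistook for a valid token.
    """
    try:
        token = token.replace(' ', '')
        decoded_token = jwt.decode(token, secret, algorithms=[algorithm])
    except Exception:
        return None
    # A token without an exp claim passes library validation; refuse it so every
    # accepted token has a bounded lifetime.
    if 'exp' not in decoded_token:
        return None
    return decoded_token


# Verifying access token from the Headers
# This class helps to check whether the request is authorized or not using the HTTPBearer
class jwtBearer(HTTPBearer):
    """FastAPI dependency that admits only requests carrying a verifiable Bearer JWT."""

    def __init__(self, auto_Error: bool = True):
        # The original spelled this ``__int__`` with a broken ``super`` call, so
        # it never ran; ``auto_Error`` now reaches HTTPBearer as intended.
        super().__init__(auto_error=auto_Error)

    async def __call__(self, request: Request):
        """Return the raw token string when it verifies, otherwise raise 403."""
        credentials: HTTPAuthorizationCredentials = await super(jwtBearer, self).__call__(request)
        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid or Expired Token")
        if credentials.scheme != 'Bearer':
            raise HTTPException(status_code=403, detail="Invalid or Expered Token")
        # decodeJWT yields None for every verification failure. The earlier code
        # rejected only the literal "Signature has expired" message and returned
        # the credentials for any other failure, including a bad signature.
        if decodeJWT(credentials.credentials) is None:
            raise HTTPException(status_code=403, detail="Invalid or Expired Token")
        return credentials.credentials

    def verify_jwt(self, jwtoken: str) -> bool:
        """Return True when ``jwtoken`` decodes and verifies, False otherwise."""
        return decodeJWT(jwtoken) is not None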