-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
auth.py
106 lines (91 loc) · 3.47 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import base64
import json
from datetime import datetime, timedelta, timezone

import jwt
from fastapi import HTTPException
from jwt.algorithms import ECAlgorithm
from passlib.context import CryptContext

from config import settings
def load_ES256_from_jwk_env():
    """Build the ES256 key object from the JWK set stored in settings.

    ``settings.ES256_KEY`` is base64-decoded, parsed as JSON, and the first
    entry of its ``"keys"`` list is handed to ``ECAlgorithm.from_jwk``.

    Returns:
        The key object produced by ``ECAlgorithm.from_jwk`` for the first JWK.
        NOTE(review): callers in this file invoke ``.public_key()`` on the
        result, so the configured JWK is presumably a private key -- confirm
        against the deployment configuration.

    Raises:
        Whatever ``base64.b64decode``, ``json.loads`` or
        ``ECAlgorithm.from_jwk`` raise for malformed input; a ``TypeError`` or
        ``IndexError`` if the document has no usable ``"keys"`` entry.
    """
    # The setting holds a base64 wrapper around a JSON JWK-set document.
    jwk_set = json.loads(base64.b64decode(settings.ES256_KEY))
    # from_jwk is a static method, so it is called on the class directly;
    # no ECAlgorithm instance is needed just to parse a key.
    return ECAlgorithm.from_jwk(jwk_set.get("keys")[0])
class Auth:
    """Password hashing plus issuing and verifying signed JWTs.

    Every token issued here carries ``exp``, ``iat``, ``scope`` and ``sub``
    (the user's email) claims, is signed with the key returned by
    ``load_ES256_from_jwk_env()`` using ``settings.JWT_ALGO``, and has
    ``settings.ES256_KID`` in its ``kid`` header.
    """

    hasher = CryptContext(schemes=['bcrypt'])
    # Not referenced by any method of this class; kept so that external code
    # reading ``Auth.secret`` keeps working.
    secret = settings.JWT_SECRET_KEY

    def encode_password(self, password):
        """Return the hash of ``password`` produced by the class hasher."""
        return self.hasher.hash(password)

    def verify_password(self, password, encoded_password):
        """Check ``password`` against the stored hash ``encoded_password``."""
        return self.hasher.verify(password, encoded_password)

    def _issue_token(self, email, scope, lifetime):
        """Sign and return a token for ``email``.

        Args:
            email: Value stored in the ``sub`` claim.
            scope: Value stored in the ``scope`` claim.
            lifetime: ``timedelta`` added to the issue time to obtain ``exp``.
        """
        # One timezone-aware timestamp, so ``iat`` and ``exp`` are derived
        # from the same instant (datetime.utcnow() is deprecated).
        issued_at = datetime.now(timezone.utc)
        payload = {
            'exp': issued_at + lifetime,
            'iat': issued_at,
            'scope': scope,
            'sub': email,
        }
        return jwt.encode(
            payload,
            load_ES256_from_jwk_env(),
            algorithm=settings.JWT_ALGO,
            headers={"kid": settings.ES256_KID},
        )

    def _verify(self, token):
        """Verify ``token`` and return its claims; jwt errors propagate."""
        # Only the public half of the configured key is needed to verify.
        public_key = load_ES256_from_jwk_env().public_key()
        # ``algorithms`` must be a list: with a bare string the allow-list
        # check degrades into a substring test.
        return jwt.decode(token, public_key, algorithms=[settings.JWT_ALGO])

    def encode_token(self, email):
        """Return an ``access_token``-scoped token for ``email`` (30 minutes)."""
        return self._issue_token(email, 'access_token', timedelta(minutes=30))

    def decode_token(self, token):
        """Verify ``token`` and return its decoded claims.

        NOTE: the ``scope`` claim is not checked here, so refresh and
        reset-password tokens issued by this class also decode successfully;
        callers must inspect ``scope`` themselves.

        Raises:
            HTTPException: 401 ``Token expired`` when the token has expired,
                401 ``Invalid token`` for any other token validation error.
        """
        try:
            return self._verify(token)
        except jwt.ExpiredSignatureError as exc:
            raise HTTPException(status_code=401, detail='Token expired') from exc
        except jwt.InvalidTokenError as exc:
            raise HTTPException(status_code=401, detail='Invalid token') from exc

    def encode_refresh_token(self, email):
        """Return a ``refresh_token``-scoped token for ``email`` (10 hours)."""
        return self._issue_token(email, 'refresh_token', timedelta(hours=10))

    def refresh_token(self, refresh_token):
        """Exchange a valid refresh token for a new access token.

        Raises:
            HTTPException: 401 ``Refresh token expired`` when the token has
                expired, 401 ``Invalid refresh token`` when validation fails
                or the ``sub`` claim is missing, 401 ``Invalid scope for
                token`` when the scope is not ``refresh_token``.
        """
        try:
            payload = self._verify(refresh_token)
        except jwt.ExpiredSignatureError as exc:
            raise HTTPException(status_code=401, detail='Refresh token expired') from exc
        except jwt.InvalidTokenError as exc:
            raise HTTPException(status_code=401, detail='Invalid refresh token') from exc

        # .get(): a correctly signed token without a scope claim is rejected
        # with 401 instead of surfacing as an unhandled KeyError.
        if payload.get('scope') != 'refresh_token':
            raise HTTPException(status_code=401, detail='Invalid scope for token')
        if 'sub' not in payload:
            raise HTTPException(status_code=401, detail='Invalid refresh token')
        return self.encode_token(payload['sub'])

    def encode_reset_password_token(self, email):
        """Return a ``reset_password``-scoped token for ``email`` (10 hours)."""
        return self._issue_token(email, 'reset_password', timedelta(hours=10))