from fastapi import APIRouter, status, Depends
#from fastapi.responses import JSONResponse
from database import engine, Session
from schemas import SignUpModel, LoginModel
from models import User
from fastapi.exceptions import HTTPException
from werkzeug.security import generate_password_hash, check_password_hash
from fastapi_jwt_auth import AuthJWT
from fastapi.encoders import jsonable_encoder
# Router for every endpoint in this module; mounted under /auth by the app.
auth_router = APIRouter(prefix="/auth", tags=["auth"])

# NOTE(review): a single module-level session is shared by every request this
# process handles. Assuming `database.Session` is a SQLAlchemy sessionmaker,
# that object is not meant for concurrent use and stays in a failed state after
# an unsuccessful commit until rollback() — a per-request session (e.g. via
# Depends) is the usual pattern. Confirm against how `database` is configured.
session = Session(bind=engine)
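@auth_router.get("/")
async def hello(Authorize: AuthJWT = Depends()):
    """Return a greeting; only reachable with a valid access token.

    Args:
        Authorize: injected AuthJWT helper that validates the bearer token.

    Raises:
        HTTPException: 401 when no valid access token is presented.
    """
    try:
        Authorize.jwt_required()
    except Exception as exc:
        # Every failure of the token check maps to a generic 401; the original
        # exception is chained so server logs keep the cause without the
        # client seeing the library's message.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Token"
        ) from exc
    return {"message": "Hello World"}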
# pydantic is a tool that helps us validate the data we pass to an API
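@auth_router.post("/signup", status_code=status.HTTP_201_CREATED)
async def signup(user: SignUpModel):
    """Create a user account from a validated SignUpModel.

    Args:
        user: request body, already validated by pydantic.

    Returns:
        The newly created User row.

    Raises:
        HTTPException: 400 when the email or the username is already taken.
    """
    if session.query(User).filter(User.email == user.email).first() is not None:
        # `raise`, not `return`: returning the exception object would send it
        # back serialised in the body together with a 201 status.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User with this Email already exists",
        )
    if session.query(User).filter(User.username == user.username).first() is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User with this Username already exists",
        )
    # NOTE(review): is_staff and is_active are copied straight from the
    # unauthenticated request body, so any caller can register as staff.
    # Confirm this is intended; normally these are server-set defaults.
    new_user = User(
        username=user.username,
        email=user.email,
        password=generate_password_hash(user.password),  # store the hash, never the plaintext
        is_staff=user.is_staff,
        is_active=user.is_active,
    )
    session.add(new_user)
    try:
        session.commit()
    except Exception:
        # The module-level session would otherwise stay in a failed state for
        # every later request; roll back before propagating the error.
        session.rollback()
        raise
    # NOTE(review): the two existence checks above race with concurrent
    # signups; a unique constraint on User.email/User.username is the only
    # reliable guard — confirm it exists in models.
    # NOTE(review): new_user carries the password hash; confirm the response
    # serialisation (or a response_model on the route) excludes it.
    return new_user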
#login route
@auth_router.post("/login", status_code=status.HTTP_200_OK)
async def login(user: LoginModel, Authorize: AuthJWT = Depends()):
    """Exchange a username and password for an access and a refresh token.

    Args:
        user: validated LoginModel carrying ``username`` and ``password``.
        Authorize: injected AuthJWT helper used to mint the tokens.

    Returns:
        ``{"access": ..., "refresh": ...}`` passed through jsonable_encoder.

    Raises:
        HTTPException: 400 when the user is unknown or the password does not
            match; the message is identical in both cases.
    """
    account = session.query(User).filter(User.username == user.username).first()
    # Short-circuit: when no row matched, check_password_hash is skipped.
    # NOTE(review): that makes the unknown-user path cheaper than the
    # wrong-password path; relevant only if enumeration via timing matters.
    if not (account and check_password_hash(account.password, user.password)):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid Username or Password",
        )
    subject = account.username
    tokens = {
        "access": Authorize.create_access_token(subject=subject),
        "refresh": Authorize.create_refresh_token(subject=subject),
    }
    return jsonable_encoder(tokens)
#refreshing tokens
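@auth_router.get("/refresh")
async def refresh_token(Authorize: AuthJWT = Depends()):
    """
    ## Create a fresh token
    This creates a fresh access token. It requires a valid refresh token.

    Raises:
        HTTPException: 401 when no valid refresh token is presented.
    """
    try:
        Authorize.jwt_refresh_token_required()
    except Exception as exc:
        # Chain the cause so server logs keep the library's reason while the
        # client only sees the generic 401.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Please provide a valid refresh token",
        ) from exc
    current_user = Authorize.get_jwt_subject()
    # Only a new access token is issued; the refresh token itself is not rotated.
    access_token = Authorize.create_access_token(subject=current_user)
    return jsonable_encoder({"access": access_token})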