-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathusers.py
185 lines (158 loc) · 6.46 KB
/
users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""Adds routes for user management to FastAPI."""
import logging as log
from fastapi import APIRouter
from fastapi.exceptions import HTTPException
from passlib.hash import bcrypt
from src.api_types import (
LoginBody,
LoginResponse,
SignupBody,
SignupResponse,
UserIDResponse,
UserResponse,
UsersResponse,
UserBody,
AuthType,
)
from src.auth import generate_auth_token
from src.db import Database
from ..auth import requires_authentication
from fastapi.requests import Request
router = APIRouter()
@router.post("/users/signup", response_model=SignupResponse | None)
def signup(body: SignupBody):
with Database() as db:
hashed_password = bcrypt.hash(body.password)
query = """INSERT INTO users(username, email, pword, admin) VALUES (%s, %s, %s, false);"""
try:
db.execute(query, [body.username, body.email, hashed_password])
return SignupResponse(error="")
except Exception as e:
log.error(e)
return SignupResponse(error="Server error.")
@router.post("/users/login", response_model=LoginResponse | None)
def login(body: LoginBody):
with Database() as db:
try:
email = body.email
password = body.password
query = """SELECT users.pword, users.admin, users.id FROM users WHERE users.email = %s;"""
entry_sql = db.execute_return(query, [email])
# Returns "incorrect email/password" message if there is no such account.
if entry_sql is None or len(entry_sql) == 0:
return LoginResponse(
token="", user_id=0, error="Invalid Email or Password"
)
# Grabs the stored hash and admin status.
password_hash, admin, user_id = entry_sql[0]
# Returns "incorrect email/password" message if password is incorrect.
if not bcrypt.verify(password, password_hash):
return LoginResponse(
token="", user_id=0, error="Invalid Email or Password"
)
# Generates the token and returns
token = generate_auth_token(email, admin)
log.warn(
f"Giving token: {token}",
)
return LoginResponse(token=token, user_id=user_id, error="")
except Exception as e:
log.error(e)
# TODO: Return something better than query error
return LoginResponse(token="", user_id=0, error="Server error.")
# THIS METHOD IS NOT WORKING YET and email is treated as username
@router.post("/users/admin/signup", response_model=None)
def admin_signup(body: LoginBody):
# currently raise exception, since we don't have a frontend and we don't want people to create admin accounts this easily
raise HTTPException(404, "signup not secure so not executing")
with Database() as db:
hashed_password = bcrypt.hash(body.password)
query = """INSERT INTO users(username, email, pword, admin) VALUES (%s, %s, %s, %s);"""
try:
db.execute(query, [body.email, body.email, hashed_password, True])
except Exception:
raise HTTPException(501, "signup insert failed")
@router.get("/users/", response_model=UsersResponse)
def get_users(req: Request):
requires_authentication(AuthType.ADMIN, req)
with Database() as db:
query = """SELECT id, username, email, admin FROM users;"""
users_list = db.execute_return(query)
if users_list is not None:
users_list.sort(key=lambda user: user[0])
return UsersResponse(
users=[
UserResponse(
id=user[0], username=user[1], email=user[2], admin=user[3]
)
for user in users_list
]
)
else:
return UsersResponse(users=[])
@router.get("/user/id/{username}", response_model=UserIDResponse)
def get_user_id(username: str):
with Database() as db:
query = """SELECT id FROM users WHERE username = %s;"""
user_id = db.execute_return(query, [username])
if user_id is not None:
return UserIDResponse(id=user_id[0][0])
else:
return UserIDResponse(id=-1)
@router.get("/user/{user_id}", response_model=UserResponse)
def get_user(user_id: int):
with Database() as db:
query = """SELECT id, username, email, admin FROM users WHERE id = %s;"""
user = db.execute_return(query, [user_id])
if user is not None:
return UserResponse(
id=user[0][0], username=user[0][1], email=user[0][2], admin=user[0][3]
)
else:
return UserResponse(id=user_id, username="", email="", admin=False)
@router.get("/user/{user_id}/proteins", response_model=list[str] | None)
def get_user_proteins(user_id: int):
with Database() as db:
try:
query = """SELECT proteins.name from proteins JOIN requests ON proteins.id = requests.protein_id WHERE requests.user_id = %s;"""
proteins = db.execute_return(query, [user_id])
if proteins is not None:
return [protein[0] for protein in proteins]
else:
return None
except Exception as e:
log.error(e)
return None
@router.put("/user/{user_id}", response_model=None)
def edit_user(user_id: int, body: UserBody, req: Request):
requires_authentication(AuthType.ADMIN, req)
with Database() as db:
query = """UPDATE users SET """
arg_list = []
if body.username is not None:
query += """username = %s, """
arg_list.append(body.username)
if body.email is not None:
query += """email = %s, """
arg_list.append(body.email)
if body.admin is not None:
query += """admin = %s, """
arg_list.append(body.admin)
query = query[:-2] + """ WHERE id = %s;"""
try:
db.execute(query, arg_list + [user_id])
except Exception as e:
log.error(e)
return None
return None
@router.delete("/user/{user_id}", response_model=None)
def delete_user(user_id: int, req: Request):
requires_authentication(AuthType.ADMIN, req)
with Database() as db:
query = """DELETE FROM users WHERE id = %s;"""
try:
db.execute(query, [user_id])
except Exception as e:
log.error(e)
return None
return None