-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathauthentication.py
More file actions
87 lines (71 loc) · 3.06 KB
/
authentication.py
File metadata and controls
87 lines (71 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import binascii
import os
from typing import Optional
import bcrypt
import logs
from fastapi import HTTPException
from unidecode import unidecode
from config import new_database_url
from user_management.dao import UsersDAO
logger = logs.Log("authentication", "authentication.log").get_logger()
class Authentication:
def __init__(self):
self.dao = UsersDAO()
def register(self, username: str, password: str, display_name: str) -> str:
hashed_password = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
hashed_password_str = hashed_password.decode("utf-8")
session_token = binascii.hexlify(os.urandom(24)).decode()
try:
self.dao.add_user(
username, hashed_password_str, session_token, display_name
)
return session_token
except Exception as e:
raise HTTPException(
status_code=400, detail="Email already exists or other database error."
) from e
def delete_user(self, username: str) -> None:
self.dao.delete_user_by_username(username)
def force_login(self, username: str, regenerate_token: bool) -> str:
# session_token = self.dao.get_user(username).session_token
user = self.dao.get_user(username)
if user is None:
logger.error(f"User not found: {username}")
return None
session_token = user.session_token
if regenerate_token or not session_token:
session_token = binascii.hexlify(os.urandom(24)).decode()
self.dao.update_session_token(username, session_token)
logger.debug(f"User: {username} - Session token: {session_token}")
return session_token
def login(self, username: str, password: str) -> Optional[str]:
stored_password = self.dao.get_password_by_username(username)
if stored_password and bcrypt.checkpw(
password.encode("utf-8"), stored_password.encode("utf-8")
):
return self.force_login(username, regenerate_token=True)
return None
def convert_name(self, name: str) -> str:
name = unidecode(name).replace(" ", "_").lower()
return name
def google_login(self, id_info: dict) -> str:
display_name, google_id, username = (
id_info["name"],
id_info["sub"],
id_info["email"],
)
session_token = binascii.hexlify(os.urandom(24)).decode()
hashed_password = bcrypt.hashpw(
session_token.encode("utf-8"), bcrypt.gensalt()
).decode("utf-8")
self.dao.add_or_update_google_user(
google_id, username, hashed_password, session_token, display_name
)
logger.debug(f"User: {username} - Session token: {session_token}")
return session_token
def logout(self, username: str) -> bool:
if self.dao.validate_and_clear_session_token(username):
return True
return False
def check_token(self, username: str, session_token: str) -> bool:
return self.dao.check_session_token(username, session_token)