This repository was archived by the owner on Jan 2, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
86 lines (80 loc) · 3.44 KB
/
database.py
File metadata and controls
86 lines (80 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import bcrypt
from functools import wraps
from mysql.connector import errors
from mysql.connector.pooling import MySQLConnectionPool
def reconnect_from_pool(func):
    """ Get SQL connection from pool if the previous was closed or broken

    Decorator for methods of an object that exposes ``pool``, ``connection`` and
    ``cursor`` attributes. Before every call it sends a ``SELECT 1`` probe through
    the current cursor; when the probe fails, the cursor and connection are
    discarded, replacements are taken from ``self.pool``, and only then is the
    wrapped method invoked.

    :param func - method to wrap; its first positional argument must be the instance
    :return wrapped method with the same signature and return value as func
    """
    @wraps(func)
    def wrapped(*args, **kwargs):
        self = args[0]
        try:
            # cheap round-trip to verify the link is still alive
            self.cursor.execute('SELECT 1')
            self.cursor.fetchall()
        except (errors.DatabaseError, errors.InterfaceError):
            # InterfaceError is not a DatabaseError subclass, so it is listed explicitly:
            # connector-side failures must trigger a reconnect as well.
            for resource in (self.cursor, self.connection):
                try:
                    resource.close()
                except errors.Error:
                    # best-effort cleanup: closing an already broken cursor/connection
                    # may raise itself, which must not prevent the reconnect below
                    pass
            self.connection = self.pool.get_connection()
            self.cursor = self.connection.cursor()
        return func(*args, **kwargs)
    return wrapped
class WebpicDatabase:
    """ Access to the ``user`` table through a MySQL connection pool

    One connection and one cursor are taken from the pool at construction time and
    reused by every method; ``reconnect_from_pool`` replaces them when they break.
    """
    POOL_NAME = 'webpic_sql_pool'

    def __init__(self, **config):
        """ Initialize database connection pool
        :keyword host - SQL host name, or list of assumed SQL hosts tried in order
                        (default: ['localhost'])
        :keyword host_list - alias of host, used only when host is not given
        :keyword pool_size - number of connections kept in the pool (default: 1)

        Every other keyword is forwarded unchanged to MySQLConnectionPool, e.g.:
        :keyword port - SQL connection port
        :keyword database - SQL database name
        :keyword username - SQL user
        :keyword password - SQL user password
        :keyword ssl_cert - path to SSL certificate file (if required)
        :keyword ssl_key - path to SSL private key file (if required)
        :keyword ssl_ca - path to SSL center authority file (if required)

        If no host accepts the connection, pool, connection and cursor stay None.
        """
        self.pool = None
        # defined up-front so the attributes exist even when every host fails
        self.connection = None
        self.cursor = None

        # pop both spellings so neither leaks into the connector arguments
        hosts = config.pop('host', None)
        hosts_alias = config.pop('host_list', None)
        if hosts is None:
            hosts = hosts_alias if hosts_alias is not None else ['localhost']
        if isinstance(hosts, str):
            # a bare host name would otherwise be iterated character by character
            hosts = [hosts]
        pool_size = config.pop('pool_size', 1)

        for host in hosts:
            try:
                self.pool = MySQLConnectionPool(pool_name=self.POOL_NAME, pool_size=pool_size,
                                                host=host, **config)
            except (errors.PoolError, errors.DatabaseError, errors.InterfaceError):
                # InterfaceError is listed so that a connector-side connect failure
                # moves on to the next host instead of aborting the loop
                print(f'SQL: connection on {host} failed')
            else:
                print(f'SQL: connected on {host}')
                break

        if self.pool:
            self.connection = self.pool.get_connection()
            self.cursor = self.connection.cursor()

    @staticmethod
    def _text(value):
        """ Return value as str, decoding it when it arrives as bytes/bytearray """
        if isinstance(value, (bytes, bytearray)):
            return value.decode()
        return value

    @reconnect_from_pool
    def get_user(self, name):
        """ Get user parameters
        :param name - value matched against the ``name`` column
        :return dict mapping column name to value for the first matching row
                (a str ``pwd`` is returned encoded to bytes), or None if no row matches
        """
        self.cursor.execute('SELECT * FROM user WHERE name = %s', params=(name, ))
        rows = self.cursor.fetchall()
        if not rows:
            return None
        fields = [field[0] for field in self.cursor.description]
        user = dict(zip(fields, rows[0]))
        # only text needs encoding; NULL or already-binary passwords are left untouched
        if isinstance(user.get('pwd'), str):
            user['pwd'] = user['pwd'].encode()
        return user

    @reconnect_from_pool
    def set_user(self, data: dict):
        """ Set user parameters (create/modify user)
        :param data - mapping of ``user`` column name to new value; a non-empty ``pwd``
                      (str or bytes) is stored as its bcrypt hash. The dict is not modified.
        :return True when the row was written and committed; False when data is empty,
                names a column the table does not have, or the query is rejected
        """
        if not data:
            return False
        self.cursor.execute('SHOW COLUMNS FROM user')
        columns = self.cursor.fetchall()
        if not columns:
            return False

        # SHOW COLUMNS rows are (Field, Type, Null, Key, Default, Extra)
        known_fields = {self._text(col[0]).lower() for col in columns}
        # only PRI/UNI identify a row; MUL is a non-unique index and must stay updatable
        unique_fields = {self._text(col[0]).lower() for col in columns
                         if self._text(col[3]) in ('PRI', 'UNI')}

        # column names are interpolated into the statement below, so accept
        # nothing but names the table really has
        if any(not isinstance(key, str) or key.lower() not in known_fields for key in data):
            return False

        # work on a copy: hashing in place would double-hash the password
        # if the caller passes the same dict again
        record = dict(data)

        # encrypt password
        if pwd := record.get('pwd', None):
            if isinstance(pwd, str):
                pwd = pwd.encode()  # bcrypt operates on bytes
            # NOTE(review): rounds=5 is well below bcrypt's default work factor of 12;
            # kept unchanged here - consider raising it.
            salt = bcrypt.gensalt(rounds=5)
            record['pwd'] = bcrypt.hashpw(pwd, salt)

        # prepare query fillers
        field_names = ', '.join(record)
        placeholder = ', '.join(['%s'] * len(record))
        update_fields = [col for col in record if col.lower() not in unique_fields]
        if update_fields:
            update_params = ', '.join(f'{col} = %s' for col in update_fields)
        else:
            # only key columns were supplied: a no-op assignment keeps the statement
            # valid, so the row is created when missing and left as-is otherwise
            first_field = next(iter(record))
            update_params = f'{first_field} = {first_field}'
        data_params = list(record.values()) + [record[col] for col in update_fields]

        # query
        try:
            self.cursor.execute(f'INSERT INTO user ({field_names}) VALUE ({placeholder}) '
                                f'ON DUPLICATE KEY UPDATE {update_params}', params=data_params)
        except errors.ProgrammingError:
            return False
        self.connection.commit()
        return True