-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata.py
More file actions
157 lines (132 loc) · 4.27 KB
/
data.py
File metadata and controls
157 lines (132 loc) · 4.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import asyncio
import json
import os
from os import getenv
from dotenv import load_dotenv
from random import sample
import boto3
import decimal
from constants import DATA_PATH, MAX_LECTURER_LEVEL
data = {}
_data_files = {}
load_dotenv()
boto_kwargs = {
"aws_access_key_id": getenv("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": getenv("AWS_SECRET_ACCESS_KEY"),
"region_name": getenv("AWS_REGION")
}
async def saveloop():
while True:
save_data()
await asyncio.sleep(10)
def replace_decimals(obj):
if isinstance(obj, list):
for i in range(len(obj)):
obj[i] = replace_decimals(obj[i])
return obj
elif isinstance(obj, dict):
for k in obj:
obj[k] = replace_decimals(obj[k])
return obj
elif isinstance(obj, decimal.Decimal):
if obj % 1 == 0:
return int(obj)
else:
return float(obj)
else:
return obj
def replace_floats(obj):
if isinstance(obj, list):
for i in range(len(obj)):
obj[i] = replace_floats(obj[i])
return obj
elif isinstance(obj, dict):
for k in obj:
obj[k] = replace_floats(obj[k])
return obj
elif isinstance(obj, float):
return decimal.Decimal(obj)
else:
return obj
def load_data(bot):
global data
# checks if data file exist, if not, writes an empty dict to it
# if os.path.exists(DATA_PATH+"data.json"):
# with open(DATA_PATH+"data.json", "r") as json_file:
# data = json.load(json_file)
# else:
# data = {}
#
dynamodb = boto3.Session(**boto_kwargs).resource("dynamodb")
table = dynamodb.Table('data')
try:
response = table.get_item(Key={'id': "data"})
except ClientError as e:
print(e.response['Error']['Message'])
data = {}
else:
data = replace_decimals(response["Item"]["data"])
# print(data)
bot.loop.create_task(saveloop())
def add_data(*keys):
"""Add or insert a data entry."""
keys, value = keys[:-1], keys[-1]
current_dict = data
for key in keys[:-1]:
key = str(key)
if not key in current_dict:
current_dict[key] = {}
current_dict = current_dict[key]
key = str(keys[-1])
if isinstance(current_dict, dict):
current_dict[key] = value
save_data()
def get_data(*keys, default_val=None):
"""Get a data entry."""
current_dict = data
for key in keys[:-1]:
key = str(key)
if not key in current_dict:
return default_val
current_dict = current_dict[key]
key = str(keys[-1])
if isinstance(current_dict, dict):
return current_dict.get(key, default_val)
else:
return default_val
def save_data():
# with open(DATA_PATH+'data.json', 'w') as outfile:
# json.dump(data, outfile, sort_keys=True, indent=4)
dynamodb = boto3.Session(**boto_kwargs).resource("dynamodb")
table = dynamodb.Table('data')
response = table.put_item(Item={"id":"data",
"data": replace_floats(data)})
def _get_from_filename(filename, default = None, cached=True):
"""Starts with an underscore to signify this should only be used in this file."""
if not filename in _data_files or not cached:
dynamodb = boto3.Session(**boto_kwargs).resource("dynamodb")
table = dynamodb.Table('data')
try:
response = table.get_item(Key={'id': filename})
except ClientError as e:
print(e.response['Error']['Message'])
_data_files[filename] = default
return _data_files[filename]
if response.get("Item"):
_data_files[filename] = replace_decimals(response["Item"]["data"])
else:
_data_files[filename] = default
return _data_files[filename]
def get_items():
return _get_from_filename("items", [])
def get_labs():
return _get_from_filename("labs", [], cached=False)
def get_lecturers():
return _get_from_filename("lecturers", [])
def get_main_questions():
return _get_from_filename("main_questions", [], cached=False)
def get_trivia_questions_data():
return _get_from_filename("trivia_questions", [], cached=False)
def get_labs_subset(n):
labs = get_labs()
return sample(labs, min(n, len(labs)))