forked from danielfennelly/SAAT-API
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodels.py
More file actions
81 lines (68 loc) · 2.33 KB
/
models.py
File metadata and controls
81 lines (68 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import psycopg2
def payload_to_sql_post_rrinterval(payload):
    """Build the parameterized INSERT for one ``rr_intervals`` row.

    Args:
        payload: mapping holding ``user_id``, ``mobile_time``,
            ``batch_index`` and ``value``.

    Returns:
        A ``(query, params)`` tuple: the SQL text with ``%s`` placeholders
        and the values to bind, in column order.

    Raises:
        KeyError: from ``validate_payload_keys`` when ``mobile_time``,
            ``batch_index`` or ``value`` is absent, or from the direct
            lookup when ``user_id`` is absent.
    """
    validate_payload_keys(payload, ('mobile_time', 'batch_index', 'value'))

    # user_id is read below but is not part of the validated keys;
    # presumably the caller adds it to the payload -- TODO confirm.
    columns = ('user_id', 'mobile_time', 'batch_index', 'value')
    query = (
        "INSERT INTO rr_intervals (user_id,mobile_time,batch_index,value) "
        "VALUES (%s, %s, %s, %s)"
    )
    params = tuple(payload[column] for column in columns)
    return query, params
def payload_to_sql_post_subjective(payload):
    """Build the parameterized INSERT for one ``subjective`` row.

    Args:
        payload: mapping holding ``user_id``, ``mobile_time``,
            ``event_type`` and ``value``.

    Returns:
        A ``(query, params)`` tuple: the SQL text with ``%s`` placeholders
        and the values to bind, in column order.

    Raises:
        KeyError: from ``validate_payload_keys`` when ``mobile_time``,
            ``event_type`` or ``value`` is absent, or from the direct
            lookup when ``user_id`` is absent.
    """
    # event_type is inserted below, so it is validated together with the
    # other posted fields; a missing value then yields the descriptive
    # validation error rather than a bare KeyError from the lookup.
    REQUIRED_KEYS = ('mobile_time', 'event_type', 'value')
    validate_payload_keys(payload, REQUIRED_KEYS)
    # TODO: make sure below SQL complies with spec
    # NOTE(review): user_id is not validated here; presumably the caller
    # adds it to the payload -- confirm.
    sql_text = (
        "INSERT INTO subjective (user_id,mobile_time,event_type,value) "
        "VALUES (%s,%s,%s,%s)",
        (
            payload['user_id'],
            payload['mobile_time'],
            payload['event_type'],
            payload['value'],
        ),
    )
    return sql_text
def payload_to_sql_get_rrinterval(payload):
    """Build the parameterized SELECT for a user's RR intervals in a window.

    Args:
        payload: mapping holding ``user_id``, ``start_time`` and
            ``end_time``.

    Returns:
        A ``(query, params)`` tuple: the SQL text with ``%s`` placeholders
        and the values to bind (user id, window start, window end).

    Raises:
        KeyError: from ``validate_payload_keys`` when ``start_time`` or
            ``end_time`` is absent, or from the direct lookup when
            ``user_id`` is absent.
    """
    validate_payload_keys(payload, ('start_time', 'end_time'))
    # TODO: create spec for this
    query = (
        "SELECT * FROM rr_intervals "
        "WHERE (user_id = %s AND mobile_time BETWEEN %s and %s)"
    )
    params = tuple(
        payload[field] for field in ('user_id', 'start_time', 'end_time')
    )
    return query, params
def validate_payload_keys(payload, required_keys):
    """Ensure every key in *required_keys* is present in *payload*.

    Keys are checked in the order given; the first absent one is reported.

    Args:
        payload: container supporting the ``in`` operator.
        required_keys: iterable of keys that must be present.

    Raises:
        KeyError: naming the first missing key together with the payload.
    """
    for name in required_keys:
        if name in payload:
            continue
        message = f"missing required key {name} in posted JSON: {payload}"
        raise KeyError(message)
def connect_saat(config):
    """Open a psycopg2 connection using the ``PG_*`` entries of *config*.

    Args:
        config: mapping providing ``PG_HOST``, ``PG_DB``, ``PG_USER`` and
            ``PG_PASS``.

    Returns:
        The connection object returned by ``psycopg2.connect``.

    Raises:
        KeyError: when one of the four settings is missing from *config*.
    """
    # Read the settings in a fixed order so the first missing one is the
    # one reported.
    host, database, user, password = [
        config[setting]
        for setting in ("PG_HOST", "PG_DB", "PG_USER", "PG_PASS")
    ]
    # The password is left out of the printed connection summary.
    print(f"Connecting to Postgres database: {user}@{host}/{database}")
    return psycopg2.connect(
        user=user,
        password=password,
        host=host,
        dbname=database,
    )
def run_sql(sql_text, db_conn):
    """Execute one parameterized statement on *db_conn* and commit it.

    Args:
        sql_text: sequence unpacked into ``cursor.execute`` -- in this
            module a ``(query, params)`` tuple.
        db_conn: open database connection providing ``cursor()``,
            ``commit()`` and ``rollback()``.

    Returns:
        None. Rows produced by a SELECT are not fetched or returned.

    On ``psycopg2.Error`` the transaction is rolled back, the error text is
    printed, and ``abort(400, ...)`` is called with that text.
    """
    cur = db_conn.cursor()
    try:
        cur.execute(*sql_text)
    except psycopg2.Error as e:
        db_conn.rollback()
        # pgerror is None when the backend supplied no message; fall back
        # to the exception text so the concatenation below cannot raise
        # TypeError and hide the real failure.
        detail = e.pgerror or str(e)
        print(detail)
        # NOTE(review): `abort` is not imported anywhere in the visible
        # source -- presumably Flask's `abort`; confirm it is in scope,
        # otherwise this line raises NameError.
        abort(400, "SQL error: " + detail)
    finally:
        # Release the cursor whether or not the statement succeeded.
        cur.close()
    db_conn.commit()
# Field description for a posted RR-interval payload.
# NOTE(review): this resembles a JSON Schema, but each property maps to a
# bare string instead of a {"type": ...} sub-schema -- confirm the expected
# shape with whatever consumes it.
schema_post_rri = {
    "type": "object",
    "properties": {
        "mobile_time": "date-time",
        "batch_index": "integer",
        "value": "integer",
    },
}