-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathsql_util.py
More file actions
161 lines (134 loc) · 5.32 KB
/
sql_util.py
File metadata and controls
161 lines (134 loc) · 5.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import json
import pandas as pd
import argparse
import pymysql.cursors
from pymysql.connections import Connection
from typing import List, Dict
class Credentials:
def __init__(self, host: str, database: str, user: str, password: str):
"""Initialize a Credentials instance for access to a MySQL database.
Args:
host (str): Host where the database server is located.
database (str): Database to be accessed.
user (str): Username to log in as.
password (str): Password for the given username.
"""
self.host = host
self.database = database
self.user = user
self.password = password
def from_args(args):
"""Initialize a Credentials instance from args."""
if args.method == "prompt":
print("Connect to a SQL database.")
host = input('Host: ').strip()
database = input('Database: ').strip()
user = input('User: ').strip()
password = input('Password: ').strip()
if args.method == "json":
with open(args.json_path) as f:
data = json.load(f)
host = data['host']
database = data['database']
user = data['user']
password = data['password']
else:
host = args.host
database = args.database
user = args.user
password = args.password
return Credentials(host, database, user, password)
def argparser() -> argparse.ArgumentParser:
"""Return an argparser with arguments for credentials."""
parser = argparse.ArgumentParser()
parser.add_argument('--method', help="How to pass credentials")
parser.add_argument('--host', help="Host name")
parser.add_argument('-d', '--database', help="Name of database")
parser.add_argument('-u', '--user', help="Name of user for login")
parser.add_argument('-p', '--password', help="Password for user")
parser.add_argument('--json_path', help="Json file with credentials")
return parser
def connect(self) -> Connection:
"""Return a connection to a MySQL server with these credentials."""
cnx = pymysql.connect(host=self.host,
database=self.database,
user=self.user,
password=self.password)
print("Connected to %s as %s." % (self.host, self.user))
return cnx
def get_fields(name: str, cursor) -> List[str]:
"""Return the field names for the given table."""
cursor.execute('DESCRIBE %s' % name)
return [row[0] for row in cursor.fetchall()]
def sanitize(val: str) -> str:
"""Sanitize the string for input to a SQL query."""
if val is None:
return 'NULL'
else:
val = val.replace("'", "''")
return val
def row_string(row: Dict[str, str], fields: List[str]) -> str:
"""Return a SQL row string for the given row."""
values = [sanitize(str(row[field])) for field in fields]
values = ["'%s'" % val for val in values]
row_str = "(%s)" % ', '.join(values)
row_str = row_str.replace("'NULL'", "NULL")
return row_str
def single_insert(name: str, row: Dict[str, str], fields: List[str]) -> str:
"""Return a MySQL INSERT statement to insert the row into the table.
Args:
name (str): Name of the table to insert these rows.
row (Dict[str, str]): Row to insert into the table.
fields (List[str]): Fields to insert.
"""
cols = ', '.join(fields)
vals = row_string(row, fields)
return ("INSERT INTO\n"
"\t%s(%s)\n"
"VALUES\n"
"\t%s;") % (name, cols, vals)
def multi_insert(name: str,
table: pd.DataFrame,
fields: List[str] = None) -> str:
"""Return a MySQL INSERT statement to insert the rows of table.
Args:
name (str): Name of the table to insert these rows.
table (pd.Dataframe): Pandas table of rows to be inserted.
fields (List[str], optional): Fields to insert. Defaults to columns.
"""
if fields is None:
fields = list(table.columns)
cols = ', '.join(fields)
rows = list(table.reset_index().iterrows())
vals = [row_string(row[1], fields) for row in rows]
vals = ',\n\t'.join(vals)
return ("INSERT INTO\n"
"\t%s(%s)\n"
"VALUES\n"
"\t%s;") % (name, cols, vals)
def get_table(table: str, credential: Credentials) -> pd.DataFrame:
"""Return a pandas dataframe for the given table.
Args:
table (str): Name of the table.
credential (Credentials): Credentials for the database with table.
Returns:
pd.DataFrame: pandas dataframe representing table.
"""
cnx = credential.connect()
df = pd.read_sql('SELECT * FROM %s' % table, cnx)
cnx.commit()
cnx.close()
return df
def query(path: str, credential: Credentials) -> pd.DataFrame:
"""Return a pandas dataframe for the resulting query at the given path.
Args:
path (str): String file path of SQL query.
credential (Credentials): Credentials for the database to query.
Returns:
pd.DataFrame: pandas dataframe with query result.
"""
cnx = credential.connect()
df = pd.read_sql(open(path).read(), cnx)
cnx.commit()
cnx.close()
return df