-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscript.py
More file actions
117 lines (91 loc) · 3.33 KB
/
script.py
File metadata and controls
117 lines (91 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import mysql.connector
import requests
import pandas as pd
import time
import os
from datetime import datetime
from dotenv import load_dotenv
# Load settings from a .env file (via python-dotenv) before reading them below.
load_dotenv()

# Database connection settings; only the host has a fallback value.
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_USER = os.environ.get("DB_USER")
DB_PASS = os.environ.get("DB_PASS")
DB_NAME = os.environ.get("DB_NAME")

# Key sent with every price API request; None when the variable is unset.
API_KEY = os.environ.get("API_KEY")
class CryptoDB:
    """Wrapper around a MySQL connection that stores rows in ``crypto_prices``.

    Creating an instance opens the connection, opens a cursor, and makes
    sure the table exists. Call :meth:`close` when finished.
    """

    def __init__(self, host, user, password, database):
        """Connect to MySQL and create the price table if it is missing.

        Args:
            host: MySQL server host name.
            user: Account used to authenticate.
            password: Password for ``user``.
            database: Schema that holds (or will hold) ``crypto_prices``.
        """
        self.conn = mysql.connector.connect(
            host=host, user=user, password=password, database=database
        )
        self.cursor = self.conn.cursor()
        self._create_table()

    def _create_table(self):
        """Create table if it does not exist."""
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS crypto_prices (
                id INT AUTO_INCREMENT PRIMARY KEY,
                symbol VARCHAR(20),
                ts DATETIME(3),
                price_usd DECIMAL(18,8),
                UNIQUE(symbol, ts)
            )
        """)
        self.conn.commit()

    def upsert_price(self, symbol, ts, price):
        """Insert or update a price record for a symbol and timestamp.

        The row is looked up first so the log line can report whether it
        was inserted or updated (and, for updates, the existing row id).

        Args:
            symbol: Asset identifier stored in the ``symbol`` column.
            ts: Timestamp value for the ``ts`` column.
            price: Price stored in ``price_usd``.

        Raises:
            Exception: Whatever the driver raised; the open transaction is
                rolled back before the error is re-raised.
        """
        try:
            self.cursor.execute(
                "SELECT id FROM crypto_prices WHERE symbol = %s AND ts = %s",
                (symbol, ts),
            )
            row = self.cursor.fetchone()
            if row:
                self.cursor.execute(
                    """
                    UPDATE crypto_prices
                    SET price_usd = %s
                    WHERE symbol = %s AND ts = %s
                    """,
                    (price, symbol, ts),
                )
                message = f"Updated: {symbol} at {ts} = ${price} (id {row[0]})"
            else:
                self.cursor.execute(
                    """
                    INSERT INTO crypto_prices (symbol, ts, price_usd)
                    VALUES (%s, %s, %s)
                    """,
                    (symbol, ts, price),
                )
                message = f"Inserted: {symbol} at {ts} = ${price}"
            self.conn.commit()
        except Exception:
            # Do not leave a half-finished transaction on the shared
            # connection; the caller still sees the original error.
            self.conn.rollback()
            raise
        # Log only once the write has actually been committed.
        print(message)

    def fetch_recent(self, limit=10):
        """Fetch recent rows from DB, newest ``ts`` first, as a DataFrame.

        Args:
            limit: Maximum number of rows to return. Anything ``int()``
                accepts is allowed; the value must not be negative.

        Raises:
            ValueError: If ``limit`` is negative or is a string that is not
                an integer.
            TypeError: If ``limit`` cannot be converted to ``int`` at all.
        """
        # Coerce before building the SQL so that only a plain integer can
        # ever reach the query text.
        row_limit = int(limit)
        if row_limit < 0:
            raise ValueError(f"limit must be non-negative, got {limit!r}")
        return pd.read_sql(
            f"SELECT * FROM crypto_prices ORDER BY ts DESC LIMIT {row_limit}",
            self.conn,
        )

    def close(self):
        """Close DB connection."""
        self.cursor.close()
        self.conn.close()
class CryptoPriceLoader:
    """Handles fetching from API and incremental loading.

    Prices are requested from the CoinGecko ``simple/price`` endpoint and
    written through the ``db`` object's ``upsert_price`` method.
    """

    API_URL = "https://api.coingecko.com/api/v3/simple/price"

    def __init__(self, db: "CryptoDB", api_key: str, timeout: float = 10.0):
        """Store the database handle and request settings.

        Args:
            db: Object whose ``upsert_price(symbol, ts, price)`` persists rows.
            api_key: Value sent in the ``x-cg-demo-api-key`` request header.
            timeout: Seconds to wait for the HTTP request before giving up.
        """
        self.db = db
        self.headers = {"x-cg-demo-api-key": api_key}
        self.timeout = timeout

    def fetch_price(self, symbol="bitcoin"):
        """Return the current USD price reported for ``symbol``.

        Args:
            symbol: Asset id passed as the ``ids`` query parameter.

        Raises:
            requests.RequestException: On timeout, connection failure, or a
                non-success HTTP status.
            KeyError: If the response body has no USD price for ``symbol``.
        """
        params = {"ids": symbol, "vs_currencies": "usd"}
        # Without a timeout a stalled connection would block indefinitely.
        resp = requests.get(
            self.API_URL,
            params=params,
            headers=self.headers,
            timeout=self.timeout,
        )
        # Fail on HTTP errors here instead of on a confusing missing key below.
        resp.raise_for_status()
        data = resp.json()
        try:
            return data[symbol]["usd"]
        except KeyError as err:
            raise KeyError(
                f"No USD price for {symbol!r} in API response"
            ) from err

    @staticmethod
    def _minute_timestamp(moment):
        """Format ``moment`` truncated to the minute, with millisecond digits."""
        floored = moment.replace(second=0, microsecond=0)
        # %f yields six digits; dropping the last three leaves milliseconds.
        return floored.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

    def incremental_load(self, symbol="bitcoin"):
        """Fetch the current price and upsert it under the current minute.

        The timestamp is the local wall-clock time truncated (not rounded)
        to the start of the minute, so repeated calls within one minute
        target the same ``(symbol, ts)`` key.
        """
        price = self.fetch_price(symbol)
        ts_minute_str = self._minute_timestamp(datetime.now())
        self.db.upsert_price(symbol, ts_minute_str, price)
if __name__ == "__main__":
    # Demo: load the bitcoin price nine times, 20 seconds apart, then show
    # the newest rows stored in the database.
    db = CryptoDB(DB_HOST, DB_USER, DB_PASS, DB_NAME)
    try:
        loader = CryptoPriceLoader(db, API_KEY)
        print("\n--- Starting Incremental Load Demo ---")
        for _ in range(9):
            loader.incremental_load("bitcoin")
            time.sleep(20)
        df_final = db.fetch_recent(limit=10)
        print("\nLatest rows in DB:")
        print(df_final)
    finally:
        # Release the connection even when a fetch or query fails part-way.
        db.close()