Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .dlt/secrets.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# dlt destination credentials for the local PostgreSQL instance.
# NOTE(review): this file is committed in the same change that adds it to
# .gitignore — confirm no real secret ever lands in version control; only
# the placeholder below should appear here.
[destination.postgres.credentials]
host = "localhost"
port = 5432
database = "stables"
username = "postgres"
password = "your_password_here"  # placeholder — set your real password locally, never commit it
8 changes: 8 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Template for local environment variables — copy to .env and fill in values.
# .env itself is gitignored; never commit filled-in credentials.

# Presumably consumed by the etherscan-based dlt pipelines — verify against the scripts.
ETHERSCAN_API_KEY=

# PostgreSQL Configuration
# Read by docker-compose.yml, the dbt profiles (env_var(...)), and the
# dlt pipeline scripts (os.getenv(...)).
POSTGRES_HOST=
POSTGRES_PORT=
POSTGRES_DB=
POSTGRES_USER=
POSTGRES_PASSWORD=
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ venv/
ENV/
env/
.env
.dlt/secrets.toml
.venv/

# IDE
Expand Down
Empty file removed data/staged/.gitkeep
Empty file.
23 changes: 14 additions & 9 deletions dbt_subprojects/curve/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ etherscan_raw:
target: staged
outputs:
raw:
type: duckdb
path: ../../data/raw/raw_curve.duckdb
type: postgres
host: localhost
port: 5432
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: raw_curve
threads: 4

staged:
type: duckdb
path: ../../data/staged/staged_curve.duckdb
type: postgres
host: localhost
port: 5432
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: crvusd_market
threads: 4
# schema: "staged_crvusd_market"
schema: "crvusd_market"
attach:
- path: ../../data/raw/raw_curve.duckdb
alias: raw_db



Expand Down
37 changes: 21 additions & 16 deletions dbt_subprojects/ethena/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,31 @@ etherscan_raw:
target: staged
outputs:
raw:
type: duckdb
path: ../../data/raw/raw_ethena.duckdb
type: postgres
host: localhost
port: 5432
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: raw_ethena
threads: 4

staged:
type: duckdb
path: ../../data/staged/staged_ethena.duckdb
type: postgres
host: localhost
port: 5432
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: usde
threads: 4
schema: "usde"
attach:
- path: ../../data/raw/raw_ethena.duckdb
alias: raw_db

marts:
type: duckdb
path: ../../data/marts/marts_ethena.duckdb
type: postgres
host: localhost
port: 5432
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: usde_marts
threads: 4
schema: "usde_marts"
attach:
- path: ../../data/raw/raw_ethena.duckdb
alias: raw_db
- path: ../../data/staged/staged_ethena.duckdb
alias: staged_db
18 changes: 18 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Local PostgreSQL service backing the dlt pipelines and dbt profiles.
# NOTE(review): the top-level `version` key is obsolete in Compose v2 and
# emits a warning — consider removing it.
version: "3.8"

services:
  postgres:
    image: postgres:15
    container_name: stables_postgres
    environment:
      # Interpolated from the shell environment / an .env file (see .env.example).
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    ports:
      # Host port is configurable; the container always listens on 5432.
      # NOTE(review): if POSTGRES_PORT is unset the mapping is invalid — confirm
      # a default is provided or documented.
      - "${POSTGRES_PORT}:5432"
    volumes:
      # Named volume so the database survives container recreation.
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

volumes:
  postgres_data:
File renamed without changes.
7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ description = "stables data"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"dbt-duckdb>=1.9.4",
"dlt[duckdb]>=1.12.3",
"psycopg2-binary>=2.9.0",
"jupyter>=1.1.1",
"jupyter-contrib-nbextensions",
"matplotlib>=3.10.3",
Expand All @@ -18,6 +17,10 @@ dependencies = [
"scikit-learn>=1.6.1",
"scipy>=1.15.3",
"seaborn>=0.13.2",
"sqlalchemy>=2.0.0",
"dlt[postgres]>=1.12.3",
"dbt-core>=1.10.4",
"dbt-postgres>=1.9.0",
]

[tool.uv.sources]
Expand Down
26 changes: 0 additions & 26 deletions run_ethena_dbt.sh

This file was deleted.

13 changes: 9 additions & 4 deletions scripts/curve_dlt_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os, json, time, logging
import dlt, duckdb
import dlt
from utils import setup_logging, get_loaded_block
from stables.data.source import etherscan_logs, get_latest_block

Expand All @@ -8,15 +8,20 @@


def backfill_logs():
duckdb_destination = "data/raw/raw_curve.duckdb"
table_catalog = "raw_curve"
table_schema = "crvusd_market"
table_name = "logs"
chainid = 1
block_chunk_size = 100000
pipeline = dlt.pipeline(
pipeline_name="curve",
destination=dlt.destinations.duckdb(duckdb_destination),
destination=dlt.destinations.postgres(
host=os.getenv("POSTGRES_HOST", "localhost"),
port=int(os.getenv("POSTGRES_PORT", "5432")),
database=os.getenv("POSTGRES_DB"),
username=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
),
dataset_name=table_schema,
)
with open("data/address/curve_addresses.json", "r") as f:
Expand All @@ -28,7 +33,7 @@ def backfill_logs():
end_block = get_latest_block(chainid=chainid)
for controller_address in controller_addresses[4:]:
start_block = get_loaded_block(
duckdb_destination,
None, # No longer need file path for PostgreSQL
table_catalog,
table_schema,
table_name,
Expand Down
26 changes: 7 additions & 19 deletions scripts/defillama_dlt_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os, json, time, logging
import dlt, duckdb
import dlt
from utils import setup_logging
from stables.data.source import (
defillama_stables_base,
Expand All @@ -10,35 +10,22 @@

logger = logging.getLogger(__name__)
setup_logging(log_file="logs/defillama_dlt_pipeline.log")
from dotenv import load_dotenv

load_dotenv()


def defillama_stables_pipeline():
"""
Runs the DeFiLlama stablecoins pipeline and displays the loaded data.
"""
duckdb_destination = "data/raw/raw_defillama.duckdb"
# DLT will now use the credentials from .dlt/secrets.toml
pipeline = dlt.pipeline(
pipeline_name="defillama_stables",
destination=dlt.destinations.duckdb(duckdb_destination),
destination="postgres",
dataset_name="yields",
)

# Run the pipeline
# pipeline.run(
# defillama_stables_base(), table_name="base", write_disposition="replace"
# )
# pipeline.run(
# defillama_stables_chain_circulating(),
# table_name="chain_circulating",
# write_disposition="replace",
# )

# pipeline.run(
# defillama_stablecoin_chain_tokens(stablecoin_id=110),
# table_name="chain_tokens",
# write_disposition="replace",
# )

pipeline.run(
defillama_yield_pools(),
table_name="all_pools",
Expand All @@ -48,3 +35,4 @@ def defillama_stables_pipeline():

if __name__ == "__main__":
defillama_stables_pipeline()
# check_variables()
50 changes: 32 additions & 18 deletions scripts/ethena_dlt_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os, json, time, logging
import dlt, duckdb
import dlt
import psycopg2
from utils import setup_logging, get_loaded_block
from stables.data.source import etherscan_logs, get_latest_block

Expand All @@ -12,7 +13,6 @@ def load_logs(
table_name,
chainid,
pipeline,
duckdb_destination,
table_catalog,
table_schema,
block_chunk_size=100000,
Expand All @@ -21,7 +21,7 @@ def load_logs(
):
if start_block is None:
start_block = get_loaded_block(
duckdb_destination,
None, # No longer need file path for PostgreSQL
table_catalog,
table_schema,
table_name,
Expand All @@ -40,13 +40,18 @@ def load_logs(
retries = max_retries
while retries > 0:
try:
n_before = (
duckdb.connect(duckdb_destination)
.execute(
f"SELECT COUNT(*) FROM {table_catalog}.{table_schema}.{table_name}"
)
.fetchone()[0]
conn = psycopg2.connect(
host=os.getenv("POSTGRES_HOST", "localhost"),
port=int(os.getenv("POSTGRES_PORT", "5432")),
database=os.getenv("POSTGRES_DB"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
)
cursor = conn.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {table_schema}.{table_name}")
n_before = cursor.fetchone()[0]
cursor.close()
conn.close()
pipeline.run(
etherscan_logs(
chainid=chainid,
Expand All @@ -57,13 +62,18 @@ def load_logs(
table_name=table_name,
write_disposition="append",
)
n_after = (
duckdb.connect(duckdb_destination)
.execute(
f"SELECT COUNT(*) FROM {table_catalog}.{table_schema}.{table_name}"
)
.fetchone()[0]
conn = psycopg2.connect(
host=os.getenv("POSTGRES_HOST", "localhost"),
port=int(os.getenv("POSTGRES_PORT", "5432")),
database=os.getenv("POSTGRES_DB"),
user=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
)
cursor = conn.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {table_schema}.{table_name}")
n_after = cursor.fetchone()[0]
cursor.close()
conn.close()
if n_after - n_before == 1000:
logger.warning(
f"Loaded 1000 logs from {from_block} to {to_block}, smaller batch size may be needed."
Expand All @@ -88,14 +98,19 @@ def load_logs(


def backfill_logs():
duckdb_destination = "data/raw/raw_ethena.duckdb"
table_catalog = "raw_ethena"
table_schema = "usde"
table_name = "logs"
chainid = 1
pipeline = dlt.pipeline(
pipeline_name="ethena",
destination=dlt.destinations.duckdb(duckdb_destination),
destination=dlt.destinations.postgres(
host=os.getenv("POSTGRES_HOST", "localhost"),
port=int(os.getenv("POSTGRES_PORT", "5432")),
database=os.getenv("POSTGRES_DB"),
username=os.getenv("POSTGRES_USER"),
password=os.getenv("POSTGRES_PASSWORD"),
),
dataset_name=table_schema,
)
contract_address = "0x4c9EDD5852cd905f086C759E8383e09bff1E68B3".lower()
Expand All @@ -104,7 +119,6 @@ def backfill_logs():
table_name,
chainid,
pipeline,
duckdb_destination,
table_catalog,
table_schema,
block_chunk_size=1000,
Expand Down
Loading
Loading