135 changes: 132 additions & 3 deletions data/canonical/canonical_generator_simple.ipynb
@@ -27,6 +27,9 @@
"outputs": [],
"source": [
"import os\n",
"import json\n",
"import subprocess\n",
"import sys\n",
"from pyspark.sql import functions as F\n",
"from datetime import datetime, timedelta\n",
"import pandas as pd\n",
@@ -38,6 +41,15 @@
" dbutils.widgets.text(\"VOLUME\", \"events\")\n",
" dbutils.widgets.text(\"START_DAY\", \"70\")\n",
" dbutils.widgets.text(\"SPEED_MULTIPLIER\", \"60.0\")\n",
" dbutils.widgets.text(\"INGEST_MODE\", \"volume\")\n",
" dbutils.widgets.text(\"RAW_EVENTS_TABLE\", \"events_ingest\")\n",
" dbutils.widgets.text(\"ZEROBUS_REGION\", \"\")\n",
" dbutils.widgets.text(\"ZEROBUS_ENDPOINT\", \"\")\n",
" dbutils.widgets.text(\"ZEROBUS_SECRET_SCOPE\", \"\")\n",
" dbutils.widgets.text(\"ZEROBUS_CLIENT_ID_KEY\", \"client-id\")\n",
" dbutils.widgets.text(\"ZEROBUS_CLIENT_SECRET_KEY\", \"client-secret\")\n",
" dbutils.widgets.text(\"ZEROBUS_BATCH_SIZE\", \"1000\")\n",
" dbutils.widgets.text(\"ZEROBUS_FLUSH_EVERY_BATCHES\", \"10\")\n",
"except:\n",
" pass\n",
"\n",
@@ -47,11 +59,24 @@
"VOLUME = dbutils.widgets.get(\"VOLUME\")\n",
"START_DAY = int(dbutils.widgets.get(\"START_DAY\"))\n",
"SPEED_MULTIPLIER = float(dbutils.widgets.get(\"SPEED_MULTIPLIER\"))\n",
"INGEST_MODE = (dbutils.widgets.get(\"INGEST_MODE\") if dbutils.widgets.get(\"INGEST_MODE\") else \"volume\").lower()\n",
"RAW_EVENTS_TABLE = dbutils.widgets.get(\"RAW_EVENTS_TABLE\") if dbutils.widgets.get(\"RAW_EVENTS_TABLE\") else \"events_ingest\"\n",
"ZEROBUS_REGION = dbutils.widgets.get(\"ZEROBUS_REGION\") if dbutils.widgets.get(\"ZEROBUS_REGION\") else \"\"\n",
"ZEROBUS_ENDPOINT = dbutils.widgets.get(\"ZEROBUS_ENDPOINT\") if dbutils.widgets.get(\"ZEROBUS_ENDPOINT\") else \"\"\n",
"ZEROBUS_SECRET_SCOPE = dbutils.widgets.get(\"ZEROBUS_SECRET_SCOPE\") if dbutils.widgets.get(\"ZEROBUS_SECRET_SCOPE\") else \"\"\n",
"ZEROBUS_CLIENT_ID_KEY = dbutils.widgets.get(\"ZEROBUS_CLIENT_ID_KEY\") if dbutils.widgets.get(\"ZEROBUS_CLIENT_ID_KEY\") else \"client-id\"\n",
"ZEROBUS_CLIENT_SECRET_KEY = dbutils.widgets.get(\"ZEROBUS_CLIENT_SECRET_KEY\") if dbutils.widgets.get(\"ZEROBUS_CLIENT_SECRET_KEY\") else \"client-secret\"\n",
"ZEROBUS_BATCH_SIZE = int(dbutils.widgets.get(\"ZEROBUS_BATCH_SIZE\")) if dbutils.widgets.get(\"ZEROBUS_BATCH_SIZE\") else 1000\n",
"ZEROBUS_FLUSH_EVERY_BATCHES = int(dbutils.widgets.get(\"ZEROBUS_FLUSH_EVERY_BATCHES\")) if dbutils.widgets.get(\"ZEROBUS_FLUSH_EVERY_BATCHES\") else 10\n",
"\n",
"if INGEST_MODE not in {\"volume\", \"zerobus\"}:\n",
" raise ValueError(f\"Unsupported INGEST_MODE={INGEST_MODE}. Expected 'volume' or 'zerobus'.\")\n",
"\n",
"# Paths\n",
"VOLUME_PATH = f\"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}\"\n",
"WATERMARK_PATH = f\"/Volumes/{CATALOG}/{SCHEMA}/misc/_watermark\"\n",
"SIM_START_PATH = f\"/Volumes/{CATALOG}/{SCHEMA}/misc/_sim_start\"\n",
"RAW_EVENTS_TABLE_FQN = f\"{CATALOG}.{SCHEMA}.{RAW_EVENTS_TABLE}\"\n",
"\n",
"# Constants\n",
"DATASET_EPOCH = int(datetime(2024, 1, 1).timestamp())\n",
@@ -60,7 +85,11 @@
"NOW = datetime.utcnow()\n",
"\n",
"print(f\"Config: START_DAY={START_DAY}, SPEED={SPEED_MULTIPLIER}x\")\n",
"print(f\"Output: {VOLUME_PATH}\")\n",
"print(f\"Ingestion mode: {INGEST_MODE}\")\n",
"if INGEST_MODE == \"volume\":\n",
" print(f\"Output volume: {VOLUME_PATH}\")\n",
"else:\n",
" print(f\"Output table: {RAW_EVENTS_TABLE_FQN}\")\n",
"print(f\"Dataset cycle: {DATASET_DAYS} days ({CYCLE_SECONDS} seconds)\")\n"
]
},
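The widget reads above repeat the same get-or-default pattern for every Zerobus setting. A compact equivalent, shown only as a sketch (the helper name is hypothetical; dbutils comes from the notebook context, and the notebook itself keeps the explicit per-widget form):

def widget_or_default(name: str, default: str) -> str:
    # Fall back to the default when the widget exists but is empty.
    value = dbutils.widgets.get(name)
    return value if value else default

# e.g. INGEST_MODE = widget_or_default("INGEST_MODE", "volume").lower()
#      ZEROBUS_BATCH_SIZE = int(widget_or_default("ZEROBUS_BATCH_SIZE", "1000"))
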
@@ -306,9 +335,109 @@
"metadata": {},
"outputs": [],
"source": [
"def _workspace_url() -> str:\n",
" host = os.environ.get(\"DATABRICKS_HOST\") or spark.conf.get(\"spark.databricks.workspaceUrl\", \"\")\n",
" if not host:\n",
" raise ValueError(\"Could not determine Databricks workspace URL. Set DATABRICKS_HOST.\")\n",
" if not host.startswith(\"http\"):\n",
" host = f\"https://{host}\"\n",
" return host.rstrip(\"/\")\n",
"\n",
"def _workspace_id() -> str:\n",
" try:\n",
" return str(dbutils.notebook.entry_point.getDbutils().notebook().getContext().workspaceId().get())\n",
" except Exception as e:\n",
" raise ValueError(\"Could not determine workspace id from notebook context.\") from e\n",
"\n",
"def _infer_zerobus_endpoint() -> str:\n",
" if ZEROBUS_ENDPOINT:\n",
" endpoint = ZEROBUS_ENDPOINT.strip()\n",
" if endpoint.startswith(\"http\"):\n",
" return endpoint.rstrip(\"/\")\n",
" return f\"https://{endpoint}\".rstrip(\"/\")\n",
"\n",
" if not ZEROBUS_REGION:\n",
" raise ValueError(\"ZEROBUS_REGION is required when ZEROBUS_ENDPOINT is not provided.\")\n",
"\n",
" return f\"https://{_workspace_id()}.zerobus.{ZEROBUS_REGION}.cloud.databricks.com\"\n",
"\n",
"def _resolve_zerobus_credentials() -> tuple[str, str]:\n",
" if not (ZEROBUS_SECRET_SCOPE and ZEROBUS_CLIENT_ID_KEY and ZEROBUS_CLIENT_SECRET_KEY):\n",
" raise ValueError(\n",
" \"Zerobus credential settings are missing. Expect ZEROBUS_SECRET_SCOPE, ZEROBUS_CLIENT_ID_KEY, and ZEROBUS_CLIENT_SECRET_KEY.\"\n",
" )\n",
"\n",
" client_id = dbutils.secrets.get(scope=ZEROBUS_SECRET_SCOPE, key=ZEROBUS_CLIENT_ID_KEY)\n",
" client_secret = dbutils.secrets.get(scope=ZEROBUS_SECRET_SCOPE, key=ZEROBUS_CLIENT_SECRET_KEY)\n",
"\n",
" if not client_id or not client_secret:\n",
" raise ValueError(\"Retrieved empty Zerobus credentials from secret scope\")\n",
" return client_id, client_secret\n",
"\n",
"def _load_zerobus_sdk():\n",
" try:\n",
" from zerobus.sdk.sync import ZerobusSdk\n",
" from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties\n",
" return ZerobusSdk, RecordType, StreamConfigurationOptions, TableProperties\n",
" except ImportError:\n",
" subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"databricks-zerobus-ingest-sdk\"])\n",
" from zerobus.sdk.sync import ZerobusSdk\n",
" from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties\n",
" return ZerobusSdk, RecordType, StreamConfigurationOptions, TableProperties\n",
"\n",
"def _publish_via_zerobus(df):\n",
" ZerobusSdk, RecordType, StreamConfigurationOptions, TableProperties = _load_zerobus_sdk()\n",
"\n",
" endpoint = _infer_zerobus_endpoint()\n",
" workspace_url = _workspace_url()\n",
" client_id, client_secret = _resolve_zerobus_credentials()\n",
"\n",
" options = StreamConfigurationOptions(record_type=RecordType.JSON)\n",
" table_properties = TableProperties(RAW_EVENTS_TABLE_FQN)\n",
" sdk = ZerobusSdk(endpoint, workspace_url)\n",
" stream = sdk.create_stream(client_id, client_secret, table_properties, options)\n",
"\n",
" sent = 0\n",
" batch = []\n",
" batches_since_flush = 0\n",
"\n",
" try:\n",
" for row in df.toLocalIterator():\n",
" record = row.asDict(recursive=True)\n",
" if record.get(\"location_id\") is not None:\n",
" record[\"location_id\"] = int(record[\"location_id\"])\n",
" if record.get(\"sequence\") is not None:\n",
" record[\"sequence\"] = int(record[\"sequence\"])\n",
" batch.append(json.dumps(record, separators=(\",\", \":\")))\n",
"\n",
" if len(batch) >= ZEROBUS_BATCH_SIZE:\n",
" stream.ingest_records_offset(batch)\n",
" sent += len(batch)\n",
" batch = []\n",
" batches_since_flush += 1\n",
" if batches_since_flush >= ZEROBUS_FLUSH_EVERY_BATCHES:\n",
" stream.flush()\n",
" batches_since_flush = 0\n",
"\n",
" if batch:\n",
" stream.ingest_records_offset(batch)\n",
" sent += len(batch)\n",
"\n",
" stream.flush()\n",
" finally:\n",
" stream.close()\n",
"\n",
" return sent, endpoint\n",
"\n",
"# Write\n",
"final_df.write.mode(\"append\").json(VOLUME_PATH)\n",
"print(f\"Wrote {event_count:,} events\")\n",
"if INGEST_MODE == \"zerobus\":\n",
" written_count, endpoint = _publish_via_zerobus(final_df)\n",
" print(f\"Published {written_count:,} events via Zerobus -> {RAW_EVENTS_TABLE_FQN}\")\n",
" print(f\"Endpoint: {endpoint}\")\n",
"else:\n",
" final_df.write.mode(\"append\").json(VOLUME_PATH)\n",
" written_count = event_count\n",
" print(f\"Wrote {written_count:,} events to volume\")\n",
"\n",
"# Update watermark (virtual simulation cursor)\n",
"spark.createDataFrame([(str(new_end_seconds),)], [\"value\"]).write.mode(\"overwrite\").text(WATERMARK_PATH)\n",
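The `_publish_via_zerobus` helper streams rows to the driver, JSON-encodes each record, sends them in ZEROBUS_BATCH_SIZE chunks, and flushes after every ZEROBUS_FLUSH_EVERY_BATCHES chunks plus once at the end. A self-contained sketch of that cadence against a stub stream (the stub class and the toy records are invented for the example; only the batching and flush logic mirrors the notebook):

import json

class _StubStream:
    """Stand-in for a Zerobus stream; just counts what it receives."""
    def __init__(self):
        self.ingested, self.flushes = 0, 0
    def ingest_records_offset(self, records):
        self.ingested += len(records)
    def flush(self):
        self.flushes += 1
    def close(self):
        pass

def publish(records, stream, batch_size=1000, flush_every_batches=10):
    batch, batches_since_flush, sent = [], 0, 0
    try:
        for record in records:
            batch.append(json.dumps(record, separators=(",", ":")))
            if len(batch) >= batch_size:
                stream.ingest_records_offset(batch)
                sent += len(batch)
                batch = []
                batches_since_flush += 1
                if batches_since_flush >= flush_every_batches:
                    stream.flush()
                    batches_since_flush = 0
        if batch:  # trailing partial batch
            stream.ingest_records_offset(batch)
            sent += len(batch)
        stream.flush()  # final flush before closing
    finally:
        stream.close()
    return sent

stream = _StubStream()
assert publish(({"event_id": str(i)} for i in range(2500)), stream) == 2500
print(stream.ingested, stream.flushes)  # 2500 records, one final flush
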
63 changes: 63 additions & 0 deletions databricks.yml
@@ -28,6 +28,9 @@ sync:
- "*.pyc"
- images/**
- docs/**
- apps/caspersai-terminal/node_modules/**
- apps/caspersai-terminal/logs/**
- apps/caspersai-terminal/test/**

scripts:
cleanup:
@@ -105,6 +108,18 @@
default: "3"
- name: PIPELINE_SCHEDULE_MINUTES
default: "0"
- name: INGEST_MODE
default: "volume"
- name: RAW_EVENTS_TABLE
default: events_ingest
- name: ZEROBUS_REGION
default: ""
- name: ZEROBUS_ENDPOINT
default: ""
- name: ZEROBUS_BATCH_SIZE
default: "1000"
- name: ZEROBUS_FLUSH_EVERY_BATCHES
default: "10"

tasks:
- task_key: Canonical_Data
@@ -171,6 +186,18 @@
default: "3"
- name: PIPELINE_SCHEDULE_MINUTES
default: "0"
- name: INGEST_MODE
default: "volume"
- name: RAW_EVENTS_TABLE
default: events_ingest
- name: ZEROBUS_REGION
default: ""
- name: ZEROBUS_ENDPOINT
default: ""
- name: ZEROBUS_BATCH_SIZE
default: "1000"
- name: ZEROBUS_FLUSH_EVERY_BATCHES
default: "10"

tasks:
- task_key: Canonical_Data
@@ -232,6 +259,18 @@
default: "3"
- name: PIPELINE_SCHEDULE_MINUTES
default: "3"
- name: INGEST_MODE
default: "volume"
- name: RAW_EVENTS_TABLE
default: events_ingest
- name: ZEROBUS_REGION
default: ""
- name: ZEROBUS_ENDPOINT
default: ""
- name: ZEROBUS_BATCH_SIZE
default: "1000"
- name: ZEROBUS_FLUSH_EVERY_BATCHES
default: "10"

tasks:
- task_key: Canonical_Data
@@ -278,6 +317,18 @@
default: "3"
- name: PIPELINE_SCHEDULE_MINUTES
default: "0"
- name: INGEST_MODE
default: "volume"
- name: RAW_EVENTS_TABLE
default: events_ingest
- name: ZEROBUS_REGION
default: ""
- name: ZEROBUS_ENDPOINT
default: ""
- name: ZEROBUS_BATCH_SIZE
default: "1000"
- name: ZEROBUS_FLUSH_EVERY_BATCHES
default: "10"

tasks:
- task_key: Canonical_Data
@@ -374,6 +425,18 @@
default: "3"
- name: PIPELINE_SCHEDULE_MINUTES
default: "0"
- name: INGEST_MODE
default: "volume"
- name: RAW_EVENTS_TABLE
default: events_ingest
- name: ZEROBUS_REGION
default: ""
- name: ZEROBUS_ENDPOINT
default: ""
- name: ZEROBUS_BATCH_SIZE
default: "1000"
- name: ZEROBUS_FLUSH_EVERY_BATCHES
default: "10"

tasks:
- task_key: Canonical_Data
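Every target defaults these parameters to volume ingestion, so switching a single run over to Zerobus only requires overriding the job parameters. A sketch using the Databricks Python SDK (the job name and region value are assumptions, not part of this bundle; Zerobus credentials are still read from the secret scope configured in the notebook widgets):

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
job = next(w.jobs.list(name="canonical_data_job"))  # hypothetical job name
run = w.jobs.run_now(
    job_id=job.job_id,
    job_parameters={
        "INGEST_MODE": "zerobus",
        "RAW_EVENTS_TABLE": "events_ingest",
        "ZEROBUS_REGION": "us-west-2",  # example region, an assumption
        "ZEROBUS_BATCH_SIZE": "1000",
        "ZEROBUS_FLUSH_EVERY_BATCHES": "10",
    },
).result()  # blocks until the run reaches a terminal state
print(run.run_id)
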
54 changes: 47 additions & 7 deletions pipelines/order_items/transformations/transformation.py
@@ -9,19 +9,59 @@
)

# ──────────────────────────────────────────────────────────────
# 0. Bronze – raw event stream
# 0. Bronze – raw event sources
# ──────────────────────────────────────────────────────────────
@dlt.table(
comment = "Raw JSON events as ingested (one file per event)."
@dlt.view(
comment = "Raw event stream from Volume JSON files (canonical / Autoloader)."
)
def all_events():
def bronze_volume_events():
CATALOG = spark.conf.get("RAW_DATA_CATALOG")
SCHEMA = spark.conf.get("RAW_DATA_SCHEMA")
VOLUME = spark.conf.get("RAW_DATA_VOLUME")
SCHEMA = spark.conf.get("RAW_DATA_SCHEMA")
VOLUME = spark.conf.get("RAW_DATA_VOLUME")

return (
spark.readStream.format("cloudFiles")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}")
.select(
F.col("event_id").cast("string").alias("event_id"),
F.col("event_type").cast("string").alias("event_type"),
F.col("ts").cast("string").alias("ts"),
F.col("location_id").cast("int").alias("location_id"),
F.col("order_id").cast("string").alias("order_id"),
F.col("sequence").cast("int").alias("sequence"),
F.col("body").cast("string").alias("body"),
)
)

@dlt.view(
comment = "Raw event stream from Zerobus Delta table (events_ingest)."
)
def bronze_zerobus_events():
CATALOG = spark.conf.get("RAW_DATA_CATALOG")
SCHEMA = spark.conf.get("RAW_DATA_SCHEMA")
TABLE = spark.conf.get("RAW_DATA_TABLE", "events_ingest")

return (
spark.readStream.table(f"{CATALOG}.{SCHEMA}.{TABLE}")
.select(
F.col("event_id").cast("string").alias("event_id"),
F.col("event_type").cast("string").alias("event_type"),
F.col("ts").cast("string").alias("ts"),
F.col("location_id").cast("int").alias("location_id"),
F.col("order_id").cast("string").alias("order_id"),
F.col("sequence").cast("int").alias("sequence"),
F.col("body").cast("string").alias("body"),
)
)

@dlt.table(
comment = "Union of all raw event streams — volume (JSON) and Zerobus (Delta)."
)
def all_events():
return (
dlt.read_stream("bronze_volume_events")
.unionByName(dlt.read_stream("bronze_zerobus_events"))
)

# ──────────────────────────────────────────────────────────────
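Both bronze views project the same seven columns with the same types because all_events merges them with unionByName, which aligns columns by name rather than position; any drift in names or types between the Volume and Zerobus sources would surface here. A standalone toy illustration (local PySpark, data invented for the example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Same schema as the bronze views: strings plus int location_id and sequence.
schema = ("event_id string, event_type string, ts string, "
          "location_id int, order_id string, sequence int, body string")

volume_src = spark.createDataFrame(
    [("e1", "order_created", "2024-01-01T00:00:00Z", 1, "o1", 0, "{}")], schema)
zerobus_src = spark.createDataFrame(
    [("e2", "order_paid", "2024-01-01T00:01:00Z", 1, "o1", 1, "{}")], schema)

# Columns are matched by name, so ordering differences between sources are safe.
all_events = volume_src.unionByName(zerobus_src)
all_events.show(truncate=False)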