From e8b22189a6420e9828783fd4a082b1e4b0076761 Mon Sep 17 00:00:00 2001 From: Nick Karpov Date: Tue, 3 Mar 2026 09:44:34 -0800 Subject: [PATCH 1/2] feat(canonical): add dual-read ingestion and zerobus bootstrap - make bronze ingestion read both volume files and raw events table
- auto-bootstrap zerobus service principal, secrets, and UC grants
- add ingest/zerobus params with volume default across targets
- keep free target on volume by default while enabling runtime cutover --- .../canonical_generator_simple.ipynb | 135 ++++++++++++- databricks.yml | 60 ++++++ .../transformations/transformation.py | 32 ++- stages/canonical_data.ipynb | 188 ++++++++++++++++-- stages/lakeflow.ipynb | 17 +- 5 files changed, 397 insertions(+), 35 deletions(-) diff --git a/data/canonical/canonical_generator_simple.ipynb b/data/canonical/canonical_generator_simple.ipynb index cb289f8..a534aac 100644 --- a/data/canonical/canonical_generator_simple.ipynb +++ b/data/canonical/canonical_generator_simple.ipynb @@ -27,6 +27,9 @@ "outputs": [], "source": [ "import os\n", + "import json\n", + "import subprocess\n", + "import sys\n", "from pyspark.sql import functions as F\n", "from datetime import datetime, timedelta\n", "import pandas as pd\n", @@ -38,6 +41,15 @@ " dbutils.widgets.text(\"VOLUME\", \"events\")\n", " dbutils.widgets.text(\"START_DAY\", \"70\")\n", " dbutils.widgets.text(\"SPEED_MULTIPLIER\", \"60.0\")\n", + " dbutils.widgets.text(\"INGEST_MODE\", \"volume\")\n", + " dbutils.widgets.text(\"RAW_EVENTS_TABLE\", \"events_ingest\")\n", + " dbutils.widgets.text(\"ZEROBUS_REGION\", \"\")\n", + " dbutils.widgets.text(\"ZEROBUS_ENDPOINT\", \"\")\n", + " dbutils.widgets.text(\"ZEROBUS_SECRET_SCOPE\", \"\")\n", + " dbutils.widgets.text(\"ZEROBUS_CLIENT_ID_KEY\", \"client-id\")\n", + " dbutils.widgets.text(\"ZEROBUS_CLIENT_SECRET_KEY\", \"client-secret\")\n", + " dbutils.widgets.text(\"ZEROBUS_BATCH_SIZE\", \"1000\")\n", + " dbutils.widgets.text(\"ZEROBUS_FLUSH_EVERY_BATCHES\", 
\"10\")\n", "except:\n", " pass\n", "\n", @@ -47,11 +59,24 @@ "VOLUME = dbutils.widgets.get(\"VOLUME\")\n", "START_DAY = int(dbutils.widgets.get(\"START_DAY\"))\n", "SPEED_MULTIPLIER = float(dbutils.widgets.get(\"SPEED_MULTIPLIER\"))\n", + "INGEST_MODE = (dbutils.widgets.get(\"INGEST_MODE\") if dbutils.widgets.get(\"INGEST_MODE\") else \"volume\").lower()\n", + "RAW_EVENTS_TABLE = dbutils.widgets.get(\"RAW_EVENTS_TABLE\") if dbutils.widgets.get(\"RAW_EVENTS_TABLE\") else \"events_ingest\"\n", + "ZEROBUS_REGION = dbutils.widgets.get(\"ZEROBUS_REGION\") if dbutils.widgets.get(\"ZEROBUS_REGION\") else \"\"\n", + "ZEROBUS_ENDPOINT = dbutils.widgets.get(\"ZEROBUS_ENDPOINT\") if dbutils.widgets.get(\"ZEROBUS_ENDPOINT\") else \"\"\n", + "ZEROBUS_SECRET_SCOPE = dbutils.widgets.get(\"ZEROBUS_SECRET_SCOPE\") if dbutils.widgets.get(\"ZEROBUS_SECRET_SCOPE\") else \"\"\n", + "ZEROBUS_CLIENT_ID_KEY = dbutils.widgets.get(\"ZEROBUS_CLIENT_ID_KEY\") if dbutils.widgets.get(\"ZEROBUS_CLIENT_ID_KEY\") else \"client-id\"\n", + "ZEROBUS_CLIENT_SECRET_KEY = dbutils.widgets.get(\"ZEROBUS_CLIENT_SECRET_KEY\") if dbutils.widgets.get(\"ZEROBUS_CLIENT_SECRET_KEY\") else \"client-secret\"\n", + "ZEROBUS_BATCH_SIZE = int(dbutils.widgets.get(\"ZEROBUS_BATCH_SIZE\")) if dbutils.widgets.get(\"ZEROBUS_BATCH_SIZE\") else 1000\n", + "ZEROBUS_FLUSH_EVERY_BATCHES = int(dbutils.widgets.get(\"ZEROBUS_FLUSH_EVERY_BATCHES\")) if dbutils.widgets.get(\"ZEROBUS_FLUSH_EVERY_BATCHES\") else 10\n", + "\n", + "if INGEST_MODE not in {\"volume\", \"zerobus\"}:\n", + " raise ValueError(f\"Unsupported INGEST_MODE={INGEST_MODE}. 
Expected 'volume' or 'zerobus'.\")\n", "\n", "# Paths\n", "VOLUME_PATH = f\"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}\"\n", "WATERMARK_PATH = f\"/Volumes/{CATALOG}/{SCHEMA}/misc/_watermark\"\n", "SIM_START_PATH = f\"/Volumes/{CATALOG}/{SCHEMA}/misc/_sim_start\"\n", + "RAW_EVENTS_TABLE_FQN = f\"{CATALOG}.{SCHEMA}.{RAW_EVENTS_TABLE}\"\n", "\n", "# Constants\n", "DATASET_EPOCH = int(datetime(2024, 1, 1).timestamp())\n", @@ -60,7 +85,11 @@ "NOW = datetime.utcnow()\n", "\n", "print(f\"Config: START_DAY={START_DAY}, SPEED={SPEED_MULTIPLIER}x\")\n", - "print(f\"Output: {VOLUME_PATH}\")\n", + "print(f\"Ingestion mode: {INGEST_MODE}\")\n", + "if INGEST_MODE == \"volume\":\n", + " print(f\"Output volume: {VOLUME_PATH}\")\n", + "else:\n", + " print(f\"Output table: {RAW_EVENTS_TABLE_FQN}\")\n", "print(f\"Dataset cycle: {DATASET_DAYS} days ({CYCLE_SECONDS} seconds)\")\n" ] }, @@ -306,9 +335,109 @@ "metadata": {}, "outputs": [], "source": [ + "def _workspace_url() -> str:\n", + " host = os.environ.get(\"DATABRICKS_HOST\") or spark.conf.get(\"spark.databricks.workspaceUrl\", \"\")\n", + " if not host:\n", + " raise ValueError(\"Could not determine Databricks workspace URL. 
Set DATABRICKS_HOST.\")\n", + " if not host.startswith(\"http\"):\n", + " host = f\"https://{host}\"\n", + " return host.rstrip(\"/\")\n", + "\n", + "def _workspace_id() -> str:\n", + " try:\n", + " return str(dbutils.notebook.entry_point.getDbutils().notebook().getContext().workspaceId().get())\n", + " except Exception as e:\n", + " raise ValueError(\"Could not determine workspace id from notebook context.\") from e\n", + "\n", + "def _infer_zerobus_endpoint() -> str:\n", + " if ZEROBUS_ENDPOINT:\n", + " endpoint = ZEROBUS_ENDPOINT.strip()\n", + " if endpoint.startswith(\"http\"):\n", + " return endpoint.rstrip(\"/\")\n", + " return f\"https://{endpoint}\".rstrip(\"/\")\n", + "\n", + " if not ZEROBUS_REGION:\n", + " raise ValueError(\"ZEROBUS_REGION is required when ZEROBUS_ENDPOINT is not provided.\")\n", + "\n", + " return f\"https://{_workspace_id()}.zerobus.{ZEROBUS_REGION}.cloud.databricks.com\"\n", + "\n", + "def _resolve_zerobus_credentials() -> tuple[str, str]:\n", + " if not (ZEROBUS_SECRET_SCOPE and ZEROBUS_CLIENT_ID_KEY and ZEROBUS_CLIENT_SECRET_KEY):\n", + " raise ValueError(\n", + " \"Zerobus credential settings are missing. 
Expect ZEROBUS_SECRET_SCOPE, ZEROBUS_CLIENT_ID_KEY, and ZEROBUS_CLIENT_SECRET_KEY.\"\n", + " )\n", + "\n", + " client_id = dbutils.secrets.get(scope=ZEROBUS_SECRET_SCOPE, key=ZEROBUS_CLIENT_ID_KEY)\n", + " client_secret = dbutils.secrets.get(scope=ZEROBUS_SECRET_SCOPE, key=ZEROBUS_CLIENT_SECRET_KEY)\n", + "\n", + " if not client_id or not client_secret:\n", + " raise ValueError(\"Retrieved empty Zerobus credentials from secret scope\")\n", + " return client_id, client_secret\n", + "\n", + "def _load_zerobus_sdk():\n", + " try:\n", + " from zerobus.sdk.sync import ZerobusSdk\n", + " from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties\n", + " return ZerobusSdk, RecordType, StreamConfigurationOptions, TableProperties\n", + " except ImportError:\n", + " subprocess.check_call([sys.executable, \"-m\", \"pip\", \"install\", \"databricks-zerobus-ingest-sdk\"])\n", + " from zerobus.sdk.sync import ZerobusSdk\n", + " from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties\n", + " return ZerobusSdk, RecordType, StreamConfigurationOptions, TableProperties\n", + "\n", + "def _publish_via_zerobus(df):\n", + " ZerobusSdk, RecordType, StreamConfigurationOptions, TableProperties = _load_zerobus_sdk()\n", + "\n", + " endpoint = _infer_zerobus_endpoint()\n", + " workspace_url = _workspace_url()\n", + " client_id, client_secret = _resolve_zerobus_credentials()\n", + "\n", + " options = StreamConfigurationOptions(record_type=RecordType.JSON)\n", + " table_properties = TableProperties(RAW_EVENTS_TABLE_FQN)\n", + " sdk = ZerobusSdk(endpoint, workspace_url)\n", + " stream = sdk.create_stream(client_id, client_secret, table_properties, options)\n", + "\n", + " sent = 0\n", + " batch = []\n", + " batches_since_flush = 0\n", + "\n", + " try:\n", + " for row in df.toLocalIterator():\n", + " record = row.asDict(recursive=True)\n", + " if record.get(\"location_id\") is not None:\n", + " record[\"location_id\"] = 
int(record[\"location_id\"])\n", + " if record.get(\"sequence\") is not None:\n", + " record[\"sequence\"] = int(record[\"sequence\"])\n", + " batch.append(json.dumps(record, separators=(\",\", \":\")))\n", + "\n", + " if len(batch) >= ZEROBUS_BATCH_SIZE:\n", + " stream.ingest_records_offset(batch)\n", + " sent += len(batch)\n", + " batch = []\n", + " batches_since_flush += 1\n", + " if batches_since_flush >= ZEROBUS_FLUSH_EVERY_BATCHES:\n", + " stream.flush()\n", + " batches_since_flush = 0\n", + "\n", + " if batch:\n", + " stream.ingest_records_offset(batch)\n", + " sent += len(batch)\n", + "\n", + " stream.flush()\n", + " finally:\n", + " stream.close()\n", + "\n", + " return sent, endpoint\n", + "\n", "# Write\n", - "final_df.write.mode(\"append\").json(VOLUME_PATH)\n", - "print(f\"Wrote {event_count:,} events\")\n", + "if INGEST_MODE == \"zerobus\":\n", + " written_count, endpoint = _publish_via_zerobus(final_df)\n", + " print(f\"Published {written_count:,} events via Zerobus -> {RAW_EVENTS_TABLE_FQN}\")\n", + " print(f\"Endpoint: {endpoint}\")\n", + "else:\n", + " final_df.write.mode(\"append\").json(VOLUME_PATH)\n", + " written_count = event_count\n", + " print(f\"Wrote {written_count:,} events to volume\")\n", "\n", "# Update watermark (virtual simulation cursor)\n", "spark.createDataFrame([(str(new_end_seconds),)], [\"value\"]).write.mode(\"overwrite\").text(WATERMARK_PATH)\n", diff --git a/databricks.yml b/databricks.yml index e1fac7d..6d054c8 100644 --- a/databricks.yml +++ b/databricks.yml @@ -105,6 +105,18 @@ targets: default: "3" - name: PIPELINE_SCHEDULE_MINUTES default: "0" + - name: INGEST_MODE + default: "volume" + - name: RAW_EVENTS_TABLE + default: events_ingest + - name: ZEROBUS_REGION + default: "" + - name: ZEROBUS_ENDPOINT + default: "" + - name: ZEROBUS_BATCH_SIZE + default: "1000" + - name: ZEROBUS_FLUSH_EVERY_BATCHES + default: "10" tasks: - task_key: Canonical_Data @@ -171,6 +183,18 @@ targets: default: "3" - name: 
PIPELINE_SCHEDULE_MINUTES default: "0" + - name: INGEST_MODE + default: "volume" + - name: RAW_EVENTS_TABLE + default: events_ingest + - name: ZEROBUS_REGION + default: "" + - name: ZEROBUS_ENDPOINT + default: "" + - name: ZEROBUS_BATCH_SIZE + default: "1000" + - name: ZEROBUS_FLUSH_EVERY_BATCHES + default: "10" tasks: - task_key: Canonical_Data @@ -232,6 +256,18 @@ targets: default: "3" - name: PIPELINE_SCHEDULE_MINUTES default: "3" + - name: INGEST_MODE + default: "volume" + - name: RAW_EVENTS_TABLE + default: events_ingest + - name: ZEROBUS_REGION + default: "" + - name: ZEROBUS_ENDPOINT + default: "" + - name: ZEROBUS_BATCH_SIZE + default: "1000" + - name: ZEROBUS_FLUSH_EVERY_BATCHES + default: "10" tasks: - task_key: Canonical_Data @@ -278,6 +314,18 @@ targets: default: "3" - name: PIPELINE_SCHEDULE_MINUTES default: "0" + - name: INGEST_MODE + default: "volume" + - name: RAW_EVENTS_TABLE + default: events_ingest + - name: ZEROBUS_REGION + default: "" + - name: ZEROBUS_ENDPOINT + default: "" + - name: ZEROBUS_BATCH_SIZE + default: "1000" + - name: ZEROBUS_FLUSH_EVERY_BATCHES + default: "10" tasks: - task_key: Canonical_Data @@ -374,6 +422,18 @@ targets: default: "3" - name: PIPELINE_SCHEDULE_MINUTES default: "0" + - name: INGEST_MODE + default: "volume" + - name: RAW_EVENTS_TABLE + default: events_ingest + - name: ZEROBUS_REGION + default: "" + - name: ZEROBUS_ENDPOINT + default: "" + - name: ZEROBUS_BATCH_SIZE + default: "1000" + - name: ZEROBUS_FLUSH_EVERY_BATCHES + default: "10" tasks: - task_key: Canonical_Data diff --git a/pipelines/order_items/transformations/transformation.py b/pipelines/order_items/transformations/transformation.py index 28908a5..81dff6f 100644 --- a/pipelines/order_items/transformations/transformation.py +++ b/pipelines/order_items/transformations/transformation.py @@ -12,18 +12,44 @@ # 0. 
Bronze – raw event stream # ────────────────────────────────────────────────────────────── @dlt.table( - comment = "Raw JSON events as ingested (one file per event)." + comment = "Raw event stream from both volume JSON files and Zerobus Delta table." ) def all_events(): CATALOG = spark.conf.get("RAW_DATA_CATALOG") SCHEMA = spark.conf.get("RAW_DATA_SCHEMA") VOLUME = spark.conf.get("RAW_DATA_VOLUME") - return ( - spark.readStream.format("cloudFiles") + TABLE = spark.conf.get("RAW_DATA_TABLE", "events_ingest") + + volume_events = ( + spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .load(f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}") + .select( + F.col("event_id").cast("string").alias("event_id"), + F.col("event_type").cast("string").alias("event_type"), + F.col("ts").cast("string").alias("ts"), + F.col("location_id").cast("int").alias("location_id"), + F.col("order_id").cast("string").alias("order_id"), + F.col("sequence").cast("int").alias("sequence"), + F.col("body").cast("string").alias("body"), + ) ) + zerobus_events = ( + spark.readStream.table(f"{CATALOG}.{SCHEMA}.{TABLE}") + .select( + F.col("event_id").cast("string").alias("event_id"), + F.col("event_type").cast("string").alias("event_type"), + F.col("ts").cast("string").alias("ts"), + F.col("location_id").cast("int").alias("location_id"), + F.col("order_id").cast("string").alias("order_id"), + F.col("sequence").cast("int").alias("sequence"), + F.col("body").cast("string").alias("body"), + ) + ) + + return volume_events.unionByName(zerobus_events) + # ────────────────────────────────────────────────────────────── # 1. 
Silver – explode order items, add extended_price # ────────────────────────────────────────────────────────────── diff --git a/stages/canonical_data.ipynb b/stages/canonical_data.ipynb index 93cf25b..cedba3a 100644 --- a/stages/canonical_data.ipynb +++ b/stages/canonical_data.ipynb @@ -33,12 +33,41 @@ "cell_type": "code", "metadata": {}, "source": [ + "import re\n", + "\n", "CATALOG = dbutils.widgets.get(\"CATALOG\")\n", "EVENTS_VOLUME = dbutils.widgets.get(\"EVENTS_VOLUME\")\n", "SIMULATOR_SCHEMA = dbutils.widgets.get(\"SIMULATOR_SCHEMA\")\n", "START_DAY = dbutils.widgets.get(\"START_DAY\") if dbutils.widgets.get(\"START_DAY\") else \"20\"\n", "SPEED_MULTIPLIER = dbutils.widgets.get(\"SPEED_MULTIPLIER\") if dbutils.widgets.get(\"SPEED_MULTIPLIER\") else \"1.0\"\n", - "SCHEDULE_MINUTES = dbutils.widgets.get(\"SCHEDULE_MINUTES\") if dbutils.widgets.get(\"SCHEDULE_MINUTES\") else \"5\"" + "SCHEDULE_MINUTES = dbutils.widgets.get(\"SCHEDULE_MINUTES\") if dbutils.widgets.get(\"SCHEDULE_MINUTES\") else \"5\"\n", + "INGEST_MODE = (dbutils.widgets.get(\"INGEST_MODE\") if dbutils.widgets.get(\"INGEST_MODE\") else \"volume\").lower()\n", + "RAW_EVENTS_TABLE = dbutils.widgets.get(\"RAW_EVENTS_TABLE\") if dbutils.widgets.get(\"RAW_EVENTS_TABLE\") else \"events_ingest\"\n", + "ZEROBUS_REGION = dbutils.widgets.get(\"ZEROBUS_REGION\") if dbutils.widgets.get(\"ZEROBUS_REGION\") else \"\"\n", + "ZEROBUS_ENDPOINT = dbutils.widgets.get(\"ZEROBUS_ENDPOINT\") if dbutils.widgets.get(\"ZEROBUS_ENDPOINT\") else \"\"\n", + "ZEROBUS_BATCH_SIZE = dbutils.widgets.get(\"ZEROBUS_BATCH_SIZE\") if dbutils.widgets.get(\"ZEROBUS_BATCH_SIZE\") else \"1000\"\n", + "ZEROBUS_FLUSH_EVERY_BATCHES = dbutils.widgets.get(\"ZEROBUS_FLUSH_EVERY_BATCHES\") if dbutils.widgets.get(\"ZEROBUS_FLUSH_EVERY_BATCHES\") else \"10\"\n", + "\n", + "if INGEST_MODE not in {\"volume\", \"zerobus\"}:\n", + " raise ValueError(f\"Unsupported INGEST_MODE={INGEST_MODE}. 
Expected 'volume' or 'zerobus'.\")\n", + "\n", + "def _safe_suffix(value: str, max_len: int = 40) -> str:\n", + " cleaned = re.sub(r\"[^a-zA-Z0-9_.-]\", \"-\", value).strip(\"-._\").lower()\n", + " cleaned = cleaned[:max_len]\n", + " return cleaned or \"default\"\n", + "\n", + "CATALOG_SUFFIX = _safe_suffix(CATALOG)\n", + "ZEROBUS_SP_DISPLAY_NAME = f\"caspers-zerobus-{CATALOG_SUFFIX}\"\n", + "ZEROBUS_SECRET_SCOPE = f\"caspers-zerobus-{CATALOG_SUFFIX}\"\n", + "ZEROBUS_CLIENT_ID_KEY = \"client-id\"\n", + "ZEROBUS_CLIENT_SECRET_KEY = \"client-secret\"\n", + "\n", + "print(f\"Ingestion mode: {INGEST_MODE}\")\n", + "if INGEST_MODE == \"zerobus\":\n", + " print(f\"Target events table: {CATALOG}.{SIMULATOR_SCHEMA}.{RAW_EVENTS_TABLE}\")\n", + " print(f\"Zerobus region: {ZEROBUS_REGION or '(from endpoint override)'}\")\n", + "print(f\"Auto SP display name: {ZEROBUS_SP_DISPLAY_NAME}\")\n", + "print(f\"Auto secret scope: {ZEROBUS_SECRET_SCOPE}\")\n" ], "execution_count": null, "outputs": [] @@ -58,7 +87,17 @@ "CREATE CATALOG IF NOT EXISTS ${CATALOG};\n", "CREATE SCHEMA IF NOT EXISTS ${CATALOG}.${SIMULATOR_SCHEMA};\n", "CREATE VOLUME IF NOT EXISTS ${CATALOG}.${SIMULATOR_SCHEMA}.${EVENTS_VOLUME};\n", - "CREATE VOLUME IF NOT EXISTS ${CATALOG}.${SIMULATOR_SCHEMA}.misc;" + "CREATE VOLUME IF NOT EXISTS ${CATALOG}.${SIMULATOR_SCHEMA}.misc;\n", + "\n", + "CREATE TABLE IF NOT EXISTS ${CATALOG}.${SIMULATOR_SCHEMA}.${RAW_EVENTS_TABLE} (\n", + " event_id STRING,\n", + " event_type STRING,\n", + " ts STRING,\n", + " location_id INT,\n", + " order_id STRING,\n", + " sequence INT,\n", + " body STRING\n", + ") USING DELTA;" ], "execution_count": null, "outputs": [] @@ -97,7 +136,7 @@ "spark.createDataFrame(pd.read_parquet(\"../data/canonical/canonical_dataset/brand_locations.parquet\")) \\\n", " .write.mode(\"overwrite\").saveAsTable(f\"{CATALOG}.{SIMULATOR_SCHEMA}.brand_locations\")\n", "\n", - "print(\"✅ Dimensional tables created from canonical dataset\")" + "print(\"\u2705 Dimensional 
tables created from canonical dataset\")" ], "execution_count": null, "outputs": [] @@ -118,6 +157,8 @@ "from databricks.sdk import WorkspaceClient\n", "import databricks.sdk.service.jobs as j\n", "import os\n", + "import json\n", + "import urllib.request\n", "\n", "w = WorkspaceClient()\n", "\n", @@ -131,6 +172,94 @@ "sys.path.append('../utils')\n", "from uc_state import add\n", "\n", + "def ensure_service_principal(display_name: str):\n", + " existing = [sp for sp in w.service_principals.list() if sp.display_name == display_name]\n", + " if existing:\n", + " return existing[0], False\n", + " return w.service_principals.create(display_name=display_name), True\n", + "\n", + "def ensure_secret_scope(scope_name: str):\n", + " try:\n", + " w.secrets.create_scope(scope=scope_name, initial_manage_principal=\"users\")\n", + " return True\n", + " except Exception as e:\n", + " msg = str(e)\n", + " if \"RESOURCE_ALREADY_EXISTS\" in msg:\n", + " return False\n", + " # fallback for runtimes that do not allow users principal\n", + " try:\n", + " w.secrets.create_scope(scope=scope_name)\n", + " return True\n", + " except Exception as e2:\n", + " if \"RESOURCE_ALREADY_EXISTS\" in str(e2):\n", + " return False\n", + " raise\n", + "\n", + "def create_service_principal_secret(service_principal_id: str, lifetime: str = \"31536000s\") -> str:\n", + " if hasattr(w, \"service_principal_secrets_proxy\"):\n", + " resp = w.service_principal_secrets_proxy.create(\n", + " service_principal_id=service_principal_id,\n", + " lifetime=lifetime,\n", + " )\n", + " return resp.secret\n", + "\n", + " # fallback for older SDKs: raw API call\n", + " host = w.config.host\n", + " if not host.startswith(\"http\"):\n", + " host = f\"https://{host}\"\n", + " host = host.rstrip(\"/\")\n", + "\n", + " auth = w.config.authenticate()\n", + " auth_header = auth.get(\"Authorization\") or auth.get(\"authorization\")\n", + " if not auth_header:\n", + " raise ValueError(f\"No authorization header from SDK auth: 
{auth}\")\n", + "\n", + " workspace_id = str(dbutils.notebook.entry_point.getDbutils().notebook().getContext().workspaceId().get())\n", + " req = urllib.request.Request(\n", + " f\"{host}/api/2.0/accounts/servicePrincipals/{service_principal_id}/credentials/secrets\",\n", + " data=json.dumps({\"lifetime\": lifetime}).encode(\"utf-8\"),\n", + " method=\"POST\",\n", + " headers={\n", + " \"Authorization\": auth_header,\n", + " \"Content-Type\": \"application/json\",\n", + " \"Accept\": \"application/json\",\n", + " \"X-Databricks-Org-Id\": workspace_id,\n", + " },\n", + " )\n", + " with urllib.request.urlopen(req, timeout=30) as resp:\n", + " payload = json.loads(resp.read().decode(\"utf-8\"))\n", + " return payload.get(\"secret\")\n", + "\n", + "# Ensure SP and secret scope\n", + "sp, created_sp = ensure_service_principal(ZEROBUS_SP_DISPLAY_NAME)\n", + "if not sp.id or not sp.application_id:\n", + " raise ValueError(f\"Service principal missing identifiers: {sp}\")\n", + "\n", + "print(f\"{'\u2705' if created_sp else '\u267b\ufe0f'} Zerobus service principal: id={sp.id}, app_id={sp.application_id}\")\n", + "created_scope = ensure_secret_scope(ZEROBUS_SECRET_SCOPE)\n", + "print(f\"{'\u2705' if created_scope else '\u267b\ufe0f'} Secret scope: {ZEROBUS_SECRET_SCOPE}\")\n", + "\n", + "# Ensure credentials are available in secret scope\n", + "existing_secret_keys = {s.key for s in w.secrets.list_secrets(scope=ZEROBUS_SECRET_SCOPE)}\n", + "w.secrets.put_secret(scope=ZEROBUS_SECRET_SCOPE, key=ZEROBUS_CLIENT_ID_KEY, string_value=sp.application_id)\n", + "\n", + "if ZEROBUS_CLIENT_SECRET_KEY not in existing_secret_keys:\n", + " sp_secret = create_service_principal_secret(str(sp.id))\n", + " if not sp_secret:\n", + " raise ValueError(\"Failed to create service principal secret\")\n", + " w.secrets.put_secret(scope=ZEROBUS_SECRET_SCOPE, key=ZEROBUS_CLIENT_SECRET_KEY, string_value=sp_secret)\n", + " print(\"\u2705 Created and stored Zerobus service principal secret\")\n", + 
"else:\n", + " print(\"\u267b\ufe0f Reusing existing Zerobus service principal secret\")\n", + "\n", + "# Ensure SP has required UC permissions\n", + "sp_principal = sp.application_id\n", + "spark.sql(f\"GRANT USE CATALOG ON CATALOG `{CATALOG}` TO `{sp_principal}`\")\n", + "spark.sql(f\"GRANT USE SCHEMA ON SCHEMA `{CATALOG}`.`{SIMULATOR_SCHEMA}` TO `{sp_principal}`\")\n", + "spark.sql(f\"GRANT SELECT, MODIFY ON TABLE `{CATALOG}`.`{SIMULATOR_SCHEMA}`.`{RAW_EVENTS_TABLE}` TO `{sp_principal}`\")\n", + "print(f\"\u2705 Granted Zerobus table permissions to {sp_principal}\")\n", + "\n", + "# Schedule replay job\n", "job_name = f\"Canonical Data Replay ({CATALOG})\"\n", "schedule_minutes = int(SCHEDULE_MINUTES)\n", "cron_expression = f\"0 0/{schedule_minutes} * * * ?\"\n", @@ -146,6 +275,15 @@ " \"SCHEMA\": SIMULATOR_SCHEMA,\n", " \"START_DAY\": START_DAY,\n", " \"SPEED_MULTIPLIER\": SPEED_MULTIPLIER,\n", + " \"INGEST_MODE\": INGEST_MODE,\n", + " \"RAW_EVENTS_TABLE\": RAW_EVENTS_TABLE,\n", + " \"ZEROBUS_REGION\": ZEROBUS_REGION,\n", + " \"ZEROBUS_ENDPOINT\": ZEROBUS_ENDPOINT,\n", + " \"ZEROBUS_SECRET_SCOPE\": ZEROBUS_SECRET_SCOPE,\n", + " \"ZEROBUS_CLIENT_ID_KEY\": ZEROBUS_CLIENT_ID_KEY,\n", + " \"ZEROBUS_CLIENT_SECRET_KEY\": ZEROBUS_CLIENT_SECRET_KEY,\n", + " \"ZEROBUS_BATCH_SIZE\": ZEROBUS_BATCH_SIZE,\n", + " \"ZEROBUS_FLUSH_EVERY_BATCHES\": ZEROBUS_FLUSH_EVERY_BATCHES,\n", " },\n", " )\n", " )\n", @@ -162,17 +300,17 @@ " w.jobs.reset(job_id=job_id, new_settings=j.JobSettings(\n", " name=job_name, tasks=task_def, schedule=schedule_def,\n", " ))\n", - " print(f\"♻️ Updated existing job_id={job_id} for {job_name}\")\n", + " print(f\"\u267b\ufe0f Updated existing job_id={job_id} for {job_name}\")\n", "else:\n", " job = w.jobs.create(name=job_name, tasks=task_def, schedule=schedule_def)\n", " job_id = job.job_id\n", " add(CATALOG, \"jobs\", job)\n", - " print(f\"✅ Created scheduled job_id={job_id} for {job_name}\")\n", + " print(f\"\u2705 Created scheduled job_id={job_id} 
for {job_name}\")\n", "\n", "print(f\" Schedule: Every {schedule_minutes} minutes\")\n", "\n", "w.jobs.run_now(job_id=job_id)\n", - "print(f\"🚀 Started initial run of job {job_id}\")" + "print(f\"\ud83d\ude80 Started initial run of job {job_id}\")\n" ], "execution_count": null, "outputs": [] @@ -181,11 +319,11 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "##### Blocking cell to wait for some data to arrive at the volume.\n", + "##### Blocking cell to wait for source data to become available.\n", "\n", "The lakeflow declarative pipeline that comes next infers the schema from existing data.\n", "\n", - "Lakeflow Jobs doesn't have a file arrival trigger at the task level (yet?)" + "For volume mode, we wait for file arrival. For Zerobus mode, we wait for at least one row in the target Delta table." ] }, { @@ -194,29 +332,35 @@ "source": [ "import time\n", "\n", - "# Construct the path to the volume where JSONs will arrive\n", "volume_path = f\"/Volumes/{CATALOG}/{SIMULATOR_SCHEMA}/{EVENTS_VOLUME}\"\n", + "raw_events_table = f\"{CATALOG}.{SIMULATOR_SCHEMA}.{RAW_EVENTS_TABLE}\"\n", "\n", - "def wait_for_data(path, timeout=300, poll_interval=5):\n", - " \"\"\"\n", - " Wait until at least one file appears in the given path.\n", - " Args:\n", - " path (str): The directory to watch.\n", - " timeout (int): Maximum seconds to wait.\n", - " poll_interval (int): Seconds between checks.\n", - " Raises:\n", - " TimeoutError: If no file appears within the timeout.\n", - " \"\"\"\n", + "def wait_for_volume_data(path, timeout=300, poll_interval=5):\n", " start = time.time()\n", " while time.time() - start < timeout:\n", " files = dbutils.fs.ls(path)\n", - " if any(f.size > 0 for f in files if not f.path.endswith('/')):\n", - " print(\"✅ Data arrived. Safe to proceed.\")\n", + " if any(f.size > 0 for f in files if not f.path.endswith(\"/\")):\n", + " print(\"\u2705 Volume data arrived. 
Safe to proceed.\")\n", " return\n", " time.sleep(poll_interval)\n", " raise TimeoutError(f\"No data found in {path} after {timeout} seconds.\")\n", "\n", - "wait_for_data(volume_path)" + "def wait_for_table_data(table_name, timeout=300, poll_interval=5):\n", + " start = time.time()\n", + " while time.time() - start < timeout:\n", + " try:\n", + " if spark.sql(f\"SELECT 1 FROM {table_name} LIMIT 1\").count() > 0:\n", + " print(\"\u2705 Zerobus table has data. Safe to proceed.\")\n", + " return\n", + " except Exception:\n", + " pass\n", + " time.sleep(poll_interval)\n", + " raise TimeoutError(f\"No rows found in {table_name} after {timeout} seconds.\")\n", + "\n", + "if INGEST_MODE == \"zerobus\":\n", + " wait_for_table_data(raw_events_table)\n", + "else:\n", + " wait_for_volume_data(volume_path)\n" ], "execution_count": null, "outputs": [] diff --git a/stages/lakeflow.ipynb b/stages/lakeflow.ipynb index e5b78a4..0126400 100644 --- a/stages/lakeflow.ipynb +++ b/stages/lakeflow.ipynb @@ -54,11 +54,13 @@ "EVENTS_VOLUME = dbutils.widgets.get(\"EVENTS_VOLUME\")\n", "SIMULATOR_SCHEMA = dbutils.widgets.get(\"SIMULATOR_SCHEMA\")\n", "PIPELINE_SCHEDULE_MINUTES = int(dbutils.widgets.get(\"PIPELINE_SCHEDULE_MINUTES\"))\n", + "RAW_EVENTS_TABLE = dbutils.widgets.get(\"RAW_EVENTS_TABLE\") if dbutils.widgets.get(\"RAW_EVENTS_TABLE\") else \"events_ingest\"\n", "\n", "# 0 = continuous mode, N > 0 = triggered mode with schedule every N minutes\n", "continuous_mode = (PIPELINE_SCHEDULE_MINUTES == 0)\n", "\n", - "print(f\"Pipeline mode: {'Continuous' if continuous_mode else f'Triggered (every {PIPELINE_SCHEDULE_MINUTES} minutes)'}\")" + "print(f\"Pipeline mode: {'Continuous' if continuous_mode else f'Triggered (every {PIPELINE_SCHEDULE_MINUTES} minutes)'}\")\n", + "print(f\"Raw events table: {CATALOG}.{SIMULATOR_SCHEMA}.{RAW_EVENTS_TABLE}\")\n" ], "execution_count": null, "outputs": [] @@ -101,6 +103,7 @@ " \"RAW_DATA_CATALOG\": CATALOG,\n", " \"RAW_DATA_SCHEMA\": 
SIMULATOR_SCHEMA,\n", " \"RAW_DATA_VOLUME\": EVENTS_VOLUME,\n", + " \"RAW_DATA_TABLE\": RAW_EVENTS_TABLE,\n", " },\n", " root_path=root_dbx_path,\n", " libraries=[p.PipelineLibrary(glob=p.PathPattern(include=f\"{root_dbx_path}/**\"))],\n", @@ -114,7 +117,7 @@ "if existing_pipelines:\n", " pipeline_id = existing_pipelines[0].pipeline_id\n", " w.pipelines.update(pipeline_id=pipeline_id, **pipeline_config)\n", - " print(f\"♻️ Updated existing pipeline: {pipeline_id}\")\n", + " print(f\"\u267b\ufe0f Updated existing pipeline: {pipeline_id}\")\n", "else:\n", " created = w.pipelines.create(**pipeline_config)\n", " pipeline_id = created.pipeline_id\n", @@ -122,7 +125,7 @@ " sys.path.append('../utils')\n", " from uc_state import add\n", " add(CATALOG, \"pipelines\", created)\n", - " print(f\"✅ Created pipeline: {pipeline_id}\")\n", + " print(f\"\u2705 Created pipeline: {pipeline_id}\")\n", "\n", "if not continuous_mode:\n", " import databricks.sdk.service.jobs as j\n", @@ -146,7 +149,7 @@ " w.jobs.reset(job_id=job_id, new_settings=j.JobSettings(\n", " name=job_name, tasks=task_def, schedule=schedule_def,\n", " ))\n", - " print(f\"♻️ Updated existing scheduler job: {job_id}\")\n", + " print(f\"\u267b\ufe0f Updated existing scheduler job: {job_id}\")\n", " else:\n", " pipeline_job = w.jobs.create(name=job_name, tasks=task_def, schedule=schedule_def)\n", " job_id = pipeline_job.job_id\n", @@ -154,10 +157,10 @@ " sys.path.append('../utils')\n", " from uc_state import add\n", " add(CATALOG, \"jobs\", pipeline_job)\n", - " print(f\"✅ Created scheduler job: {job_id}\")\n", + " print(f\"\u2705 Created scheduler job: {job_id}\")\n", "\n", " w.jobs.run_now(job_id=job_id)\n", - " print(f\"🚀 Started initial pipeline run\")" + " print(f\"\ud83d\ude80 Started initial pipeline run\")\n" ], "execution_count": null, "outputs": [] @@ -204,7 +207,7 @@ } }, "source": [ - "print(f\"✅ Lakeflow stage complete (pipeline_id={pipeline_id})\")" + "print(f\"\u2705 Lakeflow stage complete 
(pipeline_id={pipeline_id})\")" ], "execution_count": null, "outputs": [] From 0ee11466004be17a9be6b92ea9bf200b228889aa Mon Sep 17 00:00:00 2001 From: Nick Karpov Date: Tue, 3 Mar 2026 12:01:26 -0800 Subject: [PATCH 2/2] feat(pipeline): split all_events into bronze views + exclude caspersai-terminal artifacts from sync - refactor bronze layer: replace single all_events table with two @dlt.view sources (bronze_volume_events, bronze_zerobus_events) feeding all_events table, making both ingestion paths visible in the pipeline DAG - exclude apps/caspersai-terminal/node_modules, logs, and test from bundle sync Co-Authored-By: Claude Sonnet 4.6 --- databricks.yml | 3 ++ .../transformations/transformation.py | 34 +++++++++++++------ 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/databricks.yml b/databricks.yml index 6d054c8..7edaec1 100644 --- a/databricks.yml +++ b/databricks.yml @@ -28,6 +28,9 @@ sync: - "*.pyc" - images/** - docs/** + - apps/caspersai-terminal/node_modules/** + - apps/caspersai-terminal/logs/** + - apps/caspersai-terminal/test/** scripts: cleanup: diff --git a/pipelines/order_items/transformations/transformation.py b/pipelines/order_items/transformations/transformation.py index 81dff6f..c98bca1 100644 --- a/pipelines/order_items/transformations/transformation.py +++ b/pipelines/order_items/transformations/transformation.py @@ -9,18 +9,17 @@ ) # ────────────────────────────────────────────────────────────── -# 0. Bronze – raw event stream +# 0. Bronze – raw event sources # ────────────────────────────────────────────────────────────── -@dlt.table( - comment = "Raw event stream from both volume JSON files and Zerobus Delta table." +@dlt.view( + comment = "Raw event stream from Volume JSON files (canonical / Autoloader)." 
) -def all_events(): +def bronze_volume_events(): CATALOG = spark.conf.get("RAW_DATA_CATALOG") - SCHEMA = spark.conf.get("RAW_DATA_SCHEMA") - VOLUME = spark.conf.get("RAW_DATA_VOLUME") - TABLE = spark.conf.get("RAW_DATA_TABLE", "events_ingest") + SCHEMA = spark.conf.get("RAW_DATA_SCHEMA") + VOLUME = spark.conf.get("RAW_DATA_VOLUME") - volume_events = ( + return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .load(f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}") @@ -35,7 +34,15 @@ def all_events(): ) ) - zerobus_events = ( +@dlt.view( + comment = "Raw event stream from Zerobus Delta table (events_ingest)." +) +def bronze_zerobus_events(): + CATALOG = spark.conf.get("RAW_DATA_CATALOG") + SCHEMA = spark.conf.get("RAW_DATA_SCHEMA") + TABLE = spark.conf.get("RAW_DATA_TABLE", "events_ingest") + + return ( spark.readStream.table(f"{CATALOG}.{SCHEMA}.{TABLE}") .select( F.col("event_id").cast("string").alias("event_id"), @@ -48,7 +55,14 @@ def all_events(): ) ) - return volume_events.unionByName(zerobus_events) +@dlt.table( + comment = "Union of all raw event streams — volume (JSON) and Zerobus (Delta)." +) +def all_events(): + return ( + dlt.read_stream("bronze_volume_events") + .unionByName(dlt.read_stream("bronze_zerobus_events")) + ) # ────────────────────────────────────────────────────────────── # 1. Silver – explode order items, add extended_price