diff --git a/README.md b/README.md index 00c5ed6..aa276fb 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,17 @@ # 🍔 Casper's Kitchens +[![Python](https://img.shields.io/badge/Python-3.10+-3776AB?style=flat&logo=python&logoColor=white)](https://www.python.org/) +[![PySpark](https://img.shields.io/badge/PySpark-3.5+-E25A1C?style=flat&logo=apache-spark&logoColor=white)](https://spark.apache.org/) +[![Databricks](https://img.shields.io/badge/Databricks-Platform-FF3621?style=flat&logo=databricks&logoColor=white)](https://www.databricks.com/) +[![Delta Lake](https://img.shields.io/badge/Delta_Lake-Latest-00ADD8?style=flat&logo=delta&logoColor=white)](https://delta.io/) +[![MLflow](https://img.shields.io/badge/MLflow-Agent_Tracking-0194E2?style=flat&logo=mlflow&logoColor=white)](https://mlflow.org/) +[![LangChain](https://img.shields.io/badge/LangChain-AI_Agent-121212?style=flat&logo=chainlink&logoColor=white)](https://www.langchain.com/) +[![FastAPI](https://img.shields.io/badge/FastAPI-Web_App-009688?style=flat&logo=fastapi&logoColor=white)](https://fastapi.tiangolo.com/) +[![PostgreSQL](https://img.shields.io/badge/PostgreSQL-Lakebase-4169E1?style=flat&logo=postgresql&logoColor=white)](https://www.postgresql.org/) +[![License](https://img.shields.io/badge/License-Databricks-00A4EF?style=flat)](https://databricks.com/db-license-source) + +--- + Spin up a fully working ghost-kitchen business on Databricks in minutes. Casper's Kitchens is a simulated food-delivery platform that shows off the full power of Databricks: streaming ingestion, Lakeflow Declarative Pipelines, AI/BI Dashboards and Genie, Agent Bricks, and real-time apps backed by Lakebase postgres — all stitched together into one narrative. @@ -25,6 +37,8 @@ Then open Databricks and watch: That's it! Your Casper's Kitchens environment will be up and running. 
+> 📖 **[View Complete Documentation](./docs/README.md)** - For detailed architecture diagrams, technical reference, and developer guides, see the [docs folder](./docs/). + ## 🏗️ What is Casper's Kitchens? Casper's Kitchens is a fully functional ghost kitchen business running entirely on the Databricks platform. As a ghost kitchen, Casper's operates multiple compact commercial kitchens in shared locations, hosting restaurant vendors as tenants who create digital brands to serve diverse cuisines from single kitchen spaces. @@ -128,3 +142,10 @@ Run `destroy.ipynb` to remove all Casper's Kitchens resources from your workspac | library | description | license | source | |----------------------------------------|-------------------------|------------|-----------------------------------------------------| +| LangChain | AI agent framework | MIT | https://github.com/langchain-ai/langchain | +| FastAPI | Web framework | MIT | https://github.com/tiangolo/fastapi | +| MLflow | Model tracking | Apache 2.0 | https://github.com/mlflow/mlflow | +| SQLAlchemy | Database ORM | MIT | https://github.com/sqlalchemy/sqlalchemy | +| psycopg | PostgreSQL adapter | LGPL-3.0 | https://github.com/psycopg/psycopg | +| Databricks SDK | Databricks API client | Apache 2.0 | https://github.com/databricks/databricks-sdk-py | +| Uvicorn | ASGI server | BSD-3 | https://github.com/encode/uvicorn | diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..45c1b34 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,120 @@ +# Casper's Kitchens - Documentation + +This documentation provides comprehensive guidance for understanding and working with the Casper's Kitchens ghost kitchen data platform. 
+ +## 📋 Documentation Overview + +### 🎯 [Dataflow Architecture Diagram](./dataflow-diagram.md) +Complete visual overview of the data architecture showing: +- Event sources and data ingestion +- Medallion architecture layers (Bronze → Silver → Gold) +- Applications and consumption patterns +- Data lineage and dependencies + +### 🔧 [Technical Reference](./technical-reference.md) +Detailed technical specifications including: +- Complete table schemas and data types +- Transformation logic and SQL implementations +- Configuration parameters and settings + +### 👨‍💻 [Developer Onboarding Guide](./developer-onboarding.md) +Step-by-step guide for new developers covering: +- Architecture overview and key concepts +- Essential files and code walkthrough +- Common development tasks and patterns +- SQL queries for monitoring and validation + +### 🎨 Visual Dataflow Diagrams +Complete dataflow visualization available in multiple formats: +- **[PNG Image](./images/dataflow-diagram.png)** - Standard resolution with dark theme +- **[High-Res PNG](./images/dataflow-diagram-hd.png)** - High resolution for presentations +- **[SVG Vector](./images/dataflow-diagram.svg)** - Scalable vector format +- **[Mermaid Source](./dataflow-diagram.mermaid)** - Source code for modifications + +## 🚀 Quick Navigation + +### For New Developers +1. Start with the [Developer Onboarding Guide](./developer-onboarding.md) +2. Review the [Dataflow Architecture](./dataflow-diagram.md) +3. Reference the [Technical Specifications](./technical-reference.md) as needed + +### For Data Engineers +1. Examine the [Technical Reference](./technical-reference.md) for implementation details +2. Use the [Dataflow Diagram](./dataflow-diagram.md) to understand data lineage +3. Follow the [Developer Guide](./developer-onboarding.md) for common tasks + +### For Architects +1. Review the [Dataflow Architecture](./dataflow-diagram.md) for system design +2. 
Check the [Technical Reference](./technical-reference.md) for scalability details +3. Use the [Mermaid Diagram](./dataflow-diagram.mermaid) for presentations + +## 🏗️ Architecture Summary + +Casper's Kitchens implements a modern data platform with: + +- **Real-time Event Processing**: CloudFiles streaming from ghost kitchen operations +- **Medallion Architecture**: Bronze → Silver → Gold data layers with Delta Live Tables +- **Streaming Intelligence**: ML-powered refund recommendations using LLMs +- **Operational Applications**: FastAPI web apps backed by Lakebase PostgreSQL +- **Business Intelligence**: Real-time dashboards and analytics + +## 📊 Key Components + +| Component | Purpose | Technology | +|-----------|---------|------------| +| Event Sources | Ghost kitchen operations | JSON events, GPS tracking | +| Bronze Layer | Raw event storage | Delta Live Tables, CloudFiles | +| Silver Layer | Clean operational data | Spark streaming, schema enforcement | +| Gold Layer | Business intelligence | Aggregations, time-series data | +| Streaming ML | Real-time recommendations | LLM integration, Spark streaming | +| Lakebase | Operational database | PostgreSQL, continuous sync | +| Applications | Human interfaces | FastAPI, React, REST APIs | + +## 🔄 Data Flow Pattern + +``` +Ghost Kitchens → Events → Volume → Bronze → Silver → Gold → Apps + ↓ + Dimensional Data (Parquet) + ↓ + Streaming Intelligence (ML) + ↓ + Lakebase (PostgreSQL) +``` + +## 📈 Business Metrics + +The platform tracks key business metrics including: + +- **Order Performance**: Revenue, item counts, delivery times +- **Brand Analytics**: Sales by brand, menu performance +- **Location Intelligence**: Hourly performance by ghost kitchen +- **Operational Efficiency**: Refund rates, customer satisfaction +- **Real-time Monitoring**: Live order tracking, driver performance + +## 🛠️ Development Workflow + +1. **Understand**: Review architecture and data flow +2. 
**Explore**: Examine key code files and notebooks +3. **Develop**: Make changes to transformations or applications +4. **Test**: Validate using SQL queries and application UI +5. **Deploy**: Use pipeline orchestration for production changes +6. **Monitor**: Track performance and data quality + +## 📚 Additional Resources + +- **Main README**: `../README.md` - Project overview and quick start +- **Code Examples**: All notebooks include detailed comments +- **Configuration**: `../data/generator/configs/` - Simulation parameters +- **Applications**: `../apps/` - Web application source code +- **Pipelines**: `../pipelines/` - Data transformation logic + +## 🤝 Contributing + +When contributing to the documentation: + +1. Keep diagrams and technical details in sync with code changes +2. Update the developer onboarding guide for new features +3. Maintain consistency in terminology and formatting +4. Test all code examples and SQL queries +5. Update the visual diagram when architecture changes diff --git a/docs/dataflow-diagram.md b/docs/dataflow-diagram.md new file mode 100644 index 0000000..8c8d744 --- /dev/null +++ b/docs/dataflow-diagram.md @@ -0,0 +1,226 @@ +# Casper's Kitchens - Dataflow Architecture Diagram + +## Overview + +This document provides a comprehensive view of the Casper's Kitchens data architecture, showing how data flows from ghost kitchen operations through the medallion architecture to applications and dashboards. + +## Visual Dataflow Diagram + +![Casper's Kitchens Dataflow Architecture](./images/dataflow-diagram.png) + +*Complete dataflow architecture showing event sources, medallion layers, streaming intelligence, and applications* + +> **Note**: For a high-resolution version suitable for presentations, see [dataflow-diagram-hd.png](./images/dataflow-diagram-hd.png). A vector version is available as [dataflow-diagram.svg](./images/dataflow-diagram.svg). 
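The layers described in the next section all begin with single JSON event files. A concrete event makes them easier to follow; below is a minimal pure-Python sketch of one such file and how the bronze-layer fields documented for `all_events` (`event_type`, `order_id`, `ts`, `body`, `location`, `gk_id`) can be pulled out. The payload shape inside `body` is an illustrative assumption, not the generator's actual schema.

```python
import json

# Hypothetical order_created event, one JSON file per event.
# Top-level field names follow the bronze all_events docs;
# the body payload shape is assumed for illustration.
raw = """
{
  "event_type": "order_created",
  "order_id": "ord-0001",
  "ts": "2024-01-15T18:42:07Z",
  "location": "sanfrancisco",
  "gk_id": "gk-03",
  "body": {"items": [{"item_id": 7, "price": 12.50, "qty": 2}]}
}
"""

event = json.loads(raw)
print(event["event_type"], event["order_id"], event["gk_id"])

# The body column stays an opaque JSON payload in bronze;
# the silver layer is what parses and explodes it.
items = event["body"]["items"]
print(sum(i["price"] * i["qty"] for i in items))
```

In the real pipeline this parsing happens in CloudFiles/Delta Live Tables rather than with `json.loads`, but the field access pattern is the same.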
+ ## Architecture Layers ### Event Sources Layer The system ingests real-time events from ghost kitchen operations: - **Order Creation**: Customer app generates order events - **Kitchen Events**: Cooking status updates (started, finished, ready) - **Driver Events**: Pickup, delivery, and GPS tracking events - **GPS Tracking**: Real-time location updates during delivery ### Raw Data Ingestion Layer Events are captured and stored in raw format: - **Volume Storage**: `/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}` - **Event Types**: 8 distinct event types covering the full order lifecycle - **Format**: JSON files streamed via CloudFiles - **Frequency**: Real-time streaming with configurable batch processing ### Bronze Layer - Raw Event Store Raw events are ingested into the lakehouse: ```sql -- Table: all_events -- Purpose: Raw JSON events as ingested (one file per event) -- Source: CloudFiles streaming from volumes -- Schema: Raw JSON with event metadata ``` **Key Fields**: - `event_type`: Type of event (order_created, gk_started, etc.) 
+- `order_id`: Unique order identifier +- `ts`: Event timestamp +- `body`: JSON payload with event-specific data +- `location`: Ghost kitchen location +- `gk_id`: Ghost kitchen identifier + +### Silver Layer - Clean Operational Data +Events are processed and normalized: + +```sql +-- Table: silver_order_items +-- Purpose: One row per item per order, with extended_price +-- Partitioned by: order_day +-- Processing: Explodes order items, adds calculated fields +``` + +**Key Transformations**: +- Explode order items from arrays +- Calculate `extended_price = price * qty` +- Parse customer location data +- Add temporal partitioning +- Enforce data types and schemas + +**Key Fields**: +- `order_id`, `gk_id`, `location` +- `order_ts`: Canonical event timestamp +- `item_id`, `menu_id`, `category_id`, `brand_id` +- `item_name`, `price`, `qty`, `extended_price` +- `order_day`: Partition key + +### Gold Layer - Business Intelligence +Aggregated tables for analytics and reporting: + +#### gold_order_header +```sql +-- Purpose: Per-order revenue & counts +-- Aggregation: Group by order +-- Metrics: Total revenue, item counts, brand diversity +``` + +#### gold_item_sales_day +```sql +-- Purpose: Item-level units & revenue by day +-- Partitioned by: day +-- Metrics: Units sold, gross revenue per item +``` + +#### gold_brand_sales_day +```sql +-- Purpose: Brand-level orders (approx), items, revenue by day +-- Partitioned by: day +-- Processing: Stream-safe with HyperLogLog for order counting +-- Watermark: 3 hours for late-arriving data +``` + +#### gold_location_sales_hourly +```sql +-- Purpose: Hourly orders (approx) & revenue per location +-- Partitioned by: hour_ts +-- Frequency: Real-time with 3-hour watermark +-- Metrics: Approximate order counts, revenue by location/hour +``` + +### Dimensional Data +Static reference data loaded from parquet files: + +- **brands.parquet** → `{CATALOG}.{SIMULATOR_SCHEMA}.brands` +- **categories.parquet** → 
`{CATALOG}.{SIMULATOR_SCHEMA}.categories` +- **items.parquet** → `{CATALOG}.{SIMULATOR_SCHEMA}.items` +- **menus.parquet** → `{CATALOG}.{SIMULATOR_SCHEMA}.menus` + +### Real-time Streaming Intelligence + +#### Refund Recommender Stream +```sql +-- Source: {CATALOG}.lakeflow.all_events +-- Filter: event_type = 'delivered' +-- Processing: ML-based refund scoring using LLM +-- Output: {CATALOG}.recommender.refund_recommendations +``` + +**Processing Logic**: +- Filters delivered orders +- Applies sampling (10% historical, 100% new data) +- Calls LLM agent for refund classification +- Outputs structured recommendations + +### Lakebase Integration +PostgreSQL instance for operational applications: + +- **Instance**: `{CATALOG}refundmanager` +- **Database**: `caspers` +- **Synced Table**: `pg_recommendations` +- **Sync Policy**: Continuous from `refund_recommendations` +- **Primary Key**: `order_id` + +### Applications Layer + +#### Refund Manager App +FastAPI application for human review: + +- **Database**: PostgreSQL via Lakebase +- **Tables**: + - `refunds.refund_decisions` (decisions made by humans) + - `recommender.pg_recommendations` (AI recommendations) +- **Features**: + - View AI recommendations + - Apply refund decisions + - Track decision history + - Order event timeline + +#### AI/BI Dashboards +Real-time analytics and monitoring: + +- **Data Sources**: Gold layer tables +- **Metrics**: Revenue, order volumes, delivery performance +- **Refresh**: Real-time streaming updates + +#### Agent Bricks +AI-powered refund decision agent: + +- **Model**: LLM-based classification +- **Input**: Order delivery performance data +- **Output**: Refund recommendations (none/partial/full) +- **Integration**: Embedded in streaming pipeline + +## Data Flow Summary + +``` +Event Sources → Raw Volume → Bronze (all_events) → Silver (silver_order_items) → Gold Tables + ↓ +Dimensional Data ────────────────────────────────────────────────────────────→ Applications + ↓ +Streaming 
Intelligence ←─────────────────────────────────────────────────────→ Lakebase +``` + +## Key Technical Details + +### Streaming Configuration +- **Watermarks**: 3 hours for late-arriving data +- **Checkpointing**: Managed by Delta Live Tables +- **Partitioning**: By date/hour for optimal query performance +- **Approximate Aggregations**: HyperLogLog for stream-safe distinct counts + +### Data Quality +- **Schema Enforcement**: Structured schemas for all silver/gold tables +- **Data Validation**: Check constraints on critical fields +- **Error Handling**: Robust JSON parsing with fallback values + +### Scalability +- **Partitioning Strategy**: Time-based partitioning for all fact tables +- **Streaming**: Auto-scaling with Delta Live Tables +- **Storage**: Delta format with optimized file sizes + +## Developer Onboarding + +### Key Files to Understand +1. `pipelines/order_items/transformations/transformation.py` - Core data transformations +2. `stages/raw_data.ipynb` - Data generation and ingestion setup +3. `stages/lakeflow.ipynb` - Pipeline orchestration +4. `apps/refund-manager/app/main.py` - Application layer + +### Getting Started +1. Review the event types and their schemas in the README +2. Understand the medallion architecture layers +3. Examine the transformation logic in `transformation.py` +4. Explore the streaming components and applications +5. 
Run the demo using the "Casper's Initializer" job + +### Common Queries +```sql +-- View recent orders +SELECT * FROM {CATALOG}.lakeflow.silver_order_items +WHERE order_day >= CURRENT_DATE - 1; + +-- Check gold layer metrics +SELECT * FROM {CATALOG}.lakeflow.gold_brand_sales_day +WHERE day = CURRENT_DATE; + +-- Monitor streaming health +DESCRIBE HISTORY {CATALOG}.lakeflow.all_events; +``` diff --git a/docs/dataflow-diagram.mermaid b/docs/dataflow-diagram.mermaid new file mode 100644 index 0000000..40c8282 --- /dev/null +++ b/docs/dataflow-diagram.mermaid @@ -0,0 +1,140 @@ +graph TB + %% Event Sources Layer + subgraph "Event Sources" + OC[Order Creation
Customer App] + KE[Kitchen Events
gk_started, gk_finished, gk_ready] + DE[Driver Events
driver_arrived, driver_picked_up] + GPS[GPS Tracking
driver_ping, delivered] + end + + %% Raw Data Ingestion + subgraph "Raw Data Ingestion" + VOL["/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}
JSON Event Files
Real-time Streaming"] + CONFIG["Configuration Files
data/generator/configs/
sanfrancisco.json"] + DIM["Dimensional Data
data/dimensional/
brands.parquet
categories.parquet
items.parquet
menus.parquet"] + end + + %% Bronze Layer + subgraph "Bronze Layer - Raw Event Store" + direction TB + AE["all_events
Raw JSON events as ingested
• event_type, order_id, ts
• body (JSON payload)
• location, gk_id
CloudFiles Streaming"] + end + + %% Silver Layer + subgraph "Silver Layer - Clean Operational Data" + direction TB + SOI["silver_order_items
One row per item per order
• Exploded items with extended_price
• order_id, gk_id, location
• item details + price * qty
• Partitioned by order_day
• Watermark: order_ts"] + end + + %% Gold Layer + subgraph "Gold Layer - Business Intelligence" + direction TB + GOH["gold_order_header
Per-order revenue & counts
• order_revenue, total_qty
• total_items, brands_in_order
• Grouped by order"] + + GISD["gold_item_sales_day
Item-level sales by day
• units_sold, gross_revenue
• Partitioned by day
• Grouped by item + day"] + + GBSD["gold_brand_sales_day
Brand-level sales by day
• approx_count_distinct orders
• items_sold, brand_revenue
• HyperLogLog for streaming
• Watermark: 3 hours"] + + GLSH["gold_location_sales_hourly
Hourly location performance
• approx orders, revenue
• Partitioned by hour_ts
• Watermark: 3 hours"] + end + + %% Dimensional Tables + subgraph "Dimensional Tables" + direction TB + BRANDS["{CATALOG}.{SIMULATOR_SCHEMA}.brands
Brand reference data"] + CATEGORIES["{CATALOG}.{SIMULATOR_SCHEMA}.categories
Category reference data"] + ITEMS["{CATALOG}.{SIMULATOR_SCHEMA}.items
Item reference data"] + MENUS["{CATALOG}.{SIMULATOR_SCHEMA}.menus
Menu reference data"] + end + + %% Streaming Intelligence + subgraph "Real-time Streaming Intelligence" + direction TB + RRS["Refund Recommender Stream
Processes 'delivered' events
• Filters: event_type = 'delivered'
• Sampling: 10% historical, 100% new
• LLM-based refund classification
• Output: structured recommendations"] + + RRT["{CATALOG}.recommender.refund_recommendations
ML-generated refund suggestions
• order_id, ts, agent_response
• Refund class: none/partial/full
• Refund amount in USD"] + end + + %% Lakebase Integration + subgraph "Lakebase Integration" + direction TB + LB["PostgreSQL Instance
{CATALOG}refundmanager
Database: caspers"] + + ST["Synced Table
pg_recommendations
• Continuous sync from refund_recommendations
• Primary key: order_id
• Real-time operational queries"] + end + + %% Applications Layer + subgraph "Applications & Consumption" + direction TB + RMA["Refund Manager App
FastAPI Application
• Human refund review
• Decision tracking
• Order event timeline
• PostgreSQL backend"] + + DASH["AI/BI Dashboards
Real-time Analytics
• Revenue metrics
• Order volumes
• Delivery performance
• Streaming updates"] + + AGENT["Agent Bricks
RefundGPT Agent
• LLM-based decisions
• Delivery performance analysis
• Automated recommendations"] + end + + %% State Management + subgraph "State Management" + direction TB + UCState["Unity Catalog State
utils/uc_state/
• Resource tracking
• Deployment state
• Configuration management"] + end + + %% Data Flow Connections + OC --> VOL + KE --> VOL + DE --> VOL + GPS --> VOL + + CONFIG --> VOL + DIM --> BRANDS + DIM --> CATEGORIES + DIM --> ITEMS + DIM --> MENUS + + VOL --> AE + AE --> SOI + + SOI --> GOH + SOI --> GISD + SOI --> GBSD + SOI --> GLSH + + AE --> RRS + RRS --> RRT + + RRT --> LB + LB --> ST + ST --> RMA + + GOH --> DASH + GISD --> DASH + GBSD --> DASH + GLSH --> DASH + + BRANDS --> SOI + CATEGORIES --> SOI + ITEMS --> SOI + MENUS --> SOI + + RRT --> AGENT + AGENT --> RMA + + %% Styling + classDef eventSource fill:#e1f5fe,stroke:#01579b,stroke-width:2px + classDef bronze fill:#fff3e0,stroke:#e65100,stroke-width:2px + classDef silver fill:#f3e5f5,stroke:#4a148c,stroke-width:2px + classDef gold fill:#fff8e1,stroke:#f57f17,stroke-width:2px + classDef streaming fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px + classDef app fill:#fce4ec,stroke:#c2185b,stroke-width:2px + classDef dimensional fill:#f1f8e9,stroke:#558b2f,stroke-width:2px + classDef lakebase fill:#e3f2fd,stroke:#1565c0,stroke-width:2px + + class OC,KE,DE,GPS eventSource + class AE bronze + class SOI silver + class GOH,GISD,GBSD,GLSH gold + class RRS,RRT streaming + class RMA,DASH,AGENT app + class BRANDS,CATEGORIES,ITEMS,MENUS dimensional + class LB,ST lakebase + class VOL,CONFIG,DIM,UCState silver diff --git a/docs/developer-onboarding.md b/docs/developer-onboarding.md new file mode 100644 index 0000000..e6c7ed8 --- /dev/null +++ b/docs/developer-onboarding.md @@ -0,0 +1,331 @@ +# Developer Onboarding Guide - Casper's Kitchens + +## Welcome to Casper's Kitchens! 🍔 + +This guide will help you understand the Ghost Kitchen data architecture and get you productive quickly. Casper's Kitchens is a comprehensive demo of the Databricks platform, showcasing streaming data processing, medallion architecture, AI/ML integration, and real-time applications. 
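One transformation worth internalizing early is the silver-layer explode: the dataflow docs describe `silver_order_items` as one row per item per order, with `extended_price = price * qty` calculated along the way. The following is a hedged pure-Python sketch of that shape — the real implementation uses PySpark in `pipelines/order_items/transformations/transformation.py`, and any record fields beyond those documented are assumptions for illustration.

```python
# Pure-Python sketch of the silver-layer "explode items" step.
# The actual pipeline does this with PySpark's explode(); field
# names beyond the documented ones are illustrative assumptions.

def explode_order(order_event):
    """Turn one order_created event into one row per item."""
    rows = []
    for item in order_event["body"]["items"]:
        rows.append({
            "order_id": order_event["order_id"],
            "gk_id": order_event["gk_id"],
            "item_id": item["item_id"],
            "price": item["price"],
            "qty": item["qty"],
            # Calculated field documented for silver_order_items:
            "extended_price": item["price"] * item["qty"],
        })
    return rows

order = {
    "order_id": "ord-0001",
    "gk_id": "gk-03",
    "body": {"items": [
        {"item_id": 7, "price": 12.50, "qty": 2},
        {"item_id": 9, "price": 4.00, "qty": 1},
    ]},
}

rows = explode_order(order)
print(len(rows))                               # one row per item
print(sum(r["extended_price"] for r in rows))  # order revenue
```

The gold tables then aggregate these rows by order, item/day, brand/day, or location/hour.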
+ +## Quick Start Checklist + +- [ ] Review the [dataflow diagram](./dataflow-diagram.md) to understand the overall architecture +- [ ] Examine the [technical reference](./technical-reference.md) for detailed schemas and implementations +- [ ] Run through the demo setup process +- [ ] Explore key code files and notebooks +- [ ] Understand the data flow from events to applications + +## Architecture Overview + +Casper's Kitchens simulates a ghost kitchen delivery platform with the following key components: + +### 🏗️ System Layers +1. **Event Sources**: Real-time order lifecycle events from ghost kitchens +2. **Bronze Layer**: Raw event ingestion via CloudFiles streaming +3. **Silver Layer**: Clean, normalized operational data +4. **Gold Layer**: Business intelligence aggregations +5. **Applications**: Real-time apps for operations and analytics + +### 📊 Data Flow Pattern +``` +Ghost Kitchen Events → Volume Storage → Bronze → Silver → Gold → Applications + ↓ + Dimensional Data (Parquet Files) + ↓ + Streaming Intelligence (ML/AI) + ↓ + Lakebase (PostgreSQL) → Web Apps +``` + +## Key Files to Understand + +### 1. Core Data Pipeline +**File**: `pipelines/order_items/transformations/transformation.py` +- **Purpose**: Defines the medallion architecture transformations +- **Key Functions**: + - `all_events()`: Bronze layer raw event ingestion + - `silver_order_items()`: Silver layer item-level processing + - `gold_*()`: Business intelligence aggregations + +**What to Look For**: +- Delta Live Tables decorators (`@dlt.table`) +- Streaming transformations and watermarks +- Schema definitions and data quality rules +- Partitioning strategies + +### 2. Data Generation and Setup +**File**: `stages/raw_data.ipynb` +- **Purpose**: Sets up data generation and dimensional tables +- **Key Sections**: + - Catalog and schema creation + - Dimensional data loading from parquet files + - Event generation configuration + +### 3. 
Pipeline Orchestration +**File**: `stages/lakeflow.ipynb` +- **Purpose**: Creates and manages the Delta Live Tables pipeline +- **Key Concepts**: + - Pipeline configuration and settings + - Cluster and compute management + - Continuous vs triggered execution + +### 4. Streaming Intelligence +**File**: `jobs/refund_recommender_stream.ipynb` +- **Purpose**: Real-time ML-based refund recommendations +- **Key Components**: + - LLM integration for decision making + - Streaming data processing + - Output to recommendation tables + +### 5. Application Layer +**File**: `apps/refund-manager/app/main.py` +- **Purpose**: FastAPI web application for human review +- **Key Features**: + - REST API endpoints + - PostgreSQL integration via Lakebase + - Human-in-the-loop decision making + +### 6. Lakebase Integration +**File**: `stages/lakebase.ipynb` +- **Purpose**: Sets up PostgreSQL instance and synced tables +- **Key Concepts**: + - Database instance creation + - Continuous sync from lakehouse to PostgreSQL + - Operational data serving + +## Understanding the Data Model + +### Event Types +The system processes 8 types of events in the order lifecycle: + +1. **order_created** - Customer places order +2. **gk_started** - Kitchen begins preparation +3. **gk_finished** - Kitchen completes preparation +4. **gk_ready** - Order ready for pickup +5. **driver_arrived** - Driver arrives at kitchen +6. **driver_picked_up** - Driver collects order +7. **driver_ping** - GPS updates during delivery +8. 
**delivered** - Order delivered to customer + +### Table Relationships +``` +all_events (Bronze) + ↓ (filter: order_created, explode items) +silver_order_items (Silver) + ↓ (aggregate by order) ↓ (aggregate by item+day) +gold_order_header gold_item_sales_day + ↓ (aggregate by brand+day) ↓ (aggregate by location+hour) +gold_brand_sales_day gold_location_sales_hourly +``` + +### Dimensional Data +Static reference tables loaded from parquet files: +- **brands**: Restaurant brand information +- **categories**: Food category definitions +- **items**: Menu item details with pricing +- **menus**: Menu structure and organization + +## Common Development Tasks + +### 1. Adding New Event Types +To add a new event type to the system: + +1. **Update the generator** (`data/generator/generator.ipynb`): + - Add event generation logic + - Define event schema and timing + +2. **Modify transformations** (`pipelines/order_items/transformations/transformation.py`): + - Add filtering logic for new event type + - Create new silver/gold tables if needed + +3. **Update applications** as needed to consume new data + +### 2. Creating New Gold Tables +To add business intelligence aggregations: + +1. **Define the table** in `transformation.py`: +```python +@dlt.table( + name="gold_my_new_metric", + partition_cols=["day"], + comment="Description of the new metric" +) +def gold_my_new_metric(): + return ( + dlt.read_stream("silver_order_items") + .groupBy("dimension1", "dimension2", "day") + .agg( + F.sum("measure1").alias("total_measure1"), + F.count("*").alias("record_count") + ) + ) +``` + +2. **Consider partitioning** for query performance +3. **Add watermarks** for streaming if needed +4. **Update downstream applications** to consume the new data + +### 3. Modifying the Web Application +The Refund Manager app is a standard FastAPI application: + +1. **Add new endpoints** in `apps/refund-manager/app/main.py` +2. **Update the database schema** if needed (DDL in startup function) +3. 
**Modify the frontend** (`apps/refund-manager/index.html`) +4. **Test locally** using the development server + +### 4. Configuring Data Generation +Modify `data/generator/configs/sanfrancisco.json` to: +- Change simulation speed (`speed_up`) +- Adjust order volumes (`orders_day_1`, `orders_last`) +- Modify delivery parameters (`radius_mi`, `driver_mph`) +- Add noise for testing (`noise_pct`) + +## Running the Demo + +### Full Demo Setup +1. **Initialize**: Run `init.ipynb` to create the "Casper's Initializer" job +2. **Execute**: Run the job with "Run All" for complete demo +3. **Monitor**: Watch pipelines process data in real-time +4. **Explore**: Access applications and dashboards + +### Selective Stage Execution +You can run individual stages for focused development: +- **Raw Data**: Data generation and dimensional tables +- **Lakeflow**: Medallion architecture pipeline +- **Refund Agent**: ML model training and deployment +- **Refund Stream**: Real-time streaming intelligence +- **Lakebase**: PostgreSQL setup and sync +- **Apps**: Web application deployment + +### Development Workflow +1. **Make changes** to transformation logic or applications +2. **Test locally** using notebook environments +3. **Deploy changes** through the pipeline orchestration +4. **Validate results** using SQL queries or application UI +5. 
**Monitor performance** using Databricks monitoring tools + +## Useful SQL Queries + +### Monitoring Data Flow +```sql +-- Check recent events +SELECT event_type, COUNT(*) as count, MAX(ts) as latest_event +FROM {CATALOG}.lakeflow.all_events +WHERE ts >= CURRENT_TIMESTAMP - INTERVAL 1 HOUR +GROUP BY event_type; + +-- Monitor silver layer processing +SELECT order_day, COUNT(*) as items, SUM(extended_price) as revenue +FROM {CATALOG}.lakeflow.silver_order_items +WHERE order_day >= CURRENT_DATE - 7 +GROUP BY order_day +ORDER BY order_day DESC; + +-- Check gold layer metrics +SELECT day, SUM(brand_revenue) as total_revenue, SUM(items_sold) as total_items +FROM {CATALOG}.lakeflow.gold_brand_sales_day +WHERE day >= CURRENT_DATE - 7 +GROUP BY day +ORDER BY day DESC; +``` + +### Debugging Streaming Jobs +```sql +-- Check streaming job health via recent table writes +DESCRIBE HISTORY {CATALOG}.lakeflow.all_events; + +-- Monitor pipeline execution +SELECT * FROM system.lakeflow.pipeline_events +WHERE pipeline_name LIKE '%casper%' +ORDER BY timestamp DESC; + +-- Check for processing delays +SELECT + event_type, + MIN(ts) as earliest_event, + MAX(ts) as latest_event, + COUNT(*) as event_count +FROM {CATALOG}.lakeflow.all_events +WHERE ts >= CURRENT_TIMESTAMP - INTERVAL 1 DAY +GROUP BY event_type; +``` + +### Application Data Queries +```sql +-- Check refund recommendations +SELECT + refund_class, + COUNT(*) as count, + AVG(CAST(get_json_object(agent_response, '$.refund_usd') AS DOUBLE)) as avg_amount +FROM {CATALOG}.recommender.refund_recommendations +WHERE ts >= CURRENT_DATE - 1 +GROUP BY refund_class; + +-- Monitor human decisions +SELECT + refund_class, + COUNT(*) as decisions, + SUM(amount_usd) as total_amount, + decided_by +FROM refunds.refund_decisions +WHERE decided_ts >= CURRENT_DATE - 7 +GROUP BY refund_class, decided_by; +``` + +## Troubleshooting Common Issues + +### Pipeline Not Processing Data +1. **Check data generation**: Verify events are being written to volumes +2. 
**Verify pipeline status**: Look for errors in pipeline execution logs +3. **Check permissions**: Ensure proper access to catalogs and schemas +4. **Monitor resource usage**: Verify cluster has sufficient resources + +### Streaming Delays +1. **Check watermarks**: Ensure watermark settings allow for data latency +2. **Monitor checkpoints**: Verify streaming checkpoints are progressing +3. **Review batch sizes**: Adjust trigger intervals if needed +4. **Check for backpressure**: Monitor streaming metrics for bottlenecks + +### Application Connectivity Issues +1. **Verify Lakebase instance**: Check PostgreSQL instance is running +2. **Check sync status**: Ensure synced tables are up to date +3. **Review permissions**: Verify app has proper database access +4. **Test connections**: Use health endpoints to validate connectivity + +### Data Quality Issues +1. **Check schema evolution**: Verify schemas match between layers +2. **Monitor data quality metrics**: Look for parsing errors or null values +3. **Review transformation logic**: Validate business rules and calculations +4. 
**Check dimensional data**: Ensure reference tables are current + +## Next Steps + +### For Data Engineers +- Explore advanced Delta Live Tables features +- Implement data quality monitoring and alerting +- Optimize streaming performance and resource usage +- Add new business metrics and KPIs + +### For Application Developers +- Extend the Refund Manager with new features +- Build additional applications consuming gold layer data +- Implement real-time dashboards and monitoring +- Add authentication and authorization + +### For Data Scientists +- Enhance the refund recommendation model +- Add new ML use cases (demand forecasting, route optimization) +- Implement A/B testing for model improvements +- Build feature stores for ML workflows + +### For Platform Engineers +- Implement CI/CD for pipeline deployments +- Add comprehensive monitoring and alerting +- Optimize cost and performance across the platform +- Implement disaster recovery and backup strategies + +## Resources and Support + +- **Documentation**: This docs folder contains comprehensive technical references +- **Code Examples**: All notebooks include detailed comments and examples +- **Databricks Documentation**: Official platform documentation and best practices +- **Community**: Databricks community forums and user groups + +Happy coding! 🚀 diff --git a/docs/generate-diagrams.sh b/docs/generate-diagrams.sh new file mode 100755 index 0000000..ff07d0a --- /dev/null +++ b/docs/generate-diagrams.sh @@ -0,0 +1,25 @@ +#!/bin/bash +# generate-diagrams.sh +# Script to generate all dataflow diagram formats with dark theme and black background + +echo "Generating dataflow diagrams with dark theme..." + +# Create images directory if it doesn't exist +mkdir -p images + +# Standard PNG with dark theme +echo "Generating standard PNG..." +mmdc -i dataflow-diagram.mermaid -o images/dataflow-diagram.png -t dark -b black + +# High-resolution PNG with dark theme +echo "Generating high-resolution PNG..." 
+mmdc -i dataflow-diagram.mermaid -o images/dataflow-diagram-hd.png -t dark -b black -w 2400 -H 1800 + +# SVG vector with dark theme +echo "Generating SVG vector..." +mmdc -i dataflow-diagram.mermaid -o images/dataflow-diagram.svg -t dark -b black + +echo "All diagrams generated successfully in images/!" +echo "" +echo "Generated files:" +ls -la images/dataflow-diagram.* diff --git a/docs/images/dataflow-diagram-hd.png b/docs/images/dataflow-diagram-hd.png new file mode 100644 index 0000000..3f2437f Binary files /dev/null and b/docs/images/dataflow-diagram-hd.png differ diff --git a/docs/images/dataflow-diagram.png b/docs/images/dataflow-diagram.png new file mode 100644 index 0000000..1a886bd Binary files /dev/null and b/docs/images/dataflow-diagram.png differ diff --git a/docs/images/dataflow-diagram.svg b/docs/images/dataflow-diagram.svg new file mode 100644 index 0000000..a842487 --- /dev/null +++ b/docs/images/dataflow-diagram.svg @@ -0,0 +1 @@ +

[SVG markup omitted — single-line vector export of the dataflow diagram. Its text labels trace: Event Sources (Order Creation, Kitchen Events, Driver Events, GPS Tracking) → Raw Data Ingestion (`/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}` JSON event files, plus `data/generator/configs/sanfrancisco.json` and `data/dimensional/*.parquet`) → Bronze Layer (`all_events`) → Silver Layer (`silver_order_items`) → Gold Layer (`gold_order_header`, `gold_item_sales_day`, `gold_brand_sales_day`, `gold_location_sales_hourly`) → Dimensional Tables (`brands`, `categories`, `items`, `menus`) → Real-time Streaming Intelligence (Refund Recommender Stream, `{CATALOG}.recommender.refund_recommendations`) → Lakebase Integration (PostgreSQL instance `{CATALOG}refundmanager`, synced table `pg_recommendations`) → Applications & Consumption (Refund Manager App, AI/BI Dashboards, Agent Bricks RefundGPT), plus Unity Catalog state tracking in `utils/uc_state/`.]
\ No newline at end of file diff --git a/docs/mermaid-image-generation.md b/docs/mermaid-image-generation.md new file mode 100644 index 0000000..c2e1679 --- /dev/null +++ b/docs/mermaid-image-generation.md @@ -0,0 +1,161 @@ +# Generating Images from Mermaid Diagrams + +This guide explains how to convert the Mermaid dataflow diagram into various image formats. + +## Available Image Formats + +The dataflow diagram is available in multiple formats in the `docs/images/` directory: + +- **`images/dataflow-diagram.png`** - Standard PNG with dark theme and black background +- **`images/dataflow-diagram-hd.png`** - High-resolution PNG (2400x1800) for presentations +- **`images/dataflow-diagram.svg`** - Scalable vector format (best for web/print) +- **`dataflow-diagram.mermaid`** - Source code for editing + +## Methods to Generate Images + +### 1. Online Mermaid Live Editor (Easiest) + +1. Visit [mermaid.live](https://mermaid.live) +2. Copy contents from `dataflow-diagram.mermaid` +3. Paste into the editor +4. Download as PNG or SVG + +### 2. Mermaid CLI (Automated) + +Install the CLI tool: +```bash +npm install -g @mermaid-js/mermaid-cli +``` + +Generate images with dark theme and black background: +```bash +# Standard PNG with dark theme +mmdc -i docs/dataflow-diagram.mermaid -o docs/images/dataflow-diagram.png -t dark -b black + +# High-resolution PNG with dark theme +mmdc -i docs/dataflow-diagram.mermaid -o docs/images/dataflow-diagram-hd.png -t dark -b black -w 2400 -H 1800 + +# SVG vector format with dark theme +mmdc -i docs/dataflow-diagram.mermaid -o docs/images/dataflow-diagram.svg -t dark -b black +``` + +### 3. VS Code Extension + +1. Install "Mermaid Markdown Syntax Highlighting" extension +2. Open `dataflow-diagram.mermaid` +3. Use Command Palette: "Mermaid: Export Diagram" + +### 4. 
GitHub/GitLab Integration
+
+Both platforms render Mermaid diagrams automatically in markdown files. Use a four-backtick outer fence when nesting a fenced block inside a markdown example:
+
+````markdown
+```mermaid
+graph TB
+  %% Your diagram code here
+```
+````
+
+## CLI Options Reference
+
+Common `mmdc` command options:
+
+| Option | Description | Example |
+|--------|-------------|---------|
+| `-i` | Input file | `-i diagram.mermaid` |
+| `-o` | Output file | `-o diagram.png` |
+| `-w` | Width in pixels | `-w 2400` |
+| `-H` | Height in pixels | `-H 1800` |
+| `-t` | Theme | `-t dark` |
+| `-b` | Background color | `-b white` |
+| `-s` | Scale factor | `-s 2` |
+
+## Themes Available
+
+- `default` - Standard Mermaid theme
+- `dark` - Dark background theme
+- `forest` - Green color scheme
+- `neutral` - Minimal colors
+
+Example with dark theme and black background:
+```bash
+mmdc -i docs/dataflow-diagram.mermaid -o docs/images/dataflow-diagram-dark.png -t dark -b black
+```
+
+## Troubleshooting
+
+### Common Issues
+
+1. **"mmdc command not found"**
+   - Install CLI: `npm install -g @mermaid-js/mermaid-cli`
+   - Check that PATH includes the npm global bin directory
+
+2. **Diagram too large/small**
+   - Adjust width/height: `-w 3000 -H 2000`
+   - Use a scale factor: `-s 1.5`
+
+3. **Text cut off**
+   - Increase dimensions
+   - Simplify node labels
+   - Use line breaks in text
+
+4. **Poor image quality**
+   - Use SVG format for scalability
+   - Increase resolution for PNG: `-w 3000 -H 2000`
+
+### Performance Tips
+
+- Use SVG for web display (smaller file size, scalable)
+- Use high-res PNG for presentations and print
+- Use standard PNG for documentation and quick sharing
+
+## Automation Script
+
+Create a script to generate all formats with dark theme. Note that the paths are relative to `docs/`, matching the shipped `docs/generate-diagrams.sh`, so run it from that directory:
+
+```bash
+#!/bin/bash
+# generate-diagrams.sh
+
+echo "Generating dataflow diagrams with dark theme..."
+
+# Create images directory if it doesn't exist
+mkdir -p images
+
+# Standard PNG with dark theme
+mmdc -i dataflow-diagram.mermaid -o images/dataflow-diagram.png -t dark -b black
+
+# High-resolution PNG with dark theme
+mmdc -i dataflow-diagram.mermaid -o images/dataflow-diagram-hd.png -t dark -b black -w 2400 -H 1800
+
+# SVG vector with dark theme
+mmdc -i dataflow-diagram.mermaid -o images/dataflow-diagram.svg -t dark -b black
+
+echo "All diagrams generated successfully in images/!"
+```
+
+Make executable and run:
+```bash
+cd docs
+chmod +x generate-diagrams.sh
+./generate-diagrams.sh
+```
+
+## Integration with Documentation
+
+The images are referenced throughout the docs:
+- `dataflow-diagram.md` - Main architecture documentation with embedded PNG
+- `README.md` - Documentation index with links to all formats
+
+All images use the dark theme with a black background for better presentation, and can be embedded in presentations, wikis, or other documentation.
+
+## Updating Diagrams
+
+When modifying the Mermaid source:
+
+1. Edit `dataflow-diagram.mermaid`
+2. Run `cd docs && ./generate-diagrams.sh` to regenerate all formats with dark theme
+3. Commit both source and generated images in `docs/images/`
+4. Update documentation if structure changes significantly
+
+The automation script ensures a consistent dark theme and black background across all formats.
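The CLI recipes in this guide can also be scripted from Python. Below is a minimal sketch, assuming `mmdc` from `@mermaid-js/mermaid-cli` is on the PATH and the commands run from `docs/`; the flag values mirror the examples above:

```python
import subprocess

# Output targets mirroring this guide: output path -> extra mmdc flags
TARGETS = {
    "images/dataflow-diagram.png": [],
    "images/dataflow-diagram-hd.png": ["-w", "2400", "-H", "1800"],
    "images/dataflow-diagram.svg": [],
}

def mmdc_command(source, output, theme="dark", background="black", extra=()):
    """Build the mmdc argv for one render, using the dark theme / black background defaults."""
    return ["mmdc", "-i", source, "-o", output, "-t", theme, "-b", background, *extra]

def render_all(source="dataflow-diagram.mermaid"):
    """Render every target; requires @mermaid-js/mermaid-cli installed globally."""
    for output, extra in TARGETS.items():
        subprocess.run(mmdc_command(source, output, extra=extra), check=True)
```

Keeping the argv construction in one helper makes it easy to add a new output format without copy-pasting flag lists across shell scripts.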
diff --git a/docs/technical-reference.md b/docs/technical-reference.md new file mode 100644 index 0000000..4f31f0b --- /dev/null +++ b/docs/technical-reference.md @@ -0,0 +1,483 @@ +# Casper's Kitchens - Technical Reference + +## Table Schemas and Implementation Details + +### Bronze Layer Tables + +#### all_events +**Location**: `{CATALOG}.lakeflow.all_events` +**Type**: Streaming table (Delta Live Tables) +**Source**: CloudFiles streaming from `/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}` + +```python +@dlt.table(comment="Raw JSON events as ingested (one file per event).") +def all_events(): + return ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "json") + .load(f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}") + ) +``` + +**Schema**: +- `event_type`: STRING - Type of event (order_created, gk_started, etc.) +- `order_id`: STRING - Unique order identifier +- `ts`: STRING - Event timestamp (raw) +- `body`: STRING - JSON payload with event-specific data +- `location`: STRING - Ghost kitchen location +- `gk_id`: STRING - Ghost kitchen identifier +- `seq`: INTEGER - Event sequence number + +### Silver Layer Tables + +#### silver_order_items +**Location**: `{CATALOG}.lakeflow.silver_order_items` +**Type**: Streaming table (Delta Live Tables) +**Partitioned by**: `order_day` + +```python +@dlt.table( + name="silver_order_items", + comment="Silver – one row per item per order, with extended_price.", + partition_cols=["order_day"] +) +``` + +**Schema Definition**: +```python +item_schema = StructType([ + StructField("id", IntegerType()), + StructField("category_id", IntegerType()), + StructField("menu_id", IntegerType()), + StructField("brand_id", IntegerType()), + StructField("name", StringType()), + StructField("price", DoubleType()), + StructField("qty", IntegerType()) +]) + +body_schema = ( + StructType() + .add("customer_lat", DoubleType()) + .add("customer_lon", DoubleType()) + .add("customer_addr", StringType()) + .add("items", ArrayType(item_schema)) +) 
+``` + +**Output Schema**: +- `order_id`: STRING +- `gk_id`: STRING +- `location`: STRING +- `order_ts`: TIMESTAMP - Canonical event time +- `order_day`: DATE - Partition key +- `item_id`: INTEGER +- `menu_id`: INTEGER +- `category_id`: INTEGER +- `brand_id`: INTEGER +- `item_name`: STRING +- `price`: DOUBLE +- `qty`: INTEGER +- `extended_price`: DOUBLE - Calculated as price * qty + +**Key Transformations**: +```python +.filter(F.col("event_type") == "order_created") +.withColumn("event_ts", F.to_timestamp("ts")) +.withColumn("body_obj", F.from_json("body", body_schema)) +.withColumn("item", F.explode("body_obj.items")) +.withColumn("extended_price", F.col("item.price") * F.col("item.qty")) +.withColumn("order_day", F.to_date("event_ts")) +``` + +### Gold Layer Tables + +#### gold_order_header +**Location**: `{CATALOG}.lakeflow.gold_order_header` +**Type**: Streaming table (Delta Live Tables) + +```python +@dlt.table( + name="gold_order_header", + comment="Gold – per-order revenue & counts." +) +``` + +**Aggregation Logic**: +```python +.groupBy("order_id", "gk_id", "location", "order_day") +.agg( + F.sum("extended_price").alias("order_revenue"), + F.sum("qty").alias("total_qty"), + F.count("item_id").alias("total_items"), + F.collect_set("brand_id").alias("brands_in_order") +) +``` + +**Schema**: +- `order_id`: STRING +- `gk_id`: STRING +- `location`: STRING +- `order_day`: DATE +- `order_revenue`: DOUBLE - Total order value +- `total_qty`: LONG - Total items quantity +- `total_items`: LONG - Count of distinct items +- `brands_in_order`: ARRAY - Set of brand IDs in order + +#### gold_item_sales_day +**Location**: `{CATALOG}.lakeflow.gold_item_sales_day` +**Type**: Streaming table (Delta Live Tables) +**Partitioned by**: `day` + +```python +@dlt.table( + name="gold_item_sales_day", + partition_cols=["day"], + comment="Gold – item-level units & revenue by day." 
+) +``` + +**Schema**: +- `item_id`: INTEGER +- `menu_id`: INTEGER +- `category_id`: INTEGER +- `brand_id`: INTEGER +- `day`: DATE - Partition key +- `units_sold`: LONG - Total quantity sold +- `gross_revenue`: DOUBLE - Total revenue for item + +#### gold_brand_sales_day +**Location**: `{CATALOG}.lakeflow.gold_brand_sales_day` +**Type**: Streaming table (Delta Live Tables) +**Partitioned by**: `day` + +```python +@dlt.table( + name="gold_brand_sales_day", + partition_cols=["day"], + comment="Gold – brand-level orders (approx), items, revenue by day." +) +``` + +**Streaming Configuration**: +```python +.withWatermark("order_ts", "3 hours") +.groupBy("brand_id", F.col("order_day").alias("day")) +.agg( + F.approx_count_distinct("order_id").alias("orders"), + F.sum("qty").alias("items_sold"), + F.sum("extended_price").alias("brand_revenue") +) +``` + +**Schema**: +- `brand_id`: INTEGER +- `day`: DATE - Partition key +- `orders`: LONG - Approximate distinct order count (HyperLogLog) +- `items_sold`: LONG - Total items sold +- `brand_revenue`: DOUBLE - Total brand revenue + +#### gold_location_sales_hourly +**Location**: `{CATALOG}.lakeflow.gold_location_sales_hourly` +**Type**: Streaming table (Delta Live Tables) +**Partitioned by**: `hour_ts` + +```python +@dlt.table( + name="gold_location_sales_hourly", + partition_cols=["hour_ts"], + comment="Gold – hourly orders (approx) & revenue per location." 
+) +``` + +**Streaming Configuration**: +```python +.withWatermark("order_ts", "3 hours") +.withColumn("hour_ts", F.date_trunc("hour", "order_ts")) +.groupBy("location", "hour_ts") +.agg( + F.approx_count_distinct("order_id").alias("orders"), + F.sum("extended_price").alias("revenue") +) +``` + +**Schema**: +- `location`: STRING +- `hour_ts`: TIMESTAMP - Partition key (truncated to hour) +- `orders`: LONG - Approximate distinct order count +- `revenue`: DOUBLE - Total hourly revenue + +### Dimensional Tables + +#### brands +**Location**: `{CATALOG}.{SIMULATOR_SCHEMA}.brands` +**Source**: `data/dimensional/brands.parquet` + +```python +spark.createDataFrame(pd.read_parquet("../data/dimensional/brands.parquet")) \ + .write.mode("overwrite").saveAsTable(f"{CATALOG}.{SIMULATOR_SCHEMA}.brands") +``` + +#### categories +**Location**: `{CATALOG}.{SIMULATOR_SCHEMA}.categories` +**Source**: `data/dimensional/categories.parquet` + +#### items +**Location**: `{CATALOG}.{SIMULATOR_SCHEMA}.items` +**Source**: `data/dimensional/items.parquet` + +#### menus +**Location**: `{CATALOG}.{SIMULATOR_SCHEMA}.menus` +**Source**: `data/dimensional/menus.parquet` + +### Streaming Intelligence Tables + +#### refund_recommendations +**Location**: `{CATALOG}.recommender.refund_recommendations` +**Type**: Streaming table (Spark Structured Streaming) + +```python +refund_recommendations = spark.readStream.table(f"{CATALOG}.lakeflow.all_events") \ + .filter("event_type = 'delivered'") \ + .filter( + # For historical data (ts < current_time), sample 10% + # For new data (ts >= current_time), process 100% + (F.col("ts") >= current_time) | + ((F.col("ts") < current_time) & (F.rand() < 0.1)) + ) \ + .select( + F.col("order_id"), + F.current_timestamp().alias("ts"), + get_chat_completion_udf(F.col("order_id")).alias("agent_response"), + ) +``` + +**Schema**: +- `order_id`: STRING +- `ts`: TIMESTAMP - Processing timestamp +- `agent_response`: STRING - LLM-generated JSON response + +**Agent Response 
Format**: +```json +{ + "refund_class": "none|partial|full", + "refund_usd": 0.00, + "reason": "Explanation for refund decision" +} +``` + +### Lakebase Tables + +#### pg_recommendations +**Location**: PostgreSQL in Lakebase instance `{CATALOG}refundmanager` +**Database**: `caspers` +**Sync Source**: `{CATALOG}.recommender.refund_recommendations` + +```python +synced_table = w.database.create_synced_database_table( + SyncedDatabaseTable( + name=f"{CATALOG}.recommender.pg_recommendations", + database_instance_name=instance.name, + logical_database_name="caspers", + spec=SyncedTableSpec( + source_table_full_name=f"{CATALOG}.recommender.refund_recommendations", + primary_key_columns=["order_id"], + scheduling_policy=SyncedTableSchedulingPolicy.CONTINUOUS, + create_database_objects_if_missing=True + ) + ) +) +``` + +#### refund_decisions +**Location**: PostgreSQL in Lakebase instance +**Schema**: `refunds` +**Purpose**: Human refund decisions + +```sql +CREATE TABLE refunds.refund_decisions ( + id BIGSERIAL PRIMARY KEY, + order_id TEXT NOT NULL, + decided_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(), + amount_usd NUMERIC(10,2) NOT NULL CHECK (amount_usd >= 0), + refund_class TEXT NOT NULL CHECK (refund_class IN ('none','partial','full')), + reason TEXT NOT NULL, + decided_by TEXT, + source_suggestion JSONB +); +CREATE INDEX idx_refund_decisions_order_id ON refunds.refund_decisions(order_id); +``` + +## Event Types and Schemas + +### Order Lifecycle Events + +1. **order_created** + - Customer places order + - Contains: customer location, delivery address, ordered items with quantities + +2. **gk_started** + - Kitchen begins preparing food + - Contains: timestamp when prep begins + +3. **gk_finished** + - Kitchen completes food preparation + - Contains: timestamp when food is ready + +4. **gk_ready** + - Order ready for pickup + - Contains: timestamp when driver can collect + +5. **driver_arrived** + - Driver arrives at kitchen + - Contains: timestamp of driver arrival + +6. 
**driver_picked_up** + - Driver collects order + - Contains: full GPS route to customer, estimated delivery time + +7. **driver_ping** + - Driver location updates during delivery + - Contains: current GPS coordinates, delivery progress percentage + +8. **delivered** + - Order delivered to customer + - Contains: final delivery location coordinates + +## Configuration Files + +### Generator Configuration +**Location**: `data/generator/configs/sanfrancisco.json` + +```json +{ + "start_days_ago": 3, + "end_days_ahead": 362, + "speed_up": 1, + "orders_day_1": 50, + "orders_last": 1000, + "noise_pct": 10, + "gk_location": "160 Spear St, San Francisco, CA 94105", + "location_name": "sanfrancisco", + "radius_mi": 4, + "driver_mph": 25, + "batch_rows": 10, + "batch_seconds": 1, + "ping_sec": 60, + "random_seed": 72 +} +``` + +**Key Parameters**: +- `speed_up`: Simulation speed multiplier (1x = real-time, 60x = 1 hour per minute) +- `orders_day_1` / `orders_last`: Order volume scaling +- `noise_pct`: Data quality variation percentage +- `radius_mi`: Delivery radius from ghost kitchen +- `driver_mph`: Average driver speed for routing +- `batch_rows` / `batch_seconds`: Event generation batching + +## Application Configurations + +### Refund Manager App +**Location**: `apps/refund-manager/app.yaml` + +```yaml +display_name: "Refund Manager" +description: "Human review interface for AI-generated refund recommendations" +``` + +**Environment Variables**: +- `REFUNDS_SCHEMA`: Schema for refund decisions (default: "refunds") +- `RECS_SCHEMA`: Schema for recommendations (default: "recommender") +- `DEBUG`: Enable debug mode for detailed error responses + +### API Endpoints + +#### GET /api/summary +Returns aggregate statistics: +```json +{ + "recommendations_count": 1250, + "suggestions_by_class": {"none": 800, "partial": 300, "full": 150}, + "suggested_total_usd": 15750.50, + "decisions_count": 450, + "decisions_by_class": {"none": 280, "partial": 120, "full": 50}, + 
"decided_total_usd": 5250.25, + "pending_count": 800 +} +``` + +#### GET /api/recommendations +Returns paginated recommendations with decisions: +```json +{ + "items": [ + { + "order_id": "order_123", + "ts": "2024-01-15T10:30:00Z", + "suggestion": { + "refund_class": "partial", + "refund_usd": 12.50, + "reason": "Delivery delayed by 25 minutes" + }, + "decision": null, + "status": "pending" + } + ], + "limit": 50, + "offset": 0 +} +``` + +#### POST /api/refunds +Apply human refund decision: +```json +{ + "order_id": "order_123", + "amount_usd": 10.00, + "refund_class": "partial", + "reason": "Approved partial refund for late delivery", + "decided_by": "manager@caspers.com" +} +``` + +#### GET /api/orders/{order_id}/events +Returns complete event timeline for an order: +```json +{ + "order_id": "order_123", + "events": [ + { + "event_type": "order_created", + "ts": "2024-01-15T09:00:00Z", + "location": "160 Spear St, San Francisco, CA 94105" + }, + { + "event_type": "delivered", + "ts": "2024-01-15T10:25:00Z", + "delivery_time_minutes": 85 + } + ] +} +``` + +## Performance and Scaling + +### Streaming Watermarks +- **Purpose**: Handle late-arriving data in streaming aggregations +- **Configuration**: 3 hours for brand and location hourly tables +- **Impact**: Allows events up to 3 hours late to be processed correctly + +### Partitioning Strategy +- **Time-based partitioning**: All fact tables partitioned by date/hour +- **Benefits**: Query pruning, parallel processing, maintenance efficiency +- **Partition columns**: `order_day`, `day`, `hour_ts` + +### Approximate Aggregations +- **HyperLogLog**: Used for distinct count estimates in streaming +- **Trade-off**: ~2% error rate for significant performance improvement +- **Use cases**: Order counts in brand and location aggregations + +### Delta Lake Optimizations +- **Auto-compaction**: Enabled for all streaming tables +- **Z-ordering**: Applied to frequently queried columns +- **Vacuum**: Automated cleanup of old 
file versions
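
For intuition about the watermark rule above, here is a plain-Python sketch of the keep/drop decision. This is a simplified illustration only — Spark's watermarking also governs when aggregation state can be evicted — and the event data is hypothetical:

```python
from datetime import datetime, timedelta

WATERMARK = timedelta(hours=3)  # mirrors the 3-hour watermark on the gold tables

def filter_late_events(events):
    """Keep events no more than WATERMARK behind the max event time seen so far.

    `events` is an iterable of (order_id, event_ts) pairs in arrival order.
    """
    max_ts = datetime.min
    kept = []
    for order_id, ts in events:
        max_ts = max(max_ts, ts)  # watermark advances with the latest event time
        if ts >= max_ts - WATERMARK:
            kept.append((order_id, ts))
    return kept

# An event arriving 3.5 hours behind the latest seen event time is dropped:
sample = [
    ("a", datetime(2024, 1, 15, 9, 0)),
    ("b", datetime(2024, 1, 15, 12, 0)),
    ("late", datetime(2024, 1, 15, 8, 30)),
]
print([oid for oid, _ in filter_late_events(sample)])  # ['a', 'b']
```

Events inside the 3-hour window still land in the correct aggregation bucket; anything older is discarded, which bounds how long streaming state must be retained.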