Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/data-analytics-demo/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ dashboard:
$(PYTHON) -m data_analytics_demo.dashboard.render

# Validate semantic/kpi.yml by running the project's Python validator module.
# The earlier "not yet implemented" stub (echo + exit 1) is gone, so the
# target's result is now whatever the validator process returns.
semantic-validate:
	$(PYTHON) -m data_analytics_demo.semantic.validator

demo: data dbt ml narrative dashboard
@echo "[demo] full pipeline OK"
Expand Down
3 changes: 2 additions & 1 deletion packages/data-analytics-demo/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dev = [
"ruff>=0.7",
"mypy>=1.13",
"pip-audit>=2.7",
"types-PyYAML>=6.0",
]

[project.scripts]
Expand Down Expand Up @@ -77,7 +78,7 @@ mypy_path = "src"
# but lags behind `pandas` releases; treating these as untyped is the
# pragmatic choice for a Python 3.11 + pandas 3.x stack.
[[tool.mypy.overrides]]
# A table may define `module` only once; this is the list that includes "yaml".
module = ["pandas", "pandas.*", "duckdb", "faker", "shap", "xgboost", "sklearn.*", "plotly", "plotly.*", "yaml"]
ignore_missing_imports = true

[tool.pytest.ini_options]
Expand Down
135 changes: 135 additions & 0 deletions packages/data-analytics-demo/semantic/kpi.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# MetricFlow-compatible semantic models + metrics for the data-analytics demo.
#
# This file declares the single source of truth for KPIs that the dashboard
# and any downstream BI tool can reuse. Each metric resolves to a primary
# measure plus a set of dimensions that the metric can be sliced by; the
# schema mirrors MetricFlow 0.21x.
#
# Validation: `make semantic-validate` (Python validator in
# src/data_analytics_demo/semantic/validator.py — checks YAML shape +
# AC-6.2 ≥1 dim + ≥1 measure per metric).

# Each semantic model maps one staging relation to its entities (join keys),
# dimensions (slice-by columns) and measures (aggregations).
semantic_models:
  # Customer accounts, read from stg_customers.
  - name: customers
    description: Tenant accounts. Drives signup-month cohort + region slicing.
    model: ref('stg_customers')
    entities:
      - name: customer
        type: primary
        expr: customer_id
    dimensions:
      - name: region
        type: categorical
      - name: plan_tier_at_signup
        type: categorical
      - name: signup_date
        type: time
        type_params:
          time_granularity: day
    measures:
      - name: customer_count
        description: Number of customers.
        agg: count
        expr: customer_id

  # Subscriptions, read from stg_subscriptions; linked to customers through
  # the foreign `customer` entity.
  - name: subscriptions
    description: Subscription lifecycle facts (active / canceled / paused).
    model: ref('stg_subscriptions')
    entities:
      - name: subscription
        type: primary
        expr: subscription_id
      - name: customer
        type: foreign
        expr: customer_id
    dimensions:
      - name: plan_tier
        type: categorical
      - name: status
        type: categorical
      - name: start_date
        type: time
        type_params:
          time_granularity: day
    measures:
      - name: subscription_count
        description: Number of subscriptions.
        agg: count
        expr: subscription_id
      - name: monthly_amount
        description: Monthly subscription value (USD).
        agg: sum
        expr: monthly_amount_usd
      # Sums a 0/1 flag, so the result is the number of rows whose status is
      # 'active'.
      - name: active_subscription_count
        description: Count of subscriptions whose status is active.
        agg: sum
        expr: case when status = 'active' then 1 else 0 end

  # Invoices, read from stg_invoices; linked to both customers and
  # subscriptions through foreign entities.
  - name: invoices
    description: Monthly billing records for paid subscriptions.
    model: ref('stg_invoices')
    entities:
      - name: invoice
        type: primary
        expr: invoice_id
      - name: customer
        type: foreign
        expr: customer_id
      - name: subscription
        type: foreign
        expr: subscription_id
    dimensions:
      - name: status
        type: categorical
      - name: period_start
        type: time
        type_params:
          time_granularity: day
    measures:
      - name: invoice_count
        description: Number of invoices.
        agg: count
        expr: invoice_id
      # Invoices whose status is not 'paid' contribute 0 to the sum.
      - name: paid_amount_usd
        description: Sum of paid invoice amounts.
        agg: sum
        expr: case when status = 'paid' then amount_usd else 0 end

# Each metric names one measure (type_params.measure) and the dimensions it
# can be sliced by. Every name used here is declared in a semantic model above.
metrics:
  - name: customers
    description: Distinct customer count.
    type: simple
    type_params:
      measure: customer_count
    dimensions:
      - region
      - plan_tier_at_signup
      - signup_date

  - name: active_subscriptions
    description: Count of subscriptions whose status is "active" right now.
    type: simple
    type_params:
      measure: active_subscription_count
    dimensions:
      - plan_tier
      - start_date

  - name: monthly_recurring_revenue
    description: Sum of monthly subscription amounts (US dollars).
    type: simple
    type_params:
      measure: monthly_amount
    dimensions:
      - plan_tier
      - status
      - start_date

  - name: paid_invoice_volume
    description: Sum of paid invoice amounts (US dollars).
    type: simple
    type_params:
      measure: paid_amount_usd
    dimensions:
      - status
      - period_start
12 changes: 12 additions & 0 deletions packages/data-analytics-demo/src/data_analytics_demo/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,17 @@ def narrative() -> None:
typer.echo(f"wrote {out}")


@app.command()
def semantic() -> None:
    """Validate the MetricFlow KPI definitions (semantic/kpi.yml)."""
    # Imported inside the command body, as in the original.
    from data_analytics_demo.semantic import validator

    report = validator.main()
    model_total = report.semantic_model_count
    metric_total = report.metric_count
    # One summary line: "OK: <models> semantic models, <metrics> metrics".
    summary = f"OK: {model_total} semantic models, {metric_total} metrics"
    typer.echo(summary)


# Invoke the CLI app when this module is executed directly.
if __name__ == "__main__":
    app()
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Semantic-layer validator for `semantic/kpi.yml`.

Checks the MetricFlow-compatible KPI definition file against AC-6.1 and
AC-6.2 (each metric has ≥ 1 dimension and ≥ 1 measure). Independent of
the MetricFlow CLI so the test suite has no CLI-shell dependency.
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""KPI YAML validator — enforces the MetricFlow schema invariants we rely on.

Run via `python -m data_analytics_demo.semantic.validator` (this is what
`make semantic-validate` invokes) or via `data-analytics-demo semantic`.
"""

from __future__ import annotations

import sys
from dataclasses import dataclass
from pathlib import Path

import yaml

from ..ml import _io

# Path components of the KPI definition file; default_kpi_path() joins them
# onto the value returned by _io.package_root().
DEFAULT_KPI_PATH_PARTS = ("semantic", "kpi.yml")


def _emit(msg: str) -> None:
print(f"[semantic] {msg}", file=sys.stderr, flush=True) # noqa: T201


def default_kpi_path() -> Path:
    """Return ``_io.package_root()`` joined with ``DEFAULT_KPI_PATH_PARTS``."""
    return _io.package_root().joinpath(*DEFAULT_KPI_PATH_PARTS)


@dataclass(frozen=True)
class ValidationReport:
    """Summary returned by ``validate`` when the KPI file passes every check.

    The dataclass is frozen, so its fields cannot be reassigned after
    construction (the ``metric_names`` list object itself is still mutable).
    """

    # Number of entries under ``semantic_models`` in the KPI file.
    semantic_model_count: int
    # Number of entries under ``metrics`` in the KPI file.
    metric_count: int
    # Metric names, in the order the metrics appear in the file.
    metric_names: list[str]


class ValidationError(RuntimeError):
    """Raised when the KPI YAML fails an AC-6.x invariant.

    This module also raises it for YAML parse errors, a non-mapping top level,
    missing required keys, and metrics that reference an unknown measure or
    dimension.
    """


def _require_keys(
name: str,
obj: dict[str, object],
required: tuple[str, ...],
) -> None:
missing = [k for k in required if k not in obj]
if missing:
raise ValidationError(f"{name}: missing required keys {missing}")


def _validate_semantic_model(node: dict[str, object]) -> dict[str, list[str]]:
    """Validate one ``semantic_models`` entry and list the names it exposes.

    Args:
        node: Parsed mapping for a single semantic model.

    Returns:
        A dict with the keys ``"dimensions"`` and ``"measures"``, each holding
        the names declared under that section, in file order.

    Raises:
        ValidationError: If a required key is missing, if ``dimensions`` or
            ``measures`` is not a non-empty list, or if any entry in either
            list is not a mapping with a ``name`` key.
    """
    _require_keys("semantic_model", node, ("name", "model", "entities", "dimensions", "measures"))
    name = str(node["name"])

    exposed: dict[str, list[str]] = {}
    for section, singular in (("dimensions", "dimension"), ("measures", "measure")):
        # Presence of the key is guaranteed by _require_keys above.
        entries = node[section]
        if not isinstance(entries, list) or not entries:
            raise ValidationError(f"semantic_model {name!r}: needs ≥ 1 {singular}")
        names: list[str] = []
        for index, entry in enumerate(entries):
            # Reject malformed entries explicitly. Skipping them would let a
            # list of unusable items satisfy the "≥ 1" check while exposing no
            # names, and the failure would only surface later as an
            # "unknown measure/dimension" error on some metric.
            if not isinstance(entry, dict) or "name" not in entry:
                raise ValidationError(
                    f"semantic_model {name!r}: {section}[{index}] must be a "
                    "mapping with a 'name' key"
                )
            names.append(str(entry["name"]))
        exposed[section] = names
    return exposed


def _validate_metric(
    node: dict[str, object],
    all_dims: set[str],
    all_measures: set[str],
) -> str:
    """Check one ``metrics`` entry and return its name.

    Args:
        node: Parsed mapping for a single metric.
        all_dims: Dimension names the metric is allowed to reference.
        all_measures: Measure names the metric is allowed to reference.

    Returns:
        The metric's ``name`` as a string.

    Raises:
        ValidationError: If a required key is missing, ``type_params.measure``
            is absent or unknown, ``dimensions`` is not a non-empty list, or a
            listed dimension is unknown (the first unknown one is reported).
    """
    _require_keys("metric", node, ("name", "type", "type_params", "dimensions"))
    metric_name = str(node["name"])

    type_params = node.get("type_params", {})
    if not (isinstance(type_params, dict) and "measure" in type_params):
        raise ValidationError(f"metric {metric_name!r}: type_params.measure is required")

    measure_ref = str(type_params["measure"])
    if measure_ref not in all_measures:
        raise ValidationError(
            f"metric {metric_name!r}: references unknown measure {measure_ref!r}"
        )

    metric_dims = node.get("dimensions", [])
    if not (isinstance(metric_dims, list) and metric_dims):
        raise ValidationError(f"metric {metric_name!r}: needs ≥ 1 dimension (AC-6.2)")

    # Names are compared by their string form; the error shows the raw value.
    unknown = [dim for dim in metric_dims if str(dim) not in all_dims]
    if unknown:
        first_unknown = unknown[0]
        raise ValidationError(
            f"metric {metric_name!r}: references unknown dimension {first_unknown!r}"
        )
    return metric_name


def validate(path: Path | None = None) -> ValidationReport:
    """Load the KPI YAML file and check it against the schema invariants.

    Args:
        path: File to validate. When ``None``, the location returned by
            ``default_kpi_path()`` is used.

    Returns:
        A ``ValidationReport`` with the semantic-model count, the metric
        count, and the metric names in file order.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValidationError: If the YAML cannot be parsed, the top level is not a
            mapping, ``semantic_models`` or ``metrics`` is not a non-empty
            list of mappings, a model or metric fails its own checks, or a
            measure name or metric name is declared more than once.
    """
    kpi_path = default_kpi_path() if path is None else path
    if not kpi_path.exists():
        raise FileNotFoundError(f"kpi.yml not found at {kpi_path}")

    _emit(f"loading {kpi_path}")
    text = kpi_path.read_text(encoding="utf-8")
    try:
        doc = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        raise ValidationError(f"YAML parse error: {exc}") from exc

    # An empty file parses to None, which is rejected here as well.
    if not isinstance(doc, dict):
        raise ValidationError("kpi.yml: top-level must be a mapping")

    semantic_models = doc.get("semantic_models", [])
    metrics = doc.get("metrics", [])
    if not isinstance(semantic_models, list) or not semantic_models:
        raise ValidationError("kpi.yml: needs ≥ 1 semantic_model")
    if not isinstance(metrics, list) or not metrics:
        raise ValidationError("kpi.yml: needs ≥ 1 metric")

    # Pass 1: collect every dimension and measure name the models expose.
    # Dimension names may repeat across models, so they are simply merged.
    all_dims: set[str] = set()
    all_measures: set[str] = set()
    for sm in semantic_models:
        if not isinstance(sm, dict):
            raise ValidationError("semantic_models[]: items must be mappings")
        exposed = _validate_semantic_model(sm)
        all_dims.update(exposed["dimensions"])
        model_name = str(sm["name"])
        for measure in exposed["measures"]:
            # Metrics look measures up by bare name, so a repeated measure
            # name would make that reference ambiguous.
            if measure in all_measures:
                raise ValidationError(
                    f"semantic_model {model_name!r}: duplicate measure name {measure!r}"
                )
            all_measures.add(measure)

    # Pass 2: check each metric against the collected names.
    metric_names: list[str] = []
    seen_metrics: set[str] = set()
    for m in metrics:
        if not isinstance(m, dict):
            raise ValidationError("metrics[]: items must be mappings")
        metric_name = _validate_metric(m, all_dims, all_measures)
        if metric_name in seen_metrics:
            raise ValidationError(f"metric {metric_name!r}: duplicate metric name")
        seen_metrics.add(metric_name)
        metric_names.append(metric_name)

    report = ValidationReport(
        semantic_model_count=len(semantic_models),
        metric_count=len(metrics),
        metric_names=metric_names,
    )
    _emit(
        f"OK — {report.semantic_model_count} semantic models / "
        f"{report.metric_count} metrics: {', '.join(report.metric_names)}"
    )
    return report


def main() -> ValidationReport:
    """Validate the KPI file at its default location and return the report."""
    report = validate()
    return report


if __name__ == "__main__":
    # Report expected failures (missing file, invalid KPI definitions) as one
    # diagnostic line on stderr and exit with status 1, instead of letting an
    # uncaught-exception traceback reach the `make` output.
    try:
        main()
    except (FileNotFoundError, ValidationError) as exc:
        _emit(f"FAILED — {exc}")
        sys.exit(1)
Loading
Loading