Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/data-analytics-demo/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ dbt:
cd dbt_project && DBT_PROFILES_DIR=. dbt test

ml:
@echo "[ml] TODO T-06/T-07: ML pipelines not yet implemented"
@exit 1
$(PYTHON) -m data_analytics_demo.ml.churn
$(PYTHON) -m data_analytics_demo.ml.upsell

narrative:
@echo "[narrative] TODO T-08: Ollama narrative not yet implemented"
Expand Down
13 changes: 10 additions & 3 deletions packages/data-analytics-demo/src/data_analytics_demo/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,16 @@ def data() -> None:

@app.command()
def ml() -> None:
"""Train churn + upsell models (T-06 / T-07, not yet implemented)."""
typer.echo("[ml] TODO T-06/T-07: ML pipelines not yet implemented", err=True)
sys.exit(1)
"""Train churn + upsell models (writes to ml/artifacts/)."""
from data_analytics_demo.ml import churn as churn_mod
from data_analytics_demo.ml import upsell as upsell_mod

churn_meta = churn_mod.train_and_save()
upsell_meta = upsell_mod.train_and_save()
typer.echo(
f"churn ROC-AUC={churn_meta['metrics']['roc_auc_test']:.4f} "
f"upsell lift@10%={upsell_meta['metrics']['lift_at_top_10pct']:.2f}x"
)


@app.command()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,43 @@ def _generate_events(
subscriptions: pd.DataFrame,
n: int,
) -> pd.DataFrame:
"""Generate `n` events with engineered churn + upsell signals."""
# Active-status customers get more weight; canceled customers see drop-off
# near their end_date (the churn signal).
"""Generate `n` events with engineered churn + upsell signals.

Two engineered patterns the downstream ML pipelines (T-06 / T-07) are
designed to recover:

1. **Churn signal**: customers without any active subscription get
(a) 3× lower event volume overall, and
(b) timestamps biased into the *older* half of the history window,
so their trailing-30d activity is much lower than their lifetime
daily average. The mart's `recent_to_lifetime_ratio` feature
captures this directly.
2. **Upsell signal**: handled separately via EVENT_WEIGHTS_BY_TIER —
higher-tier customers emit more `feature_use_premium` / `_advanced`
events.
"""
customer_ids = customers["customer_id"].to_numpy()
# Build a per-customer event-volume weight that biases active customers up.
is_active = subscriptions.groupby("customer_id")["status"].apply(
lambda s: (s == "active").any()
)
weights = np.array([2.0 if is_active.get(cid, False) else 1.0 for cid in customer_ids])
is_active_arr = np.array([bool(is_active.get(cid, False)) for cid in customer_ids])

# (1a) Stronger volume bias: 4× weight for active vs 1× for churned.
weights = np.where(is_active_arr, 4.0, 1.0)
weights = weights / weights.sum()

chosen_customers = rng.choice(customer_ids, size=n, p=weights)
timestamp_offsets = rng.integers(0, HISTORY_WINDOW_DAYS, size=n)

# (1b) Timestamp bias: for each chosen event, look up whether the
# owning customer is active, and sample the offset accordingly.
active_lookup = dict(zip(customer_ids, is_active_arr, strict=True))
chosen_is_active = np.array([active_lookup[c] for c in chosen_customers])
# Active customers: uniform across the full history window.
# Churned customers: uniform over the *older half* (60 .. HISTORY_WINDOW_DAYS).
active_offsets = rng.integers(0, HISTORY_WINDOW_DAYS, size=n)
churned_offsets = rng.integers(60, HISTORY_WINDOW_DAYS, size=n)
timestamp_offsets = np.where(chosen_is_active, active_offsets, churned_offsets)

timestamps = [
REFERENCE_NOW - timedelta(days=int(d), seconds=int(rng.integers(0, 86400)))
for d in timestamp_offsets
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Machine-learning layer for the customer-analytics demo.

Two pipelines, each reading from a dbt mart and writing artifacts under
`<package-root>/ml/artifacts/`:

- `churn.train_and_save` -> `churn_features` mart -> churn model + SHAP summary
- `upsell.train_and_save` -> `upsell_opportunities` mart -> propensity model + lift report

Both pipelines are deterministic (random_state=42 by default) and fail with
clear error messages when the source mart is missing or empty.
"""
74 changes: 74 additions & 0 deletions packages/data-analytics-demo/src/data_analytics_demo/ml/_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Shared IO helpers for the ML layer.

Both `churn.py` and `upsell.py` read marts from the same DuckDB file and
write artifacts to the same directory; centralising the path math + the
not-empty guard keeps the per-pipeline code focused on the model.
"""

from __future__ import annotations

import sys
from pathlib import Path

import duckdb
import pandas as pd

DEFAULT_RANDOM_STATE = 42


def package_root() -> Path:
"""Resolve the package root (one level above src/data_analytics_demo/)."""
return Path(__file__).resolve().parents[3]


def default_warehouse_path() -> Path:
return package_root() / "warehouse" / "analytics.duckdb"


def default_artifacts_dir() -> Path:
return package_root() / "ml" / "artifacts"


def emit(msg: str) -> None:
"""ML-layer progress emitter — mirrors data/generate._emit format."""
print(f"[ml] {msg}", file=sys.stderr, flush=True) # noqa: T201


def read_mart(mart: str, duckdb_path: Path | None = None) -> pd.DataFrame:
"""Read a dbt mart into a DataFrame.

Raises a clear error (AC-3.4) when the warehouse file is missing or the
mart table is empty / absent.
"""
path = duckdb_path or default_warehouse_path()
if not path.exists():
raise FileNotFoundError(
f"warehouse not found at {path}. "
"Run `make data` then `make dbt` before training."
)
# NOTE: opened in default (rw) mode rather than read_only=True so that
# the same process can use both dbt's adapter (which holds an rw
# connection) and this loader without DuckDB's "different configuration"
# mismatch. The ML pipelines only issue SELECTs.
con = duckdb.connect(str(path))
try:
try:
# `mart` is a module-level constant string supplied by churn.py
# and upsell.py — not user input. SQL injection guard not applicable.
df = con.execute(f"SELECT * FROM {mart}").fetchdf() # noqa: S608
except duckdb.CatalogException as exc:
raise RuntimeError(
f"mart `{mart}` not found in {path}. "
"Run `make dbt` to materialise the marts."
) from exc
finally:
con.close()
if df.empty:
raise RuntimeError(f"mart `{mart}` is empty; nothing to train on.")
return df


def ensure_artifacts_dir(artifacts_dir: Path | None = None) -> Path:
out = artifacts_dir or default_artifacts_dir()
out.mkdir(parents=True, exist_ok=True)
return out
176 changes: 176 additions & 0 deletions packages/data-analytics-demo/src/data_analytics_demo/ml/churn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""Churn-prediction pipeline (T-06).

Reads the `churn_features` mart, trains a LogisticRegression baseline and an
XGBoost classifier, picks the one with the higher hold-out ROC-AUC, and saves:

- `ml/artifacts/churn_model.pkl` — the chosen estimator (sklearn Pipeline)
- `ml/artifacts/churn_metadata.json` — metric snapshot + feature list
- `ml/artifacts/shap_summary.json` — top-10 SHAP features for the narrative layer

Determinism: every random-number-using step takes `random_state=42`.
"""

from __future__ import annotations

import json
import pickle # noqa: S403
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from xgboost import XGBClassifier

from . import _io, explain

CATEGORICAL = ["plan_tier_at_signup", "current_plan_tier", "region"]
NUMERIC = [
"event_count_total",
"distinct_event_types",
"lifetime_paid_usd",
"failed_invoice_count",
"invoice_count",
"events_last_30d",
"lifetime_daily_avg_events",
"recent_to_lifetime_ratio",
"support_ticket_count",
]
LABEL = "is_churned"


def _build_preprocessor() -> ColumnTransformer:
return ColumnTransformer(
[
("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), CATEGORICAL),
("num", StandardScaler(), NUMERIC),
],
remainder="drop",
)


def _expanded_feature_names(preprocessor: ColumnTransformer, df: pd.DataFrame) -> list[str]:
preprocessor.fit(df)
return list(preprocessor.get_feature_names_out())


def train_and_save( # noqa: PLR0913
*,
duckdb_path: Path | None = None,
artifacts_dir: Path | None = None,
random_state: int = _io.DEFAULT_RANDOM_STATE,
test_size: float = 0.2,
min_roc_auc: float = 0.70,
) -> dict[str, Any]:
"""Train both models, pick the best, persist artifacts, return metadata."""
_io.emit("loading churn_features mart")
df = _io.read_mart("churn_features", duckdb_path)

# Fill NaN in engineered ratio (some customers have 0 events).
df["recent_to_lifetime_ratio"] = df["recent_to_lifetime_ratio"].fillna(0.0)

x = df[CATEGORICAL + NUMERIC]
y = df[LABEL].astype(int)

_io.emit(f"train/test split (n={len(df)}, test_size={test_size})")
x_train, x_test, y_train, y_test = train_test_split(
x, y, test_size=test_size, random_state=random_state, stratify=y
)

candidates: dict[str, Pipeline] = {
"logistic_regression": Pipeline(
[
("pre", _build_preprocessor()),
("clf", LogisticRegression(max_iter=1000, random_state=random_state)),
]
),
"xgboost": Pipeline(
[
("pre", _build_preprocessor()),
(
"clf",
XGBClassifier(
n_estimators=200,
max_depth=4,
learning_rate=0.1,
eval_metric="auc",
random_state=random_state,
n_jobs=1,
),
),
]
),
}

aucs: dict[str, float] = {}
for name, pipe in candidates.items():
_io.emit(f"training {name}")
pipe.fit(x_train, y_train)
proba = pipe.predict_proba(x_test)[:, 1]
aucs[name] = float(roc_auc_score(y_test, proba))
_io.emit(f" {name} ROC-AUC = {aucs[name]:.4f}")

chosen_name = max(aucs, key=aucs.get) # type: ignore[arg-type]
chosen = candidates[chosen_name]
chosen_auc = aucs[chosen_name]

if chosen_auc < min_roc_auc:
raise RuntimeError(
f"best model ({chosen_name}) ROC-AUC {chosen_auc:.4f} "
f"< AC-3.2 floor {min_roc_auc}"
)

out_dir = _io.ensure_artifacts_dir(artifacts_dir)
model_path = out_dir / "churn_model.pkl"
with model_path.open("wb") as f:
pickle.dump(chosen, f) # noqa: S301

# SHAP summary on the chosen model. We pass the *post-preprocessing*
# feature matrix so SHAP sees the same shape the inner estimator does.
pre = chosen.named_steps["pre"]
clf = chosen.named_steps["clf"]
feature_names = _expanded_feature_names(pre, x_train)
x_train_transformed = pd.DataFrame(pre.transform(x_train), columns=feature_names)
x_test_transformed = pd.DataFrame(pre.transform(x_test), columns=feature_names)

_io.emit("computing SHAP summary")
shap_summary = explain.compute_shap_summary(
model=clf,
background=x_train_transformed,
feature_names=feature_names,
sample=x_test_transformed,
)
shap_path = out_dir / "shap_summary.json"
explain.write_summary(shap_summary, shap_path)

metadata = {
"task": "churn_prediction",
"chosen_model": chosen_name,
"metrics": {
"roc_auc_test": chosen_auc,
"all_models": aucs,
},
"n_train": int(len(x_train)),
"n_test": int(len(x_test)),
"positive_rate": float(np.mean(y)),
"features": {"categorical": CATEGORICAL, "numeric": NUMERIC},
"artifacts": {
"model": str(model_path.name),
"shap_summary": str(shap_path.name),
},
"random_state": random_state,
}
(out_dir / "churn_metadata.json").write_text(
json.dumps(metadata, indent=2), encoding="utf-8"
)
_io.emit(f"done — chosen={chosen_name} ROC-AUC={chosen_auc:.4f}")
return metadata


if __name__ == "__main__":
train_and_save()
Loading
Loading