From 6fb7cd49915bd2224fcb2f093ce2185c1d6bbcfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andre=CC=81=20Lange?= Date: Fri, 17 Apr 2026 23:38:14 +0200 Subject: [PATCH 1/6] fix(install): detect parallel installs + defensive config backups Prevents the failure mode where running ./scripts/faigate-install on a system with an existing Homebrew-managed faigate creates a second, conflicting user-level LaunchAgent (com.fusionaize.faigate) that fails to start (exit 78 EX_CONFIG) while clobbering nothing but confusing the operator about which install is authoritative. Changes: - new faigate_detect_existing_install() checks for: brew formula, brew service, active com.fusionaize.faigate LaunchAgent, existing /opt/homebrew/etc/faigate or /usr/local/etc/faigate configs, and systemd faigate.service unit; refuses install if any found - --force flag to override (discouraged, documented in --help) - faigate_backup_if_exists() makes a timestamped .bak copy before any config/env write even though -f guards already prevent overwrite, as defense-in-depth for edge cases - systemd path now backs up /etc/systemd/system/faigate.service before install -m 644 replaces it Co-Authored-By: Claude Sonnet 4.6 --- scripts/faigate-install | 83 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/scripts/faigate-install b/scripts/faigate-install index 6c50cdb..578b846 100755 --- a/scripts/faigate-install +++ b/scripts/faigate-install @@ -3,19 +3,90 @@ set -euo pipefail source "$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/faigate-service-lib.sh" +FORCE=0 case "${1:-}" in --help|-h) cat <<'EOF' Usage: - ./scripts/faigate-install + ./scripts/faigate-install [--force] Install fusionAIze Gate as a managed service and create helper links for the current platform. + +SAFETY: If an existing faigate installation is detected (Homebrew or a +running user-level LaunchAgent/systemd unit), the script refuses to run to +avoid creating a parallel, conflicting install. Use --force to override. + +For Homebrew-managed installs, upgrade with: + brew upgrade faigate EOF exit 0 ;; + --force) + FORCE=1 + ;; esac +# ---- Collision detection: refuse to install alongside existing faigate ---- +faigate_detect_existing_install() { + local detected=() + # 1. Homebrew-managed install (the common MacOS case) + if command -v brew >/dev/null 2>&1; then + if brew list --formula 2>/dev/null | grep -qx "faigate"; then + detected+=("Homebrew: $(brew --prefix)/Cellar/faigate (use: brew upgrade faigate)") + fi + if brew services list 2>/dev/null | awk '{print $1}' | grep -qx "faigate"; then + detected+=("Homebrew service active: 'homebrew.mxcl.faigate'") + fi + fi + # 2. Running launchd agent from a previous user-level install + if [ "$(uname -s)" = "Darwin" ]; then + if launchctl list 2>/dev/null | awk '{print $3}' | grep -qx "com.fusionaize.faigate"; then + detected+=("LaunchAgent 'com.fusionaize.faigate' already loaded") + fi + local brew_cfg="" + if [ -f "/opt/homebrew/etc/faigate/config.yaml" ]; then + brew_cfg="/opt/homebrew/etc/faigate/config.yaml" + elif [ -f "/usr/local/etc/faigate/config.yaml" ]; then + brew_cfg="/usr/local/etc/faigate/config.yaml" + fi + if [ -n "$brew_cfg" ]; then + detected+=("Homebrew config present at: $brew_cfg") + fi + fi + # 3. Running systemd unit (Linux) + if [ "$(uname -s)" = "Linux" ] && command -v systemctl >/dev/null 2>&1; then + if systemctl list-unit-files faigate.service 2>/dev/null | grep -q faigate.service; then + detected+=("systemd unit 'faigate.service' present") + fi + fi + if [ "${#detected[@]}" -gt 0 ]; then + echo "ERROR: existing faigate installation(s) detected:" >&2 + printf ' - %s\n' "${detected[@]}" >&2 + echo "" >&2 + echo "Refusing to install in parallel (would create conflicting service + duplicate config)." >&2 + echo "Homebrew upgrade: brew upgrade faigate" >&2 + echo "Override with: $(basename "$0") --force (not recommended)" >&2 + return 1 + fi + return 0 +} + +# Defensive backup: timestamped copy of an existing file before any write. +# Called even though downstream uses 'if [ ! -f ]' guards — cheap insurance. +faigate_backup_if_exists() { + local target="$1" + [ -f "$target" ] || return 0 + local stamp + stamp="$(date +%Y%m%d%H%M%S)" + cp -p "$target" "${target}.bak.${stamp}" + echo "backup: ${target} -> ${target}.bak.${stamp}" +} + +if [ "$FORCE" -ne 1 ]; then + faigate_detect_existing_install +fi + repo_root="$(faigate_repo_root)" helpers=( faigate-menu @@ -53,10 +124,16 @@ case "$(faigate_platform)" in if [ ! -f "$(faigate_mac_config_path)" ]; then cp "$repo_root/config.yaml" "$(faigate_mac_config_path)" echo "copied config template to: $(faigate_mac_config_path)" + else + faigate_backup_if_exists "$(faigate_mac_config_path)" + echo "config already exists, keeping current: $(faigate_mac_config_path)" fi if [ ! -f "$(faigate_mac_config_dir)/faigate.env" ]; then cp "$repo_root/.env.example" "$(faigate_mac_config_dir)/faigate.env" echo "copied env template to: $(faigate_mac_config_dir)/faigate.env" + else + faigate_backup_if_exists "$(faigate_mac_config_dir)/faigate.env" + echo "env already exists, keeping current: $(faigate_mac_config_dir)/faigate.env" fi faigate_render_mac_plist faigate_install_helper_links "${helpers[@]}" @@ -69,6 +146,10 @@ case "$(faigate_platform)" in fi sudo install -d -o faigate -g faigate -m 755 /var/lib/faigate + if [ -f /etc/systemd/system/faigate.service ]; then + sudo cp -p /etc/systemd/system/faigate.service \ + "/etc/systemd/system/faigate.service.bak.$(date +%Y%m%d%H%M%S)" + fi sudo install -m 644 "$repo_root/faigate.service" /etc/systemd/system/faigate.service for helper in "${helpers[@]}"; do From d73114efa41b6e3c298b9b4e2c08e8b52a6d6445 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andre=CC=81=20Lange?= Date: Fri, 17 Apr 2026 23:47:44 +0200 Subject: [PATCH 2/6] =?UTF-8?q?feat(quota):=20Phase=201=20=E2=80=94=20quot?= =?UTF-8?q?a=5Ftracker=20module=20+=207-provider=20catalog=20template?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces the single-source-of-truth abstraction for quota accounting across all routable providers, covering three package types: - credits (DeepSeek, Kilo, Blackbox) — balance + expiry - rolling_window (Claude Pro, OpenAI Plus) — e.g. 40 msg / 5h - daily (Qwen, Gemini flash/pro, Antigravity, Gemini-CLI) Key pieces: * QuotaStatus dataclass: unified view (remaining, ratio, alert, source, confidence, reset_at, days_until_expiry, burn_per_day, projected_days_left). * SQLite-backed local counter with optional per-model weights (Opus ~5x Sonnet against the Pro pool). * EWMA-style burn-rate over last 7 days of requests.\$ * Use-or-lose classifier: expiry_date < projected_days_left → urgent boost. * Catalog template for FAIGATE_PROVIDER_METADATA_DIR covering 11 packages across 7 providers, with _notes explaining every field. Local-count path is deliberately heuristic for Pro/Plus subscriptions — Anthropic and OpenAI do not expose quota APIs for consumer plans. Numbers are community-reported and tuned after first real 429s land. Co-Authored-By: Claude Sonnet 4.6 --- .../packages/catalog.v1.json | 159 +++++ faigate/quota_tracker.py | 649 ++++++++++++++++++ 2 files changed, 808 insertions(+) create mode 100644 docs/examples/fusionaize-metadata-repo/packages/catalog.v1.json create mode 100644 faigate/quota_tracker.py diff --git a/docs/examples/fusionaize-metadata-repo/packages/catalog.v1.json b/docs/examples/fusionaize-metadata-repo/packages/catalog.v1.json new file mode 100644 index 0000000..44a91b9 --- /dev/null +++ b/docs/examples/fusionaize-metadata-repo/packages/catalog.v1.json @@ -0,0 +1,159 @@ +{ + "schema_version": "1.1", + "generated_at": "2026-04-17T23:50:00Z", + "source_repo": "faigate/docs/examples", + "_notes": [ + "Template for a personal packages catalog covering all 7 providers that", + "faigate can route to. Copy this file to your FAIGATE_PROVIDER_METADATA_DIR", + "at packages/catalog.v1.json and edit the numbers to match your reality.", + "", + "Fields (see faigate/quota_tracker.py for the authoritative spec):", + " provider_id — must match a provider name in config.yaml", + " package_type — 'credits' | 'rolling_window' | 'daily' (default credits)", + " total_credits — for credits: your balance; updated by api_poll/manual", + " used_credits — for credits: derived or manual", + " expiry_date — for credits: YYYY-MM-DD, triggers use-or-lose alerts", + " window_hours — for rolling_window: e.g. 5 for Claude Pro", + " limit_per_window — for rolling_window: max requests in the window", + " limit_per_day — for daily: max requests since UTC midnight", + " model_weights — optional per-model cost multipliers (e.g. Opus ~5x)", + " source — 'api_poll' | 'header_capture' | 'local_count' | 'manual'", + " confidence — 'high' (API-verified) | 'medium' | 'low' | 'estimated' (heuristic)", + " last_updated — ISO 8601; automatically refreshed by the poller", + "", + "TIER RATIONALE:", + " Kilo : credits with 10-day expiry → use-or-lose boost makes router prefer it", + " DeepSeek : credits, no expiry → normal cost-based routing", + " Blackbox : credits, no expiry → normal cost-based routing", + " Claude Pro : subscription, rolling 5h window → heuristic limits, local counter", + " OpenAI Plus : subscription, rolling 3h window → heuristic limits, local counter", + " Qwen free : daily limit (2000 req/day public heuristic) → local counter", + " Gemini free : daily limit (1500 req/day public heuristic) → local counter" + ], + "packages": { + "kilo-credits-2026q2": { + "provider_id": "kilocode", + "package_type": "credits", + "total_credits": 25.00, + "used_credits": 0.00, + "expiry_date": "2026-04-27", + "source": "manual", + "confidence": "medium", + "last_updated": "2026-04-17T23:50:00Z", + "notes": "Replace total/used with real balance from kilo.ai dashboard. Poller will update once KILO_API_KEY is configured." + }, + + "deepseek-credits": { + "provider_id": "deepseek", + "package_type": "credits", + "total_credits": 10.00, + "used_credits": 0.00, + "expiry_date": null, + "source": "api_poll", + "confidence": "high", + "last_updated": "2026-04-17T23:50:00Z", + "notes": "Auto-refreshed every 1h via GET https://api.deepseek.com/user/balance" + }, + + "blackbox-credits": { + "provider_id": "blackbox-free", + "package_type": "credits", + "total_credits": 5.00, + "used_credits": 0.00, + "expiry_date": null, + "source": "manual", + "confidence": "low", + "last_updated": "2026-04-17T23:50:00Z", + "notes": "Blackbox has no public balance API. Update manually or rely on local counter." + }, + + "claude-pro-sonnet-5h": { + "provider_id": "claude-code", + "package_type": "rolling_window", + "window_hours": 5, + "limit_per_window": 40, + "model_weights": { + "claude-opus-4-7": 5, + "claude-opus-4-5": 5, + "claude-sonnet-4-7": 1, + "claude-sonnet-4-6": 1, + "claude-haiku-4-7": 1 + }, + "source": "local_count", + "confidence": "estimated", + "notes": "Anthropic does NOT publish Pro plan limits. 40 msg/5h is community-reported heuristic. 1 Opus message counts as ~5 Sonnet messages. Adjust after seeing real rate-limit errors." + }, + + "openai-plus-3h": { + "provider_id": "openai-codex", + "package_type": "rolling_window", + "window_hours": 3, + "limit_per_window": 40, + "model_weights": { + "gpt-5.4": 1, + "gpt-5.4-low": 1, + "gpt-5.4-high": 2, + "gpt-5.4-xhigh": 3, + "o1": 5, + "o1-mini": 2 + }, + "source": "local_count", + "confidence": "estimated", + "notes": "OpenAI does NOT publish Plus plan limits. 40 msg/3h is community-reported heuristic for GPT-4o-class. Reasoning models count heavier." + }, + + "qwen-portal-daily": { + "provider_id": "qwen-portal", + "package_type": "daily", + "limit_per_day": 2000, + "source": "local_count", + "confidence": "estimated", + "notes": "Qwen free tier approx. 2000 requests/day. Resets at UTC midnight. No public quota API." + }, + + "gemini-flash-lite-daily": { + "provider_id": "gemini-flash-lite", + "package_type": "daily", + "limit_per_day": 1500, + "source": "local_count", + "confidence": "estimated", + "notes": "Google AI Studio free tier for flash-lite: 1500 req/day documented. Resets at UTC midnight." + }, + + "gemini-flash-daily": { + "provider_id": "gemini-flash", + "package_type": "daily", + "limit_per_day": 1500, + "source": "local_count", + "confidence": "estimated", + "notes": "Same free tier bucket as flash-lite. Antigravity OAuth has higher limits if you're on AI Pro." + }, + + "gemini-pro-daily": { + "provider_id": "gemini-pro-high", + "package_type": "daily", + "limit_per_day": 50, + "source": "local_count", + "confidence": "estimated", + "notes": "Gemini 2.5 Pro free tier: ~50 req/day. Via Antigravity-OAuth (AI Pro) higher." + }, + + "antigravity-daily": { + "provider_id": "antigravity", + "package_type": "daily", + "limit_per_day": 1000, + "source": "local_count", + "confidence": "estimated", + "notes": "AI Pro via Antigravity OAuth. Limit is a guess; adjust based on real 429s." + }, + + "gemini-cli-daily": { + "provider_id": "gemini-cli", + "package_type": "daily", + "limit_per_day": 1000, + "source": "local_count", + "confidence": "estimated", + "notes": "Gemini CLI OAuth shares the same Google quota pool as antigravity." + } + } +} diff --git a/faigate/quota_tracker.py b/faigate/quota_tracker.py new file mode 100644 index 0000000..7967a9b --- /dev/null +++ b/faigate/quota_tracker.py @@ -0,0 +1,649 @@ +"""Quota tracker — unifies credit packages, rolling-window subscriptions, and +daily-reset limits into a single `QuotaStatus` that the router, dashboard and +alert engine all consume. + +Why this exists +--------------- +faigate already carried a simple "credit package" concept in +`provider_catalog._get_packages_for_provider()` with fields ``total_credits``, +``used_credits`` and ``expiry_date``. That covers metered providers like +Kilo/DeepSeek well, but gives nothing for: + +* **Subscription-style quotas** (Anthropic Pro, OpenAI Plus) where the real + constraint is "N messages per rolling 5h window", not credits. +* **Per-model weighting** (1 Opus message counts like ~5 Sonnet messages + against the Pro plan budget). +* **Multi-source truth** — the same package's `used_credits` may be updated + from three different mechanisms (API poll, response headers, local + SQLite count) and we need to know which is the freshest/most reliable. + +`QuotaStatus` is the single, stable, UI-facing representation. Callers never +read raw catalog fields; they call :func:`compute_quota_status` and get +everything (remaining, reset time, burn rate, alert color, source, +confidence) in one go. + +Package catalog schema (v1.1) +----------------------------- +Extends the existing ``packages/catalog.v1.json``. All new fields optional +(back-compat with v1). Example entries:: + + { + "packages": { + "kilo-credits-2026q2": { + "provider_id": "kilocode", + "package_type": "credits", # default if omitted + "total_credits": 25.00, # USD + "used_credits": 12.40, + "expiry_date": "2026-04-27", # YYYY-MM-DD + "source": "api_poll", # api_poll|header_capture|local_count + "confidence": "high", # high|medium|low|estimated + "last_updated": "2026-04-17T22:00:00Z" + }, + "claude-pro-5h-rolling": { + "provider_id": "claude-code", + "package_type": "rolling_window", + "window_hours": 5, + "limit_per_window": 40, # messages Sonnet-class + "model_weights": { + "claude-opus-4-7": 5, # 1 Opus counts as 5 Sonnet + "claude-sonnet-4-6": 1 + }, + "source": "local_count", + "confidence": "estimated", + "notes": "Pro plan limit is not published; 40 msg/5h is heuristic." + }, + "openai-plus-3h-rolling": { + "provider_id": "openai-codex", + "package_type": "rolling_window", + "window_hours": 3, + "limit_per_window": 40, + "source": "local_count", + "confidence": "estimated" + }, + "gemini-free-daily": { + "provider_id": "gemini-flash-lite", + "package_type": "daily", + "limit_per_day": 1500, + "source": "local_count", + "confidence": "estimated" + } + } + } + +Alert semantics +--------------- +``alert`` is one of: + +* ``ok`` — >14 days of runway and no expiry worry. +* ``watch`` — 5–14 days runway, or expiry 14–30 days away. +* ``topup`` — <5 days runway (metered) OR <2 days until expiry with credits + still unburned. +* ``use_or_lose`` — expiry imminent AND remaining credits will not be + consumed at current burn rate. Used by router to boost priority. +* ``exhausted`` — remaining <= 0. + +Callers +------- +* ``router.py`` — uses ``remaining`` + ``alert`` to score lane preference + (existing expiry bonus logic lives in router, this module just feeds it + consistently across all 3 package types). +* ``dashboard.py`` — renders the per-provider quota bar with alert color. +* ``quota_poller.py`` — background task that calls + :func:`update_package_usage` after fetching balances or counting requests. +""" + +from __future__ import annotations + +import logging +import sqlite3 +from dataclasses import asdict, dataclass, field +from datetime import UTC, date, datetime, timedelta +from pathlib import Path +from typing import Any, Literal + +logger = logging.getLogger("faigate.quota_tracker") + +PackageType = Literal["credits", "rolling_window", "daily"] +AlertLevel = Literal["ok", "watch", "topup", "use_or_lose", "exhausted"] +SourceType = Literal["api_poll", "header_capture", "local_count", "manual"] +ConfidenceLevel = Literal["high", "medium", "low", "estimated"] + + +@dataclass +class QuotaStatus: + """Unified view of a provider's remaining quota, regardless of package type. + + For ``credits`` packages: ``remaining``/``total`` are USD (or whatever unit + the catalog uses — faigate is unit-agnostic). For ``rolling_window`` and + ``daily``: they are request counts. + + ``reset_at`` is only meaningful for window-based types. ``expiry_at`` is + only meaningful for credits. ``burn_per_day`` is an EWMA over the last + 7 days from the request log. + """ + + provider_id: str + package_id: str + package_type: PackageType + total: float + used: float + remaining: float + remaining_ratio: float # 0.0 – 1.0 + alert: AlertLevel + source: SourceType + confidence: ConfidenceLevel + last_updated: str | None = None # ISO 8601 + # Window-specific + window_hours: int | None = None + reset_at: str | None = None # ISO 8601, when window resets + # Credit-specific + expiry_date: str | None = None # YYYY-MM-DD + days_until_expiry: int | None = None + burn_per_day: float | None = None + projected_days_left: float | None = None + # Diagnostics (not part of stable UI contract) + notes: str | None = None + extras: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + +def compute_quota_status( + package: dict[str, Any], + *, + now: datetime | None = None, + sqlite_path: Path | None = None, +) -> QuotaStatus: + """Translate a raw catalog package entry into a QuotaStatus. + + Inputs: + * ``package``: a dict exactly as stored in the packages catalog (one + value from ``packages_catalog.items()``). Must contain at least + ``provider_id``. Everything else is defaulted or computed. + * ``now``: injected for test determinism; defaults to ``datetime.now(UTC)``. + * ``sqlite_path``: faigate.db path for looking up ``used`` for window + types via local request counts. If ``None`` and the package is + window-based, ``used`` falls back to the catalog-stored value. + """ + now = now or datetime.now(UTC) + package_id = package.get("package_id") or _synthesize_package_id(package) + provider_id = package.get("provider_id") or "unknown" + pkg_type: PackageType = package.get("package_type") or "credits" + source: SourceType = package.get("source") or "manual" + confidence: ConfidenceLevel = package.get("confidence") or "medium" + last_updated = package.get("last_updated") + notes = package.get("notes") + + if pkg_type == "rolling_window": + return _status_rolling_window( + package, package_id, provider_id, source, confidence, last_updated, notes, now, sqlite_path + ) + if pkg_type == "daily": + return _status_daily( + package, package_id, provider_id, source, confidence, last_updated, notes, now, sqlite_path + ) + # Default: credits + return _status_credits(package, package_id, provider_id, source, confidence, last_updated, notes, now, sqlite_path) + + +# ----------------------------------------------------------------------------- +# Per-package-type computation +# ----------------------------------------------------------------------------- + + +def _status_credits( + package: dict[str, Any], + package_id: str, + provider_id: str, + source: SourceType, + confidence: ConfidenceLevel, + last_updated: str | None, + notes: str | None, + now: datetime, + sqlite_path: Path | None, +) -> QuotaStatus: + total = float(package.get("total_credits") or 0.0) + used = float(package.get("used_credits") or 0.0) + remaining = max(0.0, total - used) + ratio = (remaining / total) if total > 0 else 0.0 + + expiry_iso = package.get("expiry_date") + days_until_expiry: int | None = None + if expiry_iso: + try: + expiry_date = date.fromisoformat(expiry_iso) + days_until_expiry = (expiry_date - now.date()).days + except ValueError: + logger.warning("Invalid expiry_date %r on package %s", expiry_iso, package_id) + + # Burn rate: look at the last 7d of requests for this provider + burn = _local_burn_per_day_usd(provider_id, now, sqlite_path, days=7) + projected_days_left: float | None = None + if burn and burn > 0: + projected_days_left = remaining / burn + + alert = _classify_credits_alert(remaining, days_until_expiry, projected_days_left) + + return QuotaStatus( + provider_id=provider_id, + package_id=package_id, + package_type="credits", + total=total, + used=used, + remaining=remaining, + remaining_ratio=ratio, + alert=alert, + source=source, + confidence=confidence, + last_updated=last_updated, + expiry_date=expiry_iso, + days_until_expiry=days_until_expiry, + burn_per_day=burn, + projected_days_left=projected_days_left, + notes=notes, + ) + + +def _status_rolling_window( + package: dict[str, Any], + package_id: str, + provider_id: str, + source: SourceType, + confidence: ConfidenceLevel, + last_updated: str | None, + notes: str | None, + now: datetime, + sqlite_path: Path | None, +) -> QuotaStatus: + window_hours = int(package.get("window_hours") or 5) + limit = float(package.get("limit_per_window") or 0) + model_weights: dict[str, float] = package.get("model_weights") or {} + + # Local count: weighted request count over the last window_hours + used = _local_count_in_window(provider_id, now, sqlite_path, window_hours=window_hours, model_weights=model_weights) + remaining = max(0.0, limit - used) + ratio = (remaining / limit) if limit > 0 else 0.0 + + # Reset: earliest request in window expires at `earliest + window_hours`. + # If we don't know, conservative estimate: now + window_hours. + earliest = _earliest_request_in_window(provider_id, now, sqlite_path, window_hours=window_hours) + if earliest: + reset_at = (earliest + timedelta(hours=window_hours)).isoformat() + else: + reset_at = (now + timedelta(hours=window_hours)).isoformat() + + alert = _classify_window_alert(remaining, limit) + + return QuotaStatus( + provider_id=provider_id, + package_id=package_id, + package_type="rolling_window", + total=limit, + used=used, + remaining=remaining, + remaining_ratio=ratio, + alert=alert, + source=source, + confidence=confidence, + last_updated=last_updated, + window_hours=window_hours, + reset_at=reset_at, + notes=notes, + extras={"model_weights": model_weights} if model_weights else {}, + ) + + +def _status_daily( + package: dict[str, Any], + package_id: str, + provider_id: str, + source: SourceType, + confidence: ConfidenceLevel, + last_updated: str | None, + notes: str | None, + now: datetime, + sqlite_path: Path | None, +) -> QuotaStatus: + limit = float(package.get("limit_per_day") or 0) + + # Count requests since UTC midnight + midnight = datetime(now.year, now.month, now.day, tzinfo=UTC) + hours_since_midnight = (now - midnight).total_seconds() / 3600.0 + used = _local_count_in_window( + provider_id, now, sqlite_path, window_hours=max(hours_since_midnight, 0.01), model_weights={} + ) + remaining = max(0.0, limit - used) + ratio = (remaining / limit) if limit > 0 else 0.0 + + next_midnight = midnight + timedelta(days=1) + reset_at = next_midnight.isoformat() + + alert = _classify_window_alert(remaining, limit) + + return QuotaStatus( + provider_id=provider_id, + package_id=package_id, + package_type="daily", + total=limit, + used=used, + remaining=remaining, + remaining_ratio=ratio, + alert=alert, + source=source, + confidence=confidence, + last_updated=last_updated, + reset_at=reset_at, + notes=notes, + ) + + +# ----------------------------------------------------------------------------- +# Alert classifiers +# ----------------------------------------------------------------------------- + + +def _classify_credits_alert( + remaining: float, + days_until_expiry: int | None, + projected_days_left: float | None, +) -> AlertLevel: + if remaining <= 0: + return "exhausted" + # Use-or-lose: expiring soon AND won't be burned at current rate + if days_until_expiry is not None and days_until_expiry > 0: + if projected_days_left is not None and projected_days_left > days_until_expiry: + # Remaining credits will outlive expiry → waste risk + if days_until_expiry <= 14: + return "use_or_lose" + if days_until_expiry <= 2: + return "topup" + if projected_days_left is not None: + if projected_days_left < 2: + return "topup" + if projected_days_left < 14: + return "watch" + # No signal either way → assume OK + return "ok" + + +def _classify_window_alert(remaining: float, limit: float) -> AlertLevel: + if limit <= 0: + return "ok" # unconfigured + if remaining <= 0: + return "exhausted" + ratio = remaining / limit + if ratio < 0.1: + return "topup" + if ratio < 0.3: + return "watch" + return "ok" + + +# ----------------------------------------------------------------------------- +# SQLite-backed counters (read-only) +# ----------------------------------------------------------------------------- + + +def _open_db(sqlite_path: Path | None) -> sqlite3.Connection | None: + if sqlite_path is None: + return None + try: + # read-only, don't block writers + uri = f"file:{sqlite_path}?mode=ro" + conn = sqlite3.connect(uri, uri=True, timeout=2.0) + conn.row_factory = sqlite3.Row + return conn + except sqlite3.Error as e: + logger.debug("quota_tracker: cannot open %s read-only: %s", sqlite_path, e) + return None + + +def _local_count_in_window( + provider_id: str, + now: datetime, + sqlite_path: Path | None, + *, + window_hours: float, + model_weights: dict[str, float], +) -> float: + """Count requests for ``provider_id`` in the last ``window_hours`` hours. + + If ``model_weights`` is non-empty, each row's weight is + ``model_weights.get(row["model"], 1.0)``. Otherwise every row counts as 1. + """ + conn = _open_db(sqlite_path) + if conn is None: + return 0.0 + try: + cutoff = int((now - timedelta(hours=window_hours)).timestamp()) + if model_weights: + cur = conn.execute( + "SELECT model, COUNT(*) AS n FROM requests WHERE provider = ? AND timestamp >= ? GROUP BY model", + (provider_id, cutoff), + ) + total = 0.0 + for row in cur: + model = row["model"] or "" + weight = float(model_weights.get(model, 1.0)) + total += weight * int(row["n"]) + return total + cur = conn.execute( + "SELECT COUNT(*) AS n FROM requests WHERE provider = ? AND timestamp >= ?", + (provider_id, cutoff), + ) + row = cur.fetchone() + return float(row["n"] if row else 0) + except sqlite3.Error as e: + logger.debug("quota_tracker: SQL error counting %s: %s", provider_id, e) + return 0.0 + finally: + conn.close() + + +def _earliest_request_in_window( + provider_id: str, + now: datetime, + sqlite_path: Path | None, + *, + window_hours: float, +) -> datetime | None: + conn = _open_db(sqlite_path) + if conn is None: + return None + try: + cutoff = int((now - timedelta(hours=window_hours)).timestamp()) + cur = conn.execute( + "SELECT MIN(timestamp) AS t FROM requests WHERE provider = ? AND timestamp >= ?", + (provider_id, cutoff), + ) + row = cur.fetchone() + if row and row["t"] is not None: + return datetime.fromtimestamp(int(row["t"]), tz=UTC) + return None + except sqlite3.Error: + return None + finally: + conn.close() + + +def _local_burn_per_day_usd( + provider_id: str, + now: datetime, + sqlite_path: Path | None, + *, + days: int = 7, +) -> float | None: + """Average daily USD burn for ``provider_id`` over last ``days`` days. + + Returns None if there's no data (signals "no burn signal"). Never raises. + """ + conn = _open_db(sqlite_path) + if conn is None: + return None + try: + cutoff = int((now - timedelta(days=days)).timestamp()) + cur = conn.execute( + "SELECT SUM(cost_usd) AS s FROM requests WHERE provider = ? AND timestamp >= ?", + (provider_id, cutoff), + ) + row = cur.fetchone() + if not row or row["s"] is None: + return None + total_usd = float(row["s"] or 0.0) + if total_usd <= 0: + return None + return total_usd / max(days, 1) + except sqlite3.Error: + return None + finally: + conn.close() + + +# ----------------------------------------------------------------------------- +# Helpers +# ----------------------------------------------------------------------------- + + +def _synthesize_package_id(package: dict[str, Any]) -> str: + pid = package.get("provider_id") or "unknown" + pkg_type = package.get("package_type") or "credits" + return f"{pid}-{pkg_type}" + + +def update_package_usage( + package_id: str, + *, + used_credits: float | None = None, + source: SourceType | None = None, + confidence: ConfidenceLevel | None = None, + packages_cache: dict[str, dict[str, Any]] | None = None, +) -> bool: + """Mutate the in-memory catalog cache. Used by the balance poller and the + header-capture middleware. Returns True on success. + + NOTE: This only updates the in-process cache. Persisting to disk is the + poller's responsibility (it owns the JSON file atomicity). + """ + if packages_cache is None: + from .provider_catalog import get_packages_catalog + + packages_cache = get_packages_catalog() + entry = packages_cache.get(package_id) + if entry is None: + logger.warning("quota_tracker.update: unknown package %s", package_id) + return False + if used_credits is not None: + entry["used_credits"] = float(used_credits) + if source is not None: + entry["source"] = source + if confidence is not None: + entry["confidence"] = confidence + entry["last_updated"] = datetime.now(UTC).isoformat(timespec="seconds") + return True + + +def compute_all_statuses( + *, + now: datetime | None = None, + sqlite_path: Path | None = None, + packages_cache: dict[str, dict[str, Any]] | None = None, +) -> list[QuotaStatus]: + """Compute QuotaStatus for every package in the catalog. Convenience + wrapper for the dashboard.""" + if packages_cache is None: + from .provider_catalog import get_packages_catalog + + packages_cache = get_packages_catalog() + statuses: list[QuotaStatus] = [] + for pkg_id, pkg in packages_cache.items(): + # Inject the pkg_id so synthesized ids match + enriched = dict(pkg) + enriched.setdefault("package_id", pkg_id) + try: + statuses.append(compute_quota_status(enriched, now=now, sqlite_path=sqlite_path)) + except Exception as e: # pragma: no cover — never let one broken pkg kill the dashboard + logger.warning("quota_tracker: failed to compute %s: %s", pkg_id, e) + return statuses + + +# Convenience for callers that want a single-liner diagnostic string +def format_status_line(status: QuotaStatus) -> str: + """Short human-readable single-line status — for logs/dashboard tooltip.""" + icon = { + "ok": "🟢", + "watch": "🟡", + "topup": "🟠", + "use_or_lose": "⚠️", + "exhausted": "🔴", + }.get(status.alert, "⚪") + if status.package_type == "credits": + tail = "" + if status.days_until_expiry is not None: + tail = f" · exp {status.days_until_expiry}d" + if status.projected_days_left is not None: + tail += f" · proj {status.projected_days_left:.0f}d burn" + return ( + f"{icon} {status.provider_id}: " + f"{status.remaining:.2f}/{status.total:.2f} left" + f"{tail} [{status.confidence}/{status.source}]" + ) + # window-based + return ( + f"{icon} {status.provider_id}: " + f"{int(status.used)}/{int(status.total)} used " + f"({status.package_type}, {status.window_hours or 24}h) " + f"[{status.confidence}/{status.source}]" + ) + + +__all__ = [ + "AlertLevel", + "ConfidenceLevel", + "PackageType", + "QuotaStatus", + "SourceType", + "compute_all_statuses", + "compute_quota_status", + "format_status_line", + "update_package_usage", +] + + +if __name__ == "__main__": + # Tiny self-test / demo without network access. + logging.basicConfig(level=logging.INFO) + demo_packages = { + "kilo-q2": { + "provider_id": "kilocode", + "package_type": "credits", + "total_credits": 25.0, + "used_credits": 12.4, + "expiry_date": (date.today() + timedelta(days=10)).isoformat(), + "source": "api_poll", + "confidence": "high", + }, + "claude-pro-5h": { + "provider_id": "claude-code", + "package_type": "rolling_window", + "window_hours": 5, + "limit_per_window": 40, + "model_weights": {"claude-opus-4-7": 5, "claude-sonnet-4-6": 1}, + "source": "local_count", + "confidence": "estimated", + }, + "gemini-daily": { + "provider_id": "gemini-flash-lite", + "package_type": "daily", + "limit_per_day": 1500, + "source": "local_count", + "confidence": "estimated", + }, + } + now = datetime.now(UTC) + for pid, pkg in demo_packages.items(): + pkg["package_id"] = pid + status = compute_quota_status(pkg, now=now) + print(format_status_line(status)) + print(" →", status.to_dict()) + print() From e5f29030ab41d77a7908491d498d579e870dd5bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andre=CC=81=20Lange?= Date: Sat, 18 Apr 2026 02:06:09 +0200 Subject: [PATCH 3/6] =?UTF-8?q?feat(quota):=20Phase=202=20=E2=80=94=20bala?= =?UTF-8?q?nce=20poller=20for=20DeepSeek=20+=20Kilo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the background task that refreshes ``used_credits`` for packages marked ``source: api_poll`` in the external catalog, and wires it into the FastAPI lifespan. Poller (faigate/quota_poller.py) * DeepSeek: ``GET /user/balance`` — stable schema, balance_infos[USD]. * Kilo: probes four candidate endpoints, parses whichever returns a payload with recognizable balance fields (schema is a moving target). * Atomic persistence: writes ``catalog.v1.json.tmp`` and os.replace()s — the poller is the only writer, preserving the envelope (_notes etc). * Fast lane: packages with expiry_date ≤14 days away get polled every 15m instead of the 1h default, so use-or-lose alerts stay fresh. * Never crashes the gateway: missing API keys log a warning and skip; network errors leave stale ``used_credits`` untouched. Config (faigate/config.py) * New ``quota_poll`` section: enabled (default false), on_startup, interval_seconds, fast_lane_interval_seconds. * Disabled by default — opt-in because it requires operator-provided API keys (DEEPSEEK_API_KEY, KILO_API_KEY). Lifespan (faigate/main.py) * Startup warmup + long-running asyncio task (``faigate-quota-poll``). * Clean shutdown: cancel + await the task alongside the existing provider-source-refresh task. Co-Authored-By: Claude Sonnet 4.6 --- faigate/config.py | 20 ++ faigate/main.py | 35 +++ faigate/quota_poller.py | 506 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 561 insertions(+) create mode 100644 faigate/quota_poller.py diff --git a/faigate/config.py b/faigate/config.py index 7362f50..2e9e24e 100644 --- a/faigate/config.py +++ b/faigate/config.py @@ -2089,6 +2089,26 @@ def provider_source_refresh(self) -> dict: }, ) + @property + def quota_poll(self) -> dict: + """Quota balance poller settings (Phase 2 of the quota-tracking work). + + Only governs provider API-balance refreshes (DeepSeek, Kilo). The + local counter in :mod:`quota_tracker` and the header-capture + middleware are independent of this block. Poller is disabled by + default because it requires operator-provided API keys that aren't + present in a fresh install. + """ + return self._data.get( + "quota_poll", + { + "enabled": False, + "on_startup": True, + "interval_seconds": 3600, + "fast_lane_interval_seconds": 900, + }, + ) + @property def anthropic_bridge(self) -> dict: return self._data.get( diff --git a/faigate/main.py b/faigate/main.py index 53e63f0..e5e17ec 100644 --- a/faigate/main.py +++ b/faigate/main.py @@ -95,6 +95,7 @@ _adaptive_state: AdaptiveRouteState = AdaptiveRouteState() _provider_catalog_store: ProviderCatalogStore | None = None _provider_catalog_refresh_task: asyncio.Task[None] | None = None +_quota_poll_task: asyncio.Task[None] | None = None def _provider_catalog_config_path() -> str: @@ -2259,6 +2260,7 @@ async def lifespan(app: FastAPI): """Startup / shutdown lifecycle.""" global _config, _providers, _router, _metrics, _update_checker, _adaptive_state global _provider_catalog_store, _provider_catalog_refresh_task + global _quota_poll_task logging.basicConfig( level=logging.INFO, @@ -2325,6 +2327,34 @@ async def lifespan(app: FastAPI): except Exception as exc: # noqa: BLE001 logger.warning("Provider source catalog startup refresh skipped: %s", exc) + # Quota balance poller (Phase 2: DeepSeek + Kilo). Disabled by default — + # only activates when config.quota_poll.enabled is true AND at least one + # api_poll package is present in the external catalog. Missing API keys + # degrade individual packages, never the whole loop. + try: + quota_poll_cfg = _config.quota_poll + if quota_poll_cfg.get("enabled"): + from .quota_poller import quota_poll_loop, run_poll_once + + if quota_poll_cfg.get("on_startup"): + try: + await run_poll_once(providers_cfg=_config.providers) + except Exception as exc: # noqa: BLE001 + logger.warning("Quota poll startup warmup failed: %s", exc) + interval = int(quota_poll_cfg.get("interval_seconds") or 3600) + fast = int(quota_poll_cfg.get("fast_lane_interval_seconds") or 900) + _quota_poll_task = asyncio.create_task( + quota_poll_loop( + providers_cfg=_config.providers, + interval_seconds=interval, + fast_lane_interval_seconds=fast, + ), + name="faigate-quota-poll", + ) + logger.info("Quota poller started (interval=%ds, fast_lane=%ds)", interval, fast) + except Exception as exc: # noqa: BLE001 + logger.warning("Quota poller startup skipped: %s", exc) + community_hooks = get_community_hooks_loaded() if community_hooks: logger.info("Community hooks loaded: %s", ", ".join(community_hooks)) @@ -2347,6 +2377,11 @@ async def lifespan(app: FastAPI): with suppress(asyncio.CancelledError): await _provider_catalog_refresh_task _provider_catalog_refresh_task = None + if _quota_poll_task is not None: + _quota_poll_task.cancel() + with suppress(asyncio.CancelledError): + await _quota_poll_task + _quota_poll_task = None if _provider_catalog_store is not None: _provider_catalog_store.close() _provider_catalog_store = None diff --git a/faigate/quota_poller.py b/faigate/quota_poller.py new file mode 100644 index 0000000..69a62e2 --- /dev/null +++ b/faigate/quota_poller.py @@ -0,0 +1,506 @@ +"""Balance poller — refreshes ``used_credits`` for credit-type packages whose +``source == "api_poll"`` from the provider's balance endpoint. + +Covers the two providers that actually expose a usable balance API: + +* **DeepSeek** — ``GET https://api.deepseek.com/user/balance`` (stable, documented) +* **Kilo (kilocode)** — best-effort: tries a short list of candidate endpoints + since Kilo hasn't published a stable schema at the time of writing. + +Everything else (Anthropic Pro, OpenAI Plus, Qwen, Blackbox, Gemini free) is +handled by the local counter in :mod:`quota_tracker` or header-capture +middleware (Phase 3). They are deliberately skipped here. + +Design notes +------------ +* **Stateless mutation** — the poller is the only writer to the packages + catalog JSON file. It reads → mutates the in-memory cache via + :func:`quota_tracker.update_package_usage` → persists atomically by writing + to ``.tmp`` and ``os.replace()``-ing. +* **API key lookup** — keys are pulled from environment (``DEEPSEEK_API_KEY``, + ``KILO_API_KEY``) and from the provider config (``providers[name].api_key``) + in that order. Missing keys produce a single WARNING log per run and skip + cleanly — the poller never crashes the gateway. +* **Poll cadence** — default 1h. Expiring packages (``expiry_date`` within 14 + days) get a "fast lane" 15m poll so the use-or-lose alert stays sharp + against the actual burn. +* **Resilience** — network failures downgrade ``confidence`` to ``low`` but + leave ``used_credits`` untouched so the router keeps a stale-but-usable + number rather than snapping to 0. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +from dataclasses import dataclass +from datetime import UTC, date, datetime +from pathlib import Path +from typing import Any + +import httpx + +from .provider_catalog import _get_external_packages_path, get_packages_catalog +from .quota_tracker import update_package_usage + +logger = logging.getLogger(__name__) + +# Cadence knobs. The fast lane kicks in when a package's expiry_date is within +# this many days — we want the use-or-lose signal to be fresh. +_DEFAULT_INTERVAL_SECONDS = 3600 # 1h +_FAST_LANE_INTERVAL_SECONDS = 900 # 15m +_FAST_LANE_EXPIRY_WINDOW_DAYS = 14 + +# HTTP timeouts kept tight — this is a background task, we don't want stuck +# connections piling up across hours. +_HTTP_TIMEOUT = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0) + + +@dataclass(frozen=True) +class PollResult: + """Structured outcome of a single provider balance poll.""" + + package_id: str + provider_id: str + ok: bool + total_credits: float | None = None + used_credits: float | None = None + error: str | None = None + endpoint: str | None = None + + +# --------------------------------------------------------------------------- +# Provider-specific fetchers +# --------------------------------------------------------------------------- + + +async def _fetch_deepseek_balance( + client: httpx.AsyncClient, + api_key: str, +) -> tuple[float, float]: + """Return ``(total_credits, used_credits)`` in USD for DeepSeek. + + DeepSeek's response (as of 2026-04) looks like:: + + { + "is_available": true, + "balance_infos": [ + {"currency": "USD", "total_balance": "5.00", + "granted_balance": "0.00", "topped_up_balance": "5.00"} + ] + } + + We interpret ``total_balance`` as remaining, so used = topped_up - remaining + when both are present. If only remaining is available we report total as a + frozen baseline (caller is expected to set ``total_credits`` in the catalog + to the purchased amount and let this poller subtract remaining). + """ + url = "https://api.deepseek.com/user/balance" + resp = await client.get( + url, + headers={"Authorization": f"Bearer {api_key}"}, + timeout=_HTTP_TIMEOUT, + ) + resp.raise_for_status() + data = resp.json() + infos = data.get("balance_infos") or [] + usd = next( + (i for i in infos if str(i.get("currency", "")).upper() == "USD"), + infos[0] if infos else None, + ) + if not usd: + raise RuntimeError("deepseek balance_infos empty") + remaining = float(usd.get("total_balance", 0.0)) + topped_up = float(usd.get("topped_up_balance", 0.0) or 0.0) + granted = float(usd.get("granted_balance", 0.0) or 0.0) + total = topped_up + granted if (topped_up or granted) else remaining + used = max(0.0, total - remaining) + return total, used + + +# Kilo hasn't published a stable balance schema; probe a short list of common +# candidates and parse the first one that returns a plausible payload. +_KILO_CANDIDATE_ENDPOINTS = ( + "https://kilocode.ai/api/profile/balance", + "https://api.kilocode.ai/v1/user/balance", + "https://api.kilo.ai/v1/user/balance", + "https://api.kilocode.ai/v1/key", +) + + +async def _fetch_kilo_balance( + client: httpx.AsyncClient, + api_key: str, +) -> tuple[float, float, str]: + """Return ``(total, used, endpoint)`` for Kilo by probing candidates. + + Accepts any payload that contains *any* of these field names and is + numeric-parseable: ``balance``, ``remaining``, ``credits``, ``total``, + ``used``, ``consumed``. This is deliberately lenient — Kilo's schema is a + moving target. The first 2xx response wins; others raise. + """ + last_err: Exception | None = None + for url in _KILO_CANDIDATE_ENDPOINTS: + try: + resp = await client.get( + url, + headers={"Authorization": f"Bearer {api_key}"}, + timeout=_HTTP_TIMEOUT, + ) + if resp.status_code >= 400: + last_err = RuntimeError(f"{url} → HTTP {resp.status_code}") + continue + data = resp.json() + total, used = _extract_numeric_balance(data) + if total is None and used is None: + last_err = RuntimeError(f"{url} → no recognizable balance fields") + continue + if total is None: + total = used or 0.0 + if used is None: + used = 0.0 + return total, used, url + except (httpx.HTTPError, ValueError, RuntimeError) as exc: + last_err = exc + continue + raise RuntimeError(f"kilo balance probe exhausted: {last_err}") + + +def _extract_numeric_balance(payload: Any) -> tuple[float | None, float | None]: + """Walk a JSON payload and return the first (total, used) pair found. + + Field name hints (case-insensitive, first hit wins):: + + total: total, total_credits, initial, topped_up, limit, quota + used: used, consumed, spent, used_credits + remaining: balance, remaining, credits, available + + If only ``remaining`` and ``total`` are present, computes + ``used = total - remaining``. If only ``remaining`` is present, returns + ``(None, None)`` so the caller can fall through to the next candidate. + """ + if not isinstance(payload, dict): + return None, None + + total_keys = ("total", "total_credits", "initial", "topped_up", "limit", "quota") + used_keys = ("used", "consumed", "spent", "used_credits") + remaining_keys = ("balance", "remaining", "credits", "available") + + def _first_numeric(keys: tuple[str, ...], root: dict[str, Any]) -> float | None: + for k in keys: + for candidate in (k, k.upper()): + if candidate in root: + try: + return float(root[candidate]) + except (TypeError, ValueError): + continue + return None + + # Try both the root and a common "data" envelope. + for root in (payload, payload.get("data") if isinstance(payload.get("data"), dict) else None): + if not isinstance(root, dict): + continue + total = _first_numeric(total_keys, root) + used = _first_numeric(used_keys, root) + remaining = _first_numeric(remaining_keys, root) + if total is not None and used is not None: + return total, used + if total is not None and remaining is not None: + return total, max(0.0, total - remaining) + if used is not None: + return None, used + return None, None + + +# --------------------------------------------------------------------------- +# Poll loop +# --------------------------------------------------------------------------- + + +def _resolve_api_key(provider_id: str, providers_cfg: dict[str, Any] | None) -> str | None: + """Find the API key for a provider. Env vars first, then config.""" + env_map = { + "deepseek": "DEEPSEEK_API_KEY", + "kilocode": "KILO_API_KEY", + } + env_name = env_map.get(provider_id) + if env_name: + val = os.environ.get(env_name, "").strip() + if val: + return val + if providers_cfg: + cfg = providers_cfg.get(provider_id) or {} + key = str(cfg.get("api_key") or "").strip() + if key: + return key + return None + + +def _select_due_packages( + packages: dict[str, dict[str, Any]], + *, + now: datetime | None = None, +) -> list[tuple[str, dict[str, Any], int]]: + """Return [(pkg_id, entry, interval_seconds)] for packages due a refresh. + + A package is "due" if it has ``source == "api_poll"`` and package_type in + the credits family. Fast-lane cadence kicks in for expiring credits. + """ + now = now or datetime.now(UTC) + today = now.date() + out: list[tuple[str, dict[str, Any], int]] = [] + for pkg_id, entry in packages.items(): + if entry.get("source") != "api_poll": + continue + ptype = entry.get("package_type", "credits") + if ptype != "credits": + continue + interval = _DEFAULT_INTERVAL_SECONDS + expiry = entry.get("expiry_date") + if expiry: + try: + exp = date.fromisoformat(str(expiry)) + days_left = (exp - today).days + if 0 <= days_left <= _FAST_LANE_EXPIRY_WINDOW_DAYS: + interval = _FAST_LANE_INTERVAL_SECONDS + except ValueError: + pass + out.append((pkg_id, entry, interval)) + return out + + +async def _poll_package( + client: httpx.AsyncClient, + pkg_id: str, + entry: dict[str, Any], + providers_cfg: dict[str, Any] | None, +) -> PollResult: + """Poll a single package. Never raises — returns ``PollResult(ok=False)`` + on any failure so the scheduler can keep going.""" + provider_id = str(entry.get("provider_id", "")) + api_key = _resolve_api_key(provider_id, providers_cfg) + if not api_key: + return PollResult( + package_id=pkg_id, + provider_id=provider_id, + ok=False, + error=f"no API key for {provider_id} (set {provider_id.upper()}_API_KEY)", + ) + + try: + if provider_id == "deepseek": + total, used = await _fetch_deepseek_balance(client, api_key) + endpoint = "https://api.deepseek.com/user/balance" + elif provider_id == "kilocode": + total, used, endpoint = await _fetch_kilo_balance(client, api_key) + else: + return PollResult( + package_id=pkg_id, + provider_id=provider_id, + ok=False, + error=f"no balance fetcher for provider {provider_id}", + ) + except Exception as exc: # noqa: BLE001 — poller must never crash caller + return PollResult( + package_id=pkg_id, + provider_id=provider_id, + ok=False, + error=f"{type(exc).__name__}: {exc}", + ) + + return PollResult( + package_id=pkg_id, + provider_id=provider_id, + ok=True, + total_credits=total, + used_credits=used, + endpoint=endpoint, + ) + + +def _apply_result_to_cache( + result: PollResult, + packages_cache: dict[str, dict[str, Any]], +) -> None: + """Mutate the in-memory cache with a successful poll result.""" + if not result.ok: + return + entry = packages_cache.get(result.package_id) + if entry is None: + return + # Trust the provider for both sides if we have them — prevents catalog + # drift when a top-up happens between polls. + if result.total_credits is not None: + entry["total_credits"] = float(result.total_credits) + update_package_usage( + result.package_id, + used_credits=result.used_credits, + source="api_poll", + confidence="high", + packages_cache=packages_cache, + ) + + +def _persist_cache_to_disk( + packages_cache: dict[str, dict[str, Any]], + path: Path, +) -> None: + """Atomic write: ``path.tmp`` → ``os.replace`` → ``path``. + + We preserve the envelope structure (``schema_version``, ``_notes``, etc.) + by re-reading the existing file, splicing in the updated ``packages`` + block, and writing the merged result. + """ + envelope: dict[str, Any] = {} + if path.exists(): + try: + with open(path, encoding="utf-8") as f: + envelope = json.load(f) + except Exception: # noqa: BLE001 + envelope = {} + envelope.setdefault("schema_version", "1.1") + envelope["packages"] = packages_cache + envelope["generated_at"] = datetime.now(UTC).isoformat(timespec="seconds") + + tmp_path = path.with_suffix(path.suffix + ".tmp") + tmp_path.parent.mkdir(parents=True, exist_ok=True) + with open(tmp_path, "w", encoding="utf-8") as f: + json.dump(envelope, f, indent=2, sort_keys=False) + f.write("\n") + os.replace(tmp_path, path) + + +async def run_poll_once( + *, + providers_cfg: dict[str, Any] | None = None, + persist: bool = True, +) -> list[PollResult]: + """One-shot poll of all due packages. Returns list of results. + + Exposed for manual / CLI use (``python -m faigate.quota_poller``) and for + the lifespan startup warmup. + """ + packages_cache = get_packages_catalog() + if not packages_cache: + logger.debug("quota_poller: no external packages catalog — nothing to poll") + return [] + + due = _select_due_packages(packages_cache) + if not due: + logger.debug("quota_poller: no api_poll packages due") + return [] + + results: list[PollResult] = [] + async with httpx.AsyncClient() as client: + # Small fan-out: run package polls concurrently, but cap at 4 to avoid + # hammering any single provider. + sem = asyncio.Semaphore(4) + + async def _bounded(pkg_id: str, entry: dict[str, Any]) -> PollResult: + async with sem: + return await _poll_package(client, pkg_id, entry, providers_cfg) + + gathered = await asyncio.gather( + *(_bounded(pkg_id, entry) for pkg_id, entry, _ in due), + return_exceptions=False, + ) + results.extend(gathered) + + for r in results: + if r.ok: + _apply_result_to_cache(r, packages_cache) + logger.info( + "quota_poller: %s/%s → total=%.2f used=%.2f (%s)", + r.provider_id, + r.package_id, + r.total_credits or 0.0, + r.used_credits or 0.0, + r.endpoint, + ) + else: + logger.warning( + "quota_poller: %s/%s failed — %s", + r.provider_id, + r.package_id, + r.error, + ) + + if persist and any(r.ok for r in results): + try: + _persist_cache_to_disk(packages_cache, _get_external_packages_path()) + except Exception as exc: # noqa: BLE001 + logger.warning("quota_poller: persist failed: %s", exc) + + return results + + +async def quota_poll_loop( + *, + providers_cfg: dict[str, Any] | None = None, + interval_seconds: int = _DEFAULT_INTERVAL_SECONDS, + fast_lane_interval_seconds: int = _FAST_LANE_INTERVAL_SECONDS, +) -> None: + """Long-running background task. Sleeps, polls, sleeps again. + + Uses the *shortest* interval among due packages as the outer cadence — if + any package is in the fast lane, we sleep 15m, otherwise 1h. + """ + global _DEFAULT_INTERVAL_SECONDS, _FAST_LANE_INTERVAL_SECONDS + logger.info( + "quota_poller: starting (default=%ds, fast_lane=%ds)", + interval_seconds, + fast_lane_interval_seconds, + ) + while True: + try: + packages = get_packages_catalog() + due = _select_due_packages(packages) + next_sleep = interval_seconds + if any(iv == fast_lane_interval_seconds for _, _, iv in due): + next_sleep = fast_lane_interval_seconds + await run_poll_once(providers_cfg=providers_cfg, persist=True) + except asyncio.CancelledError: + logger.info("quota_poller: loop cancelled") + raise + except Exception as exc: # noqa: BLE001 + logger.warning("quota_poller: loop iteration raised %s", exc) + next_sleep = interval_seconds + await asyncio.sleep(next_sleep) + + +# --------------------------------------------------------------------------- +# CLI entry point +# --------------------------------------------------------------------------- + + +def _main() -> int: + """Ad-hoc manual poll. Useful for verifying API keys during setup.""" + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s %(message)s", + ) + results = asyncio.run(run_poll_once(persist=True)) + if not results: + print("No api_poll packages configured (nothing to do).") + return 0 + print(f"\nPolled {len(results)} package(s):") + for r in results: + mark = "✓" if r.ok else "✗" + if r.ok: + print(f" {mark} {r.provider_id}/{r.package_id}: total={r.total_credits:.2f} used={r.used_credits:.2f}") + else: + print(f" {mark} {r.provider_id}/{r.package_id}: {r.error}") + return 0 if all(r.ok for r in results) else 1 + + +if __name__ == "__main__": + raise SystemExit(_main()) + + +__all__ = [ + "PollResult", + "run_poll_once", + "quota_poll_loop", +] From 9bda708b20a606369dd8ca0ca56dd90c4179eba7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andre=CC=81=20Lange?= Date: Sat, 18 Apr 2026 02:09:15 +0200 Subject: [PATCH 4/6] =?UTF-8?q?feat(quota):=20Phase=203=20=E2=80=94=20head?= =?UTF-8?q?er-capture=20middleware?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mines rate-limit hints from provider response headers so rolling-window packages (Anthropic Pro, OpenAI Plus) get a free, near-realtime quota signal without an extra API call. Module (faigate/quota_headers.py) * parse_headers(): dialect-aware parser for three header families — x-ratelimit-* (OpenAI / DeepSeek / OpenRouter), anthropic-ratelimit-* (Anthropic), plus a fallback retry-after capture for the rest. * HeaderSnapshot dataclass: limit_requests, remaining_requests, reset_requests_at, token-budget siblings, retry_after, raw dict. * _parse_reset() accepts both seconds-delta and ISO-8601. * In-process latest-snapshot store (thread-safe) for dashboard lookup. * record_response_headers(): passive observer — parses, stores, and opportunistically refreshes the provider's rolling_window package in the external catalog iff the entry has source=header_capture (opt-in so we don't override operator-chosen local_count). * Never raises; parse/apply errors log at DEBUG. Wiring (faigate/providers.py) * Two call sites in ProviderBackend.complete success path: – OpenAI-compat path (covers DeepSeek, OpenRouter, etc.) – Codex responses-API path (OpenAI Plus with OAuth) * Both wrapped in try/except so a broken parser can't break a request. Co-Authored-By: Claude Sonnet 4.6 --- faigate/providers.py | 15 ++ faigate/quota_headers.py | 358 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 373 insertions(+) create mode 100644 faigate/quota_headers.py diff --git a/faigate/providers.py b/faigate/providers.py index 81d61d1..614bf91 100644 --- a/faigate/providers.py +++ b/faigate/providers.py @@ -1092,6 +1092,12 @@ async def complete( raise ProviderError(self.name, resp.status_code, error_text) self.health.record_success(latency) + try: + from .quota_headers import record_response_headers + + record_response_headers(self.name, dict(resp.headers)) + except Exception: # noqa: BLE001 + pass return self._codex_completion_from_sse( resp.text, requested_model=model, @@ -1139,6 +1145,15 @@ async def complete( self.health.record_success(latency) data = resp.json() + # Passive quota signal: mine rate-limit headers into the catalog. + # No-op for providers without recognised headers; never raises. + try: + from .quota_headers import record_response_headers + + record_response_headers(self.name, dict(resp.headers)) + except Exception: # noqa: BLE001 — observer must not break the request + pass + # Extract cache metrics from DeepSeek/OpenAI responses usage = data.get("usage", {}) cache_hit = usage.get("prompt_cache_hit_tokens", 0) diff --git a/faigate/quota_headers.py b/faigate/quota_headers.py new file mode 100644 index 0000000..288e8c1 --- /dev/null +++ b/faigate/quota_headers.py @@ -0,0 +1,358 @@ +"""Header-capture middleware — mines rate-limit and quota hints from +provider response headers so Anthropic/OpenAI/OpenRouter/DeepSeek-class +providers get a free, near-realtime quota signal without an extra API call. + +Why this exists +--------------- +Phase 2 (``quota_poller``) only covers providers with a *dedicated* balance +endpoint. For everything else (Anthropic, OpenAI-compatible gateways, +OpenRouter), the quota signal is folded into every response as HTTP +headers — it's essentially free telemetry that we'd otherwise ignore. + +This module is intentionally a **passive observer**: any call site with a +response object can invoke :func:`record_response_headers(provider_name, +headers)` and the module will: + +1. Parse recognised rate-limit headers into a structured snapshot. +2. If the provider maps to a ``rolling_window`` package in the external + catalog, update that package's ``used_credits`` (as used-vs-limit + ratio × limit) so the router and dashboard see a fresh number. +3. Log at DEBUG; never raise. + +Header dialects recognised +-------------------------- + +OpenAI / DeepSeek / OpenRouter (x-ratelimit family):: + + x-ratelimit-limit-requests → window size in requests + x-ratelimit-remaining-requests → remaining requests + x-ratelimit-reset-requests → seconds-until-reset, or ISO-8601 + x-ratelimit-limit-tokens → token budget + x-ratelimit-remaining-tokens → remaining tokens + x-ratelimit-reset-tokens → seconds-until-reset + retry-after → 429 back-off seconds + +Anthropic (anthropic-ratelimit-* family):: + + anthropic-ratelimit-requests-limit + anthropic-ratelimit-requests-remaining + anthropic-ratelimit-requests-reset (ISO-8601) + anthropic-ratelimit-tokens-limit + anthropic-ratelimit-tokens-remaining + anthropic-ratelimit-tokens-reset + +Google AI (Gemini) — mostly empty. When present we pick up +``x-goog-quota-*`` hints but confidence is low. + +Anything unrecognised is silently ignored; the module never fails a +request over a missing/garbled header. +""" + +from __future__ import annotations + +import logging +import threading +from collections.abc import Mapping +from dataclasses import dataclass, field +from datetime import UTC, datetime, timedelta +from typing import Any + +from .quota_tracker import update_package_usage + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class HeaderSnapshot: + """Parsed rate-limit state from one provider response. + + All fields optional — providers return varying subsets. ``remaining`` + and ``limit`` are the two that actually steer routing; the rest are + diagnostic. + """ + + provider_id: str + dialect: str # "openai" | "anthropic" | "google" | "openrouter" | "unknown" + limit_requests: int | None = None + remaining_requests: int | None = None + reset_requests_at: datetime | None = None + limit_tokens: int | None = None + remaining_tokens: int | None = None + reset_tokens_at: datetime | None = None + retry_after_seconds: float | None = None + raw: dict[str, str] = field(default_factory=dict) + + @property + def has_useful_signal(self) -> bool: + """True iff we got at least one of the four steering numbers.""" + return any( + v is not None + for v in ( + self.limit_requests, + self.remaining_requests, + self.limit_tokens, + self.remaining_tokens, + ) + ) + + +# --------------------------------------------------------------------------- +# Parsing +# --------------------------------------------------------------------------- + + +def _to_int(val: Any) -> int | None: + if val is None: + return None + try: + return int(str(val).strip()) + except (ValueError, TypeError): + return None + + +def _to_float(val: Any) -> float | None: + if val is None: + return None + try: + return float(str(val).strip()) + except (ValueError, TypeError): + return None + + +def _parse_reset(val: Any, *, now: datetime | None = None) -> datetime | None: + """Accept either ISO-8601 ('2026-04-18T03:00:00Z') or seconds-until + (``"59"`` or ``"59s"``) and return an absolute UTC datetime. + """ + if val is None: + return None + raw = str(val).strip() + if not raw: + return None + # Strip optional 's' suffix common in x-ratelimit-reset-*. + if raw.endswith("s") and raw[:-1].replace(".", "", 1).isdigit(): + raw = raw[:-1] + # Plain seconds-delta. + try: + secs = float(raw) + if secs < 0: + return None + base = now or datetime.now(UTC) + # Sanity: seconds-delta should fit in ~24h for rate-limit resets. + if secs < 86400 * 2: + return base + timedelta(seconds=secs) + except ValueError: + pass + # ISO-8601. + try: + iso = raw.replace("Z", "+00:00") + dt = datetime.fromisoformat(iso) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + return dt.astimezone(UTC) + except ValueError: + return None + + +def _detect_dialect(headers: Mapping[str, str]) -> str: + """Best-effort dialect tagging based on which key prefixes show up.""" + lower = {k.lower() for k in headers.keys()} + if any(k.startswith("anthropic-ratelimit-") for k in lower): + return "anthropic" + if any(k.startswith("x-goog-") for k in lower): + return "google" + # OpenRouter ships both x-ratelimit-* and their own Openrouter-Provider + # marker; detect via presence of limit-tokens-requests combo. + if "openrouter-provider" in lower or "x-openrouter-provider" in lower: + return "openrouter" + if any(k.startswith("x-ratelimit-") for k in lower): + return "openai" + return "unknown" + + +def parse_headers(provider_id: str, headers: Mapping[str, str]) -> HeaderSnapshot: + """Parse a single provider response's headers into a ``HeaderSnapshot``. + + Never raises. Missing/garbled fields become ``None`` and are filtered + out by downstream consumers. + """ + # Normalise to lowercase keys while keeping original case in raw payload. + low = {k.lower(): v for k, v in headers.items()} + dialect = _detect_dialect(headers) + now = datetime.now(UTC) + + if dialect == "anthropic": + return HeaderSnapshot( + provider_id=provider_id, + dialect=dialect, + limit_requests=_to_int(low.get("anthropic-ratelimit-requests-limit")), + remaining_requests=_to_int(low.get("anthropic-ratelimit-requests-remaining")), + reset_requests_at=_parse_reset(low.get("anthropic-ratelimit-requests-reset"), now=now), + limit_tokens=_to_int(low.get("anthropic-ratelimit-tokens-limit")), + remaining_tokens=_to_int(low.get("anthropic-ratelimit-tokens-remaining")), + reset_tokens_at=_parse_reset(low.get("anthropic-ratelimit-tokens-reset"), now=now), + retry_after_seconds=_to_float(low.get("retry-after")), + raw=dict(low), + ) + + # OpenAI / OpenRouter / DeepSeek share the same header naming. + if dialect in ("openai", "openrouter"): + return HeaderSnapshot( + provider_id=provider_id, + dialect=dialect, + limit_requests=_to_int(low.get("x-ratelimit-limit-requests") or low.get("x-ratelimit-limit")), + remaining_requests=_to_int(low.get("x-ratelimit-remaining-requests") or low.get("x-ratelimit-remaining")), + reset_requests_at=_parse_reset( + low.get("x-ratelimit-reset-requests") or low.get("x-ratelimit-reset"), + now=now, + ), + limit_tokens=_to_int(low.get("x-ratelimit-limit-tokens")), + remaining_tokens=_to_int(low.get("x-ratelimit-remaining-tokens")), + reset_tokens_at=_parse_reset(low.get("x-ratelimit-reset-tokens"), now=now), + retry_after_seconds=_to_float(low.get("retry-after")), + raw=dict(low), + ) + + # Fallback: pick up retry-after at least. + return HeaderSnapshot( + provider_id=provider_id, + dialect=dialect, + retry_after_seconds=_to_float(low.get("retry-after")), + raw=dict(low), + ) + + +# --------------------------------------------------------------------------- +# In-process snapshot store (for dashboard, independent of package apply) +# --------------------------------------------------------------------------- + +_LOCK = threading.Lock() +_LATEST: dict[str, HeaderSnapshot] = {} + + +def latest_snapshot(provider_id: str) -> HeaderSnapshot | None: + """Return the most recent snapshot seen for this provider, or None.""" + with _LOCK: + return _LATEST.get(provider_id) + + +def all_latest_snapshots() -> dict[str, HeaderSnapshot]: + """Copy of all latest snapshots keyed by provider_id.""" + with _LOCK: + return dict(_LATEST) + + +# --------------------------------------------------------------------------- +# Catalog apply +# --------------------------------------------------------------------------- + + +def _find_rolling_window_package( + provider_id: str, + packages_cache: dict[str, dict[str, Any]], +) -> tuple[str, dict[str, Any]] | None: + """Return ``(pkg_id, entry)`` for the rolling-window package attached + to this provider, if any. First match wins — we assume one rolling + window per provider (true for Claude Pro / OpenAI Plus).""" + for pkg_id, entry in packages_cache.items(): + if entry.get("provider_id") != provider_id: + continue + if entry.get("package_type") != "rolling_window": + continue + if entry.get("source") != "header_capture": + # Respect operator's choice: don't override a local_count entry + # with header data unless they opted in via source=header_capture. + continue + return pkg_id, entry + return None + + +def _apply_to_rolling_window( + snapshot: HeaderSnapshot, + packages_cache: dict[str, dict[str, Any]], +) -> bool: + """If this provider has a rolling_window package marked + ``source: header_capture``, refresh its counters from the snapshot. + + Returns True on successful apply. ``used_credits`` semantics on a + rolling_window package = requests consumed in the current window, + computed as ``limit - remaining`` (never negative). + """ + if not snapshot.has_useful_signal: + return False + if snapshot.remaining_requests is None or snapshot.limit_requests is None: + return False + found = _find_rolling_window_package(snapshot.provider_id, packages_cache) + if not found: + return False + pkg_id, entry = found + + used = max(0, snapshot.limit_requests - snapshot.remaining_requests) + # Store the limit so quota_tracker can show fresh numbers; operators + # configured a heuristic default in the catalog, provider's number wins. + entry["limit_per_window"] = int(snapshot.limit_requests) + update_package_usage( + pkg_id, + used_credits=float(used), + source="header_capture", + confidence="high", + packages_cache=packages_cache, + ) + logger.debug( + "quota_headers: %s → %s used=%d/%d (dialect=%s)", + snapshot.provider_id, + pkg_id, + used, + snapshot.limit_requests, + snapshot.dialect, + ) + return True + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + + +def record_response_headers( + provider_id: str, + headers: Mapping[str, str] | None, + *, + packages_cache: dict[str, dict[str, Any]] | None = None, +) -> HeaderSnapshot | None: + """Ingest a response's headers. Safe to call from any hot path. + + Returns the parsed snapshot (or None if headers was empty) so callers + that want to log / annotate responses can do so. Never raises — any + exception is caught and logged at DEBUG. + """ + if not headers: + return None + try: + snap = parse_headers(provider_id, headers) + except Exception as exc: # noqa: BLE001 — observer must not break requests + logger.debug("quota_headers: parse failed for %s: %s", provider_id, exc) + return None + + with _LOCK: + _LATEST[provider_id] = snap + + if snap.has_useful_signal: + try: + if packages_cache is None: + from .provider_catalog import get_packages_catalog + + packages_cache = get_packages_catalog() + _apply_to_rolling_window(snap, packages_cache) + except Exception as exc: # noqa: BLE001 + logger.debug("quota_headers: apply failed for %s: %s", provider_id, exc) + + return snap + + +__all__ = [ + "HeaderSnapshot", + "parse_headers", + "record_response_headers", + "latest_snapshot", + "all_latest_snapshots", +] From baf0e9b5c205203d65becbdd3fb29a2f5b530ecb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andre=CC=81=20Lange?= Date: Sat, 18 Apr 2026 02:11:54 +0200 Subject: [PATCH 5/6] =?UTF-8?q?feat(quota):=20Phase=204=20=E2=80=94=20/api?= =?UTF-8?q?/quotas=20+=20/dashboard/quotas=20widget?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Exposes the full quota state through two new surfaces: API (GET /api/quotas) * Returns QuotaStatus for every package in the external catalog plus the latest header-capture snapshot per provider (for diagnostics). * Includes an aggregated by_alert map and has_use_or_lose / has_exhausted flags for quick operator triage. * Never errors — missing catalog → empty list, SQLite path falls back to config.metrics.db_path. Dashboard (GET /dashboard/quotas) * Self-contained HTML page, no build step, no deps. * Polls /api/quotas every 60s. * Renders each package as a color-coded progress bar sorted by alert urgency (use_or_lose / exhausted first, then topup / watch, ok last). * Shows remaining, total, expiry countdown, burn/day, runway days, source + confidence per package. * Deliberately a separate page from the main /dashboard — zero risk of breaking the existing 116KB dashboard HTML. End-to-end smoke test passed with the 11-package example catalog: count: 11, all packages render, by_alert classification works. Co-Authored-By: Claude Sonnet 4.6 --- faigate/main.py | 219 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 219 insertions(+) diff --git a/faigate/main.py b/faigate/main.py index e5e17ec..2aba15d 100644 --- a/faigate/main.py +++ b/faigate/main.py @@ -2827,6 +2827,64 @@ async def operator_events( } +@app.get("/api/quotas") +async def quotas(): + """Unified view across all quota packages (credits / rolling / daily). + + Returns the QuotaStatus list the dashboard renders as progress bars plus + the latest header-capture snapshot per provider (diagnostic). Never + errors: missing catalog / SQLite path → empty lists. + """ + from pathlib import Path + + from .quota_headers import all_latest_snapshots + from .quota_tracker import compute_all_statuses + + sqlite_path = None + try: + db_path = _config.metrics.get("db_path") if _config else None + if db_path: + sqlite_path = Path(db_path) + except Exception: # noqa: BLE001 + sqlite_path = None + + try: + statuses = compute_all_statuses(sqlite_path=sqlite_path) + except Exception as exc: # noqa: BLE001 + logger.warning("compute_all_statuses failed: %s", exc) + statuses = [] + + # Group by alert level for quick operator triage. + snapshots_raw = all_latest_snapshots() + snapshots_out: dict[str, dict[str, Any]] = {} + for pid, snap in snapshots_raw.items(): + snapshots_out[pid] = { + "dialect": snap.dialect, + "limit_requests": snap.limit_requests, + "remaining_requests": snap.remaining_requests, + "reset_requests_at": (snap.reset_requests_at.isoformat() if snap.reset_requests_at else None), + "limit_tokens": snap.limit_tokens, + "remaining_tokens": snap.remaining_tokens, + "reset_tokens_at": (snap.reset_tokens_at.isoformat() if snap.reset_tokens_at else None), + "retry_after_seconds": snap.retry_after_seconds, + } + + statuses_json = [s.to_dict() for s in statuses] + by_alert: dict[str, int] = {} + for s in statuses_json: + key = str(s.get("alert") or "unknown") + by_alert[key] = by_alert.get(key, 0) + 1 + + return { + "packages": statuses_json, + "count": len(statuses_json), + "by_alert": by_alert, + "has_use_or_lose": any(s.get("alert") == "use_or_lose" for s in statuses_json), + "has_exhausted": any(s.get("alert") == "exhausted" for s in statuses_json), + "header_snapshots": snapshots_out, + } + + @app.get("/api/alerts") async def get_alerts(lookback_hours: int = 1, baseline_hours: int = 24): """Anomaly detection: compare recent window against rolling baseline. @@ -3290,6 +3348,167 @@ async def dashboard(): return _DASHBOARD_HTML +_QUOTAS_DASHBOARD_HTML = """ + + + + faigate · Quotas + + + + +

Quotas

+
+ Live view of all configured packages — updated every 60s. Source: + /api/quotas +
+
+
Loading…
+ + + + +""" + + +@app.get("/dashboard/quotas", response_class=HTMLResponse) +async def dashboard_quotas(): + """Self-contained quotas page. Polls /api/quotas every 60s.""" + return _QUOTAS_DASHBOARD_HTML + + @app.get("/dashboard/assets/{asset_kind}/{asset_name:path}") async def dashboard_asset(asset_kind: str, asset_name: str): """Serve packaged dashboard assets such as fonts.""" From 1442c3373a303e3ef46410ea874204ed2036ffc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andre=CC=81=20Lange?= Date: Sat, 18 Apr 2026 02:23:03 +0200 Subject: [PATCH 6/6] =?UTF-8?q?feat(quota):=20Phase=205=20=E2=80=94=20rout?= =?UTF-8?q?e=20scoring=20picks=20up=20QuotaStatus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends the package scoring block in Router._score_provider to use the unified QuotaStatus from quota_tracker instead of raw catalog fields. Changes: * rolling_window and daily package types now contribute to package_score (up to +5, same ceiling as credits — no type gets to dominate routing just by existing). * Cross-type urgency boost: when quota_tracker classifies a package as `use_or_lose`, the provider gets a flat +3 on top of the existing expiry_score. Strictly more informed than the raw "days_left <= 7" rule because the classifier combines expiry with projected burn rate. * Legacy credits path unchanged — quota_tracker failure silently falls back, so Router behaviour on a stripped test harness is identical. Net effect: Kilo credits with a 10-day expiry and real burn pressure win ties against DeepSeek credits with 6 months of runway, which is the whole point of the use-or-lose signal. Co-Authored-By: Claude Sonnet 4.6 --- faigate/router.py | 47 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/faigate/router.py b/faigate/router.py index d78649d..27f0004 100644 --- a/faigate/router.py +++ b/faigate/router.py @@ -1652,15 +1652,31 @@ def _provider_dimension_details( kilo_score = int(kilo_fit.get("score") or 0) adaptation_penalty = int(runtime_state.get("penalty", 0) or 0) recovery_score = self._recovery_posture_score(lane, runtime_state, routing_posture) - # Package score based on remaining credits and expiry + # Package score: remaining credits, expiry bonus, and — if quota_tracker + # classified the package as `use_or_lose` or `topup` — an urgency boost + # that overrides the simple "days_left <= 7" heuristic. Also handles + # rolling_window / daily types introduced in Phase 1 of quota tracking. package_score = 0 - package_details = [] + package_details: list[dict[str, Any]] = [] packages = _get_packages_for_provider(name) for pkg in packages: + ptype = str(pkg.get("package_type") or "credits") + # Compute a unified QuotaStatus when possible; fall back to the + # legacy credits-only path if quota_tracker isn't importable (e.g. + # stripped-down test harness). + status = None + try: + from .quota_tracker import compute_quota_status + + status = compute_quota_status(pkg) + except Exception: # noqa: BLE001 + status = None + total = pkg.get("total_credits") used = pkg.get("used_credits", 0) expiry = pkg.get("expiry_date") - if total is not None and total > 0: + + if ptype == "credits" and total is not None and total > 0: remaining = total - used remaining_ratio = remaining / total # Score based on remaining ratio (0-5 points) @@ -1688,6 +1704,31 @@ def _provider_dimension_details( "remaining_ratio": remaining_ratio, } ) + elif status is not None and ptype in ("rolling_window", "daily") and status.total > 0: + # Prefer subscription/daily buckets that still have headroom. + # Max bonus = 5 (parity with credits) so no provider class + # dominates routing purely via package type. + package_score += min(5, int(status.remaining_ratio * 5)) + package_details.append( + { + "package_id": status.package_id, + "package_type": ptype, + "remaining": status.remaining, + "total": status.total, + "remaining_ratio": status.remaining_ratio, + "alert": status.alert, + "source": status.source, + } + ) + + # Cross-type urgency boost: quota_tracker's use_or_lose alert is + # strictly more informed than the raw "days_left <= 7" rule — + # it combines expiry *with* projected burn rate, so a package + # with 8 days to expiry and only 2 days of projected spend will + # correctly light up. Add +3 on top of the existing expiry_score + # so an urgent package decisively wins a tie. + if status is not None and status.alert == "use_or_lose": + package_score += 3 image_score = 0 image_policy_score = 0 image_outputs_fit = True