From 3851355a3b799ee96b4dd7c583fc0c0130bc93fc Mon Sep 17 00:00:00 2001 From: RomirJ Date: Fri, 12 Jun 2026 12:27:39 -0700 Subject: [PATCH] feat(runtime): add AAC telemetry calibration --- scripts/calibrate_adaptive_chunking.py | 50 ++++ src/tether/observability/__init__.py | 2 + src/tether/observability/prometheus.py | 151 +++++++++++- src/tether/runtime/adaptive_calibration.py | 270 +++++++++++++++++++++ src/tether/runtime/server.py | 92 ++++++- tests/test_adaptive_calibration.py | 177 ++++++++++++++ tests/test_observability_prometheus.py | 27 +++ tests/test_record.py | 25 ++ tests/test_rtc_adapter_day4.py | 39 ++- 9 files changed, 826 insertions(+), 7 deletions(-) create mode 100644 scripts/calibrate_adaptive_chunking.py create mode 100644 src/tether/runtime/adaptive_calibration.py create mode 100644 tests/test_adaptive_calibration.py diff --git a/scripts/calibrate_adaptive_chunking.py b/scripts/calibrate_adaptive_chunking.py new file mode 100644 index 0000000..4cd5063 --- /dev/null +++ b/scripts/calibrate_adaptive_chunking.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +"""Recommend adaptive RTC action-chunk thresholds from Tether JSONL traces.""" +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from tether.runtime.adaptive_calibration import ( # noqa: E402 + iter_adaptive_records, + recommend_adaptive_chunk_thresholds, +) + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "trace", + nargs="+", + type=Path, + help="Tether JSONL or JSONL.gz trace files containing request.rtc records.", + ) + parser.add_argument( + "--compact", + action="store_true", + help="Print compact JSON instead of indented JSON.", + ) + args = parser.parse_args(argv) + + records = list(iter_adaptive_records(args.trace)) + recommendation = recommend_adaptive_chunk_thresholds(records) + print( + json.dumps( + recommendation, + indent=None if args.compact else 2, + sort_keys=True, + ) + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/tether/observability/__init__.py b/src/tether/observability/__init__.py index 19be2ea..2d2a12f 100644 --- a/src/tether/observability/__init__.py +++ b/src/tether/observability/__init__.py @@ -23,6 +23,7 @@ inc_slo_violation, observe_batch_flush, observe_onnx_load_time, + observe_rtc_adaptive_chunking, record_act_latency, render_metrics, set_episodes_active, @@ -47,6 +48,7 @@ "inc_inference_executor_rejected", "inc_model_swap", "observe_batch_flush", + "observe_rtc_adaptive_chunking", "track_in_flight", "set_server_up", "set_robot_info", diff --git a/src/tether/observability/prometheus.py b/src/tether/observability/prometheus.py index 36d4f47..20e7e59 100644 --- a/src/tether/observability/prometheus.py +++ b/src/tether/observability/prometheus.py @@ -16,8 +16,10 @@ """ from __future__ import annotations +import math +from collections.abc import Iterator, Mapping from contextlib import contextmanager -from typing import Iterator +from typing import Any from prometheus_client import ( CONTENT_TYPE_LATEST, @@ -120,6 +122,13 @@ registry=REGISTRY, ) +tether_rtc_adaptive_decisions_total = Counter( + "tether_rtc_adaptive_decisions_total", + "Adaptive RTC action-chunk decisions partitioned by bounded reason", + labelnames=("reason",), + registry=REGISTRY, +) + # Action-similarity fast-path skip counter (action-similarity-fast-path # Phase 1.5 — FlashVLA). Increments when the inference path returns a # cached action chunk instead of running the expert. Operator visibility @@ -195,6 +204,51 @@ registry=REGISTRY, ) +# Adaptive RTC action-chunking gauges. Labels match the /act latency histogram +# and stay bounded: embodiment × model_id × policy_slot. +tether_rtc_adaptive_horizon = Gauge( + "tether_rtc_adaptive_horizon", + "Latest adaptive RTC execution horizon in actions", + labelnames=("embodiment", "model_id", "policy_slot"), + registry=REGISTRY, +) +tether_rtc_adaptive_risk_score = Gauge( + "tether_rtc_adaptive_risk_score", + "Latest adaptive RTC risk score used for horizon selection", + labelnames=("embodiment", "model_id", "policy_slot"), + registry=REGISTRY, +) +tether_rtc_adaptive_replan_threshold_ratio = Gauge( + "tether_rtc_adaptive_replan_threshold_ratio", + "Latest adaptive RTC replan threshold ratio", + labelnames=("embodiment", "model_id", "policy_slot"), + registry=REGISTRY, +) +tether_rtc_adaptive_guard_margin = Gauge( + "tether_rtc_adaptive_guard_margin", + "Latest ActionGuard safety margin feeding adaptive RTC chunking", + labelnames=("embodiment", "model_id", "policy_slot"), + registry=REGISTRY, +) +tether_rtc_adaptive_correction_magnitude = Gauge( + "tether_rtc_adaptive_correction_magnitude", + "Latest A2C2 correction magnitude feeding adaptive RTC chunking", + labelnames=("embodiment", "model_id", "policy_slot"), + registry=REGISTRY, +) +tether_rtc_adaptive_uncertainty = Gauge( + "tether_rtc_adaptive_uncertainty", + "Latest model uncertainty feeding adaptive RTC chunking", + labelnames=("embodiment", "model_id", "policy_slot"), + registry=REGISTRY, +) +tether_rtc_adaptive_action_delta = Gauge( + "tether_rtc_adaptive_action_delta", + "Latest overlap delta between previous and current RTC action chunks", + labelnames=("embodiment", "model_id", "policy_slot"), + registry=REGISTRY, +) + # --------------------------------------------------------------------------- # Helpers — typed call-sites keep the surface searchable @@ -268,6 +322,101 @@ def inc_inference_executor_rejected( ).inc() +_RTC_ADAPTIVE_REASONS = frozenset({ + "disabled", + "no_signal", + "stable", + "stable_high_latency", + "uncertainty", + "guard_margin", + "correction", + "action_delta", + "unknown", +}) + + +def observe_rtc_adaptive_chunking( + *, + embodiment: str, + model_id: str, + policy_slot: str = "prod", + decision: Mapping[str, Any] | None = None, + signal: Mapping[str, Any] | None = None, + last_action_delta: float | None = None, +) -> None: + """Emit bounded metrics for the latest adaptive RTC chunking snapshot.""" + labels = { + "embodiment": embodiment, + "model_id": model_id, + "policy_slot": policy_slot, + } + + if decision: + reason = _bounded_rtc_reason(decision.get("reason")) + tether_rtc_adaptive_decisions_total.labels(reason=reason).inc() + _set_gauge_if_float( + tether_rtc_adaptive_horizon, + labels, + decision.get("horizon"), + ) + _set_gauge_if_float( + tether_rtc_adaptive_risk_score, + labels, + decision.get("risk_score"), + ) + _set_gauge_if_float( + tether_rtc_adaptive_replan_threshold_ratio, + labels, + decision.get("replan_threshold_ratio"), + ) + + if signal: + _set_gauge_if_float( + tether_rtc_adaptive_guard_margin, + labels, + signal.get("guard_margin"), + ) + _set_gauge_if_float( + tether_rtc_adaptive_correction_magnitude, + labels, + signal.get("correction_magnitude"), + ) + _set_gauge_if_float( + tether_rtc_adaptive_uncertainty, + labels, + signal.get("uncertainty"), + ) + signal_delta = _metric_float(signal.get("action_delta")) + else: + signal_delta = None + + if signal_delta is None: + signal_delta = _metric_float(last_action_delta) + if signal_delta is not None: + tether_rtc_adaptive_action_delta.labels(**labels).set(signal_delta) + + +def _bounded_rtc_reason(value: Any) -> str: + reason = str(value) if value is not None else "unknown" + return reason if reason in _RTC_ADAPTIVE_REASONS else "unknown" + + +def _metric_float(value: Any) -> float | None: + if value is None: + return None + try: + out = float(value) + except (TypeError, ValueError): + return None + return out if math.isfinite(out) else None + + +def _set_gauge_if_float(gauge: Gauge, labels: dict[str, str], value: Any) -> None: + out = _metric_float(value) + if out is not None: + gauge.labels(**labels).set(out) + + def inc_action_skip() -> None: tether_action_skip_total.inc() diff --git a/src/tether/runtime/adaptive_calibration.py b/src/tether/runtime/adaptive_calibration.py new file mode 100644 index 0000000..9fefdea --- /dev/null +++ b/src/tether/runtime/adaptive_calibration.py @@ -0,0 +1,270 @@ +"""Offline calibration helpers for adaptive RTC action chunking. + +Reads Tether JSONL trace records and recommends AdaptiveChunkConfig thresholds +from observed guard, A2C2, uncertainty, latency, and action-delta signals. +""" +from __future__ import annotations + +import gzip +import json +import math +from collections import Counter +from collections.abc import Iterable, Iterator, Mapping +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Any + +from tether.runtime.adaptive_chunking import AdaptiveChunkConfig + + +_METRICS = ( + "guard_margin", + "correction_magnitude", + "uncertainty", + "action_delta", + "latency_ms", + "horizon", + "risk_score", +) + + +@dataclass(frozen=True) +class AdaptiveCalibrationRecommendation: + """Serializable AAC threshold recommendation.""" + + sample_count: int + decision_count: int + reasons: dict[str, int] + observed: dict[str, dict[str, float]] + recommended_config: dict[str, float] + defaults_used: list[str] + + def as_dict(self) -> dict[str, Any]: + return asdict(self) + + +def iter_adaptive_records(paths: Iterable[str | Path]) -> Iterator[dict[str, Any]]: + """Yield request records that contain an ``rtc`` telemetry block.""" + for path_like in paths: + path = Path(path_like) + opener = gzip.open if path.suffix == ".gz" else open + with opener(path, "rt", encoding="utf-8") as f: + for line_no, line in enumerate(f, start=1): + if not line.strip(): + continue + try: + record = json.loads(line) + except json.JSONDecodeError as exc: + raise ValueError( + f"{path}:{line_no}: invalid JSONL record: {exc.msg}" + ) from exc + if record.get("kind") != "request": + continue + if isinstance(record.get("rtc"), dict): + yield record + + +def summarize_adaptive_records( + records: Iterable[Mapping[str, Any]], +) -> dict[str, Any]: + """Summarize observed AAC telemetry from request records.""" + values: dict[str, list[float]] = {metric: [] for metric in _METRICS} + reasons: Counter[str] = Counter() + sample_count = 0 + decision_count = 0 + + for record in records: + rtc = record.get("rtc") + if not isinstance(rtc, Mapping): + continue + sample_count += 1 + + signal = rtc.get("adaptive_signal") + if isinstance(signal, Mapping): + _append_float(values["guard_margin"], signal.get("guard_margin")) + _append_float( + values["correction_magnitude"], + signal.get("correction_magnitude"), + ) + _append_float(values["uncertainty"], signal.get("uncertainty")) + _append_float(values["action_delta"], signal.get("action_delta")) + + _append_float(values["action_delta"], rtc.get("last_action_delta")) + + decision = rtc.get("adaptive_chunking") + if isinstance(decision, Mapping): + decision_count += 1 + reason = str(decision.get("reason") or "unknown") + reasons[reason] += 1 + _append_float(values["horizon"], decision.get("horizon")) + _append_float(values["risk_score"], decision.get("risk_score")) + + latency = record.get("latency") + if isinstance(latency, Mapping): + _append_float(values["latency_ms"], latency.get("total_ms")) + + observed = { + name: _describe(values_for_metric) + for name, values_for_metric in values.items() + if values_for_metric + } + return { + "sample_count": sample_count, + "decision_count": decision_count, + "reasons": dict(sorted(reasons.items())), + "observed": observed, + } + + +def recommend_adaptive_chunk_config( + records: Iterable[Mapping[str, Any]], + *, + defaults: AdaptiveChunkConfig | None = None, +) -> AdaptiveCalibrationRecommendation: + """Recommend AAC thresholds from recorded telemetry. + + The recommendation is deliberately conservative: + - low_guard_margin uses p10 of observed margins, so low-margin pilots shorten + horizons earlier. + - high correction, uncertainty, and action-delta thresholds use p90. + - high_latency_ms uses p75 latency, so stable high-latency scenes can stretch + horizons before the p95 tail. + """ + defaults = defaults or AdaptiveChunkConfig() + summary = summarize_adaptive_records(records) + observed: dict[str, dict[str, float]] = summary["observed"] + defaults_used: list[str] = [] + + recommended = { + "low_guard_margin": _recommended_percentile( + observed, + "guard_margin", + "p10", + defaults.low_guard_margin, + defaults_used, + ), + "high_correction_magnitude": _recommended_percentile( + observed, + "correction_magnitude", + "p90", + defaults.high_correction_magnitude, + defaults_used, + ), + "high_uncertainty": _recommended_percentile( + observed, + "uncertainty", + "p90", + defaults.high_uncertainty, + defaults_used, + ), + "high_action_delta": _recommended_percentile( + observed, + "action_delta", + "p90", + defaults.high_action_delta, + defaults_used, + ), + "high_latency_ms": _recommended_percentile( + observed, + "latency_ms", + "p75", + defaults.high_latency_ms, + defaults_used, + ), + } + recommended["high_uncertainty"] = max( + recommended["high_uncertainty"], + defaults.low_uncertainty + 1e-6, + ) + + return AdaptiveCalibrationRecommendation( + sample_count=int(summary["sample_count"]), + decision_count=int(summary["decision_count"]), + reasons=dict(summary["reasons"]), + observed=observed, + recommended_config={ + key: round(float(value), 6) + for key, value in recommended.items() + }, + defaults_used=sorted(defaults_used), + ) + + +def recommend_adaptive_chunk_thresholds( + records: Iterable[Mapping[str, Any]], + *, + defaults: AdaptiveChunkConfig | None = None, +) -> dict[str, Any]: + """Return a plain dict recommendation for CLIs and JSON reports.""" + return recommend_adaptive_chunk_config(records, defaults=defaults).as_dict() + + +def _append_float(out: list[float], value: Any) -> None: + cleaned = _clean_float(value) + if cleaned is not None: + out.append(cleaned) + + +def _clean_float(value: Any) -> float | None: + if value is None: + return None + try: + out = float(value) + except (TypeError, ValueError): + return None + return out if math.isfinite(out) else None + + +def _describe(values: list[float]) -> dict[str, float]: + ordered = sorted(values) + return { + "count": float(len(ordered)), + "min": ordered[0], + "p10": _percentile(ordered, 10), + "p50": _percentile(ordered, 50), + "p75": _percentile(ordered, 75), + "p90": _percentile(ordered, 90), + "p95": _percentile(ordered, 95), + "max": ordered[-1], + } + + +def _percentile(ordered: list[float], percentile: float) -> float: + if not ordered: + raise ValueError("percentile requires at least one value") + if len(ordered) == 1: + return ordered[0] + rank = (percentile / 100.0) * (len(ordered) - 1) + low = int(math.floor(rank)) + high = int(math.ceil(rank)) + if low == high: + return ordered[low] + weight = rank - low + return ordered[low] * (1.0 - weight) + ordered[high] * weight + + +def _recommended_percentile( + observed: Mapping[str, Mapping[str, float]], + metric: str, + percentile_name: str, + default: float, + defaults_used: list[str], +) -> float: + stats = observed.get(metric) + if not stats or percentile_name not in stats: + defaults_used.append(metric) + return float(default) + value = _clean_float(stats[percentile_name]) + if value is None: + defaults_used.append(metric) + return float(default) + return max(value, 0.0) + + +__all__ = [ + "AdaptiveCalibrationRecommendation", + "iter_adaptive_records", + "recommend_adaptive_chunk_config", + "recommend_adaptive_chunk_thresholds", + "summarize_adaptive_records", +] diff --git a/src/tether/runtime/server.py b/src/tether/runtime/server.py index d0a4316..57a5d42 100644 --- a/src/tether/runtime/server.py +++ b/src/tether/runtime/server.py @@ -48,6 +48,7 @@ from tether.observability import ( METRICS_CONTENT_TYPE, inc_inference_executor_rejected, + observe_rtc_adaptive_chunking, record_act_latency, render_metrics, set_inference_executor_state, @@ -60,6 +61,7 @@ _METRICS_AVAILABLE = False METRICS_CONTENT_TYPE = "text/plain" def inc_inference_executor_rejected(*args, **kwargs): pass + def observe_rtc_adaptive_chunking(*args, **kwargs): pass def record_act_latency(*args, **kwargs): pass def render_metrics() -> bytes: return b"# prometheus_client not installed\n" def set_inference_executor_state(*args, **kwargs): pass @@ -122,6 +124,67 @@ def _record_rtc_adaptive_signal( ) +def _rtc_adaptive_record_from_stats(stats: Any) -> dict[str, Any] | None: + """Extract the additive RTC telemetry block written into JSONL records.""" + if not isinstance(stats, dict): + return None + + out: dict[str, Any] = {} + for key in ("adaptive_chunking", "adaptive_signal"): + value = stats.get(key) + if isinstance(value, dict) and value: + out[key] = value + + action_delta = _coerce_optional_float(stats.get("last_action_delta")) + if action_delta is not None: + out["last_action_delta"] = action_delta + + return out or None + + +def _set_span_optional_float(span: Any, name: str, value: Any) -> None: + out = _coerce_optional_float(value) + if out is not None: + span.set_attribute(name, out) + + +def _set_rtc_adaptive_span_attrs(span: Any, rtc_record: dict[str, Any]) -> None: + decision = rtc_record.get("adaptive_chunking") + if isinstance(decision, dict): + reason = decision.get("reason") + if reason is not None: + span.set_attribute("tether.rtc.adaptive.reason", str(reason)) + _set_span_optional_float( + span, "tether.rtc.adaptive.horizon", decision.get("horizon") + ) + _set_span_optional_float( + span, "tether.rtc.adaptive.risk_score", decision.get("risk_score") + ) + _set_span_optional_float( + span, + "tether.rtc.adaptive.replan_threshold_ratio", + decision.get("replan_threshold_ratio"), + ) + + signal = rtc_record.get("adaptive_signal") + if isinstance(signal, dict): + _set_span_optional_float( + span, "tether.rtc.adaptive.guard_margin", signal.get("guard_margin") + ) + _set_span_optional_float( + span, + "tether.rtc.adaptive.correction_magnitude", + signal.get("correction_magnitude"), + ) + _set_span_optional_float( + span, "tether.rtc.adaptive.uncertainty", signal.get("uncertainty") + ) + + _set_span_optional_float( + span, "tether.rtc.adaptive.action_delta", rtc_record.get("last_action_delta") + ) + + # ─── Blackwell auto-detect ────────────────────────────────────────────────── # ORT-bundled TensorRT predates Blackwell (RTX 50-series, sm_100). On those # GPUs, TRT EP segfaults at session-init because TRT can't register kernels @@ -2613,6 +2676,13 @@ async def act(request: PredictRequest, _auth: None = Depends(_require_api_key)): ) raise + _slot_label = ( + _two_routing_decision.slot + if _two_routing_decision is not None + else "prod" + ) + _rtc_for_record: dict[str, Any] | None = None + # Circuit-breaker bookkeeping on returned result. Error-result # responses (e.g., NaN guard trips) count as crashes; clean # responses reset the counter to 0. @@ -2691,11 +2761,6 @@ async def act(request: PredictRequest, _auth: None = Depends(_require_api_key)): # operators can split per-slot p99 in 2-policy mode. # Default "prod" preserves single-policy series. try: - _slot_label = ( - _two_routing_decision.slot - if _two_routing_decision is not None - else "prod" - ) record_act_latency( float(result["latency_ms"]) / 1000.0, embodiment=_emb_label, @@ -2798,6 +2863,22 @@ async def act(request: PredictRequest, _auth: None = Depends(_require_api_key)): result, guard_margin=_guard_margin, ) + _rtc_stats = _rtc.get_stats() if hasattr(_rtc, "get_stats") else None + _rtc_for_record = _rtc_adaptive_record_from_stats(_rtc_stats) + if _rtc_for_record is not None: + _set_rtc_adaptive_span_attrs(span, _rtc_for_record) + _decision = _rtc_for_record.get("adaptive_chunking") + _signal = _rtc_for_record.get("adaptive_signal") + observe_rtc_adaptive_chunking( + embodiment=_emb_label, + model_id=_model_label, + policy_slot=_slot_label, + decision=( + _decision if isinstance(_decision, dict) else None + ), + signal=_signal if isinstance(_signal, dict) else None, + last_action_delta=_rtc_for_record.get("last_action_delta"), + ) except Exception as exc: # noqa: BLE001 — RTC signal must not break /act logger.warning("RTC adaptive signal update failed: %s", exc) @@ -2848,6 +2929,7 @@ async def act(request: PredictRequest, _auth: None = Depends(_require_api_key)): error=err, routing=_routing_for_record, guard=_guard_for_record, + rtc=_rtc_for_record, ) if rec_seq >= 0: span.set_attribute("tether.record.seq", rec_seq) diff --git a/tests/test_adaptive_calibration.py b/tests/test_adaptive_calibration.py new file mode 100644 index 0000000..d15ba21 --- /dev/null +++ b/tests/test_adaptive_calibration.py @@ -0,0 +1,177 @@ +from __future__ import annotations + +import gzip +import json +from pathlib import Path + +import pytest + +from tether.runtime.adaptive_calibration import ( + iter_adaptive_records, + recommend_adaptive_chunk_thresholds, + summarize_adaptive_records, +) + + +def _write_jsonl(path: Path, records: list[dict], *, gzip_file: bool = False) -> None: + opener = gzip.open if gzip_file else open + with opener(path, "wt", encoding="utf-8") as f: + for record in records: + f.write(json.dumps(record) + "\n") + + +def _record( + *, + latency_ms: float, + guard_margin: float, + correction_magnitude: float, + uncertainty: float, + action_delta: float, + horizon: int, + reason: str, +) -> dict: + return { + "kind": "request", + "latency": {"total_ms": latency_ms}, + "rtc": { + "adaptive_signal": { + "guard_margin": guard_margin, + "correction_magnitude": correction_magnitude, + "uncertainty": uncertainty, + }, + "adaptive_chunking": { + "horizon": horizon, + "reason": reason, + "risk_score": 0.5, + "replan_threshold_ratio": 0.4, + }, + "last_action_delta": action_delta, + }, + } + + +def test_iter_adaptive_records_reads_plain_and_gzip_jsonl(tmp_path): + records = [ + {"kind": "header"}, + _record( + latency_ms=50, + guard_margin=0.02, + correction_magnitude=0.1, + uncertainty=0.2, + action_delta=0.03, + horizon=8, + reason="stable", + ), + {"kind": "request", "latency": {"total_ms": 60}}, + ] + plain = tmp_path / "trace.jsonl" + gz = tmp_path / "trace.jsonl.gz" + _write_jsonl(plain, records) + _write_jsonl(gz, records, gzip_file=True) + + plain_records = list(iter_adaptive_records([plain])) + gz_records = list(iter_adaptive_records([gz])) + + assert len(plain_records) == 1 + assert plain_records == gz_records + assert plain_records[0]["rtc"]["adaptive_chunking"]["reason"] == "stable" + + +def test_summarize_adaptive_records_counts_reasons_and_percentiles(): + records = [ + _record( + latency_ms=50, + guard_margin=0.02, + correction_magnitude=0.1, + uncertainty=0.2, + action_delta=0.03, + horizon=8, + reason="stable", + ), + _record( + latency_ms=100, + guard_margin=0.04, + correction_magnitude=0.2, + uncertainty=0.5, + action_delta=0.08, + horizon=5, + reason="correction", + ), + _record( + latency_ms=200, + guard_margin=0.08, + correction_magnitude=0.4, + uncertainty=0.9, + action_delta=0.16, + horizon=2, + reason="correction", + ), + ] + + summary = summarize_adaptive_records(records) + + assert summary["sample_count"] == 3 + assert summary["decision_count"] == 3 + assert summary["reasons"] == {"correction": 2, "stable": 1} + assert summary["observed"]["latency_ms"]["p50"] == pytest.approx(100) + assert summary["observed"]["guard_margin"]["p10"] == pytest.approx(0.024) + + +def test_recommend_adaptive_chunk_thresholds_uses_recorded_distribution(): + records = [ + _record( + latency_ms=50, + guard_margin=0.02, + correction_magnitude=0.1, + uncertainty=0.2, + action_delta=0.03, + horizon=8, + reason="stable", + ), + _record( + latency_ms=100, + guard_margin=0.04, + correction_magnitude=0.2, + uncertainty=0.5, + action_delta=0.08, + horizon=5, + reason="correction", + ), + _record( + latency_ms=200, + guard_margin=0.08, + correction_magnitude=0.4, + uncertainty=0.9, + action_delta=0.16, + horizon=2, + reason="correction", + ), + ] + + recommendation = recommend_adaptive_chunk_thresholds(records) + + cfg = recommendation["recommended_config"] + assert cfg["low_guard_margin"] == pytest.approx(0.024) + assert cfg["high_correction_magnitude"] == pytest.approx(0.36) + assert cfg["high_uncertainty"] == pytest.approx(0.82) + assert cfg["high_action_delta"] == pytest.approx(0.144) + assert cfg["high_latency_ms"] == pytest.approx(150) + assert recommendation["defaults_used"] == [] + + +def test_recommend_adaptive_chunk_thresholds_falls_back_to_defaults(): + recommendation = recommend_adaptive_chunk_thresholds([]) + + cfg = recommendation["recommended_config"] + assert cfg["low_guard_margin"] == pytest.approx(0.05) + assert cfg["high_correction_magnitude"] == pytest.approx(0.2) + assert cfg["high_uncertainty"] == pytest.approx(0.65) + assert cfg["high_action_delta"] == pytest.approx(0.25) + assert cfg["high_latency_ms"] == pytest.approx(120) + assert set(recommendation["defaults_used"]) == { + "action_delta", + "correction_magnitude", + "guard_margin", + "latency_ms", + "uncertainty", + } diff --git a/tests/test_observability_prometheus.py b/tests/test_observability_prometheus.py index 5537c73..b382d9c 100644 --- a/tests/test_observability_prometheus.py +++ b/tests/test_observability_prometheus.py @@ -22,6 +22,7 @@ inc_safety_violation, inc_slo_violation, observe_onnx_load_time, + observe_rtc_adaptive_chunking, record_act_latency, render_metrics, set_episodes_active, @@ -141,6 +142,32 @@ def test_inference_executor_rejected(self): assert "tether_inference_executor_rejected_total" in out assert 'model_id="pi05"' in out + def test_rtc_adaptive_chunking_metrics(self): + observe_rtc_adaptive_chunking( + embodiment="franka", + model_id="pi05", + policy_slot="prod", + decision={ + "horizon": 4, + "reason": "guard_margin", + "risk_score": 0.8, + "replan_threshold_ratio": 0.6, + }, + signal={ + "guard_margin": 0.03, + "correction_magnitude": 0.25, + "uncertainty": 0.4, + }, + last_action_delta=0.12, + ) + out = render_metrics().decode() + assert "tether_rtc_adaptive_decisions_total" in out + assert 'reason="guard_margin"' in out + assert "tether_rtc_adaptive_horizon" in out + assert 'model_id="pi05"' in out + assert "tether_rtc_adaptive_guard_margin" in out + assert "tether_rtc_adaptive_action_delta" in out + def test_model_swap(self): inc_model_swap(embodiment="franka", from_model="pi0", to_model="pi05") out = render_metrics().decode() diff --git a/tests/test_record.py b/tests/test_record.py index dd0bd25..0270de0 100644 --- a/tests/test_record.py +++ b/tests/test_record.py @@ -183,6 +183,31 @@ def test_optional_fields_dropped_when_none(self, tmp_path): for k in ["cache", "guard", "deadline", "rtc", "error"]: assert k not in r, f"optional field '{k}' should be absent when None" + def test_rtc_field_preserved_when_provided(self, tmp_path): + rec = _make_writer(tmp_path) + _dummy_request( + rec, + rtc={ + "adaptive_chunking": { + "horizon": 4, + "reason": "guard_margin", + "risk_score": 0.75, + "replan_threshold_ratio": 0.6, + }, + "adaptive_signal": { + "guard_margin": 0.03, + "correction_magnitude": 0.2, + "uncertainty": 0.4, + }, + "last_action_delta": 0.12, + }, + ) + rec.close() + r = _read_all(rec.filepath)[1] + assert r["rtc"]["adaptive_chunking"]["horizon"] == 4 + assert r["rtc"]["adaptive_signal"]["guard_margin"] == pytest.approx(0.03) + assert r["rtc"]["last_action_delta"] == pytest.approx(0.12) + class TestFooterFormat: def test_footer_on_close(self, tmp_path): diff --git a/tests/test_rtc_adapter_day4.py b/tests/test_rtc_adapter_day4.py index 8952c24..103d1c8 100644 --- a/tests/test_rtc_adapter_day4.py +++ b/tests/test_rtc_adapter_day4.py @@ -21,7 +21,10 @@ from tether.runtime.buffer import ActionChunkBuffer from tether.runtime.rtc_adapter import RtcAdapter, RtcAdapterConfig -from tether.runtime.server import _record_rtc_adaptive_signal +from tether.runtime.server import ( + _record_rtc_adaptive_signal, + _rtc_adaptive_record_from_stats, +) # --------------------------------------------------------------------------- @@ -349,3 +352,37 @@ def test_server_helper_records_guard_a2c2_and_uncertainty(self): assert stats["adaptive_signal"]["guard_margin"] == pytest.approx(0.03) assert stats["adaptive_signal"]["correction_magnitude"] == pytest.approx(0.25) assert stats["adaptive_signal"]["uncertainty"] == pytest.approx(0.4) + + def test_rtc_adaptive_record_extracts_only_calibration_fields(self): + record = _rtc_adaptive_record_from_stats({ + "enabled": True, + "chunk_count": 5, + "adaptive_chunking": { + "horizon": 3, + "reason": "guard_margin", + "risk_score": 0.8, + "replan_threshold_ratio": 0.7, + }, + "adaptive_signal": { + "guard_margin": 0.02, + "correction_magnitude": 0.3, + }, + "last_action_delta": 0.11, + }) + + assert record == { + "adaptive_chunking": { + "horizon": 3, + "reason": "guard_margin", + "risk_score": 0.8, + "replan_threshold_ratio": 0.7, + }, + "adaptive_signal": { + "guard_margin": 0.02, + "correction_magnitude": 0.3, + }, + "last_action_delta": pytest.approx(0.11), + } + + def test_rtc_adaptive_record_returns_none_without_adaptive_state(self): + assert _rtc_adaptive_record_from_stats({"enabled": True}) is None