Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions scripts/calibrate_adaptive_chunking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env python3
"""Recommend adaptive RTC action-chunk thresholds from Tether JSONL traces."""
from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path


ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))

from tether.runtime.adaptive_calibration import ( # noqa: E402
iter_adaptive_records,
recommend_adaptive_chunk_thresholds,
)


def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"trace",
nargs="+",
type=Path,
help="Tether JSONL or JSONL.gz trace files containing request.rtc records.",
)
parser.add_argument(
"--compact",
action="store_true",
help="Print compact JSON instead of indented JSON.",
)
args = parser.parse_args(argv)

records = list(iter_adaptive_records(args.trace))
recommendation = recommend_adaptive_chunk_thresholds(records)
print(
json.dumps(
recommendation,
indent=None if args.compact else 2,
sort_keys=True,
)
)
return 0


if __name__ == "__main__":
raise SystemExit(main())
2 changes: 2 additions & 0 deletions src/tether/observability/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
inc_slo_violation,
observe_batch_flush,
observe_onnx_load_time,
observe_rtc_adaptive_chunking,
record_act_latency,
render_metrics,
set_episodes_active,
Expand All @@ -47,6 +48,7 @@
"inc_inference_executor_rejected",
"inc_model_swap",
"observe_batch_flush",
"observe_rtc_adaptive_chunking",
"track_in_flight",
"set_server_up",
"set_robot_info",
Expand Down
151 changes: 150 additions & 1 deletion src/tether/observability/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
"""
from __future__ import annotations

import math
from collections.abc import Iterator, Mapping
from contextlib import contextmanager
from typing import Iterator
from typing import Any

from prometheus_client import (
CONTENT_TYPE_LATEST,
Expand Down Expand Up @@ -120,6 +122,13 @@
registry=REGISTRY,
)

tether_rtc_adaptive_decisions_total = Counter(
"tether_rtc_adaptive_decisions_total",
"Adaptive RTC action-chunk decisions partitioned by bounded reason",
labelnames=("reason",),
registry=REGISTRY,
)

# Action-similarity fast-path skip counter (action-similarity-fast-path
# Phase 1.5 — FlashVLA). Increments when the inference path returns a
# cached action chunk instead of running the expert. Operator visibility
Expand Down Expand Up @@ -195,6 +204,51 @@
registry=REGISTRY,
)

# Adaptive RTC action-chunking gauges. Labels match the /act latency histogram
# and stay bounded: embodiment × model_id × policy_slot.
tether_rtc_adaptive_horizon = Gauge(
"tether_rtc_adaptive_horizon",
"Latest adaptive RTC execution horizon in actions",
labelnames=("embodiment", "model_id", "policy_slot"),
registry=REGISTRY,
)
tether_rtc_adaptive_risk_score = Gauge(
"tether_rtc_adaptive_risk_score",
"Latest adaptive RTC risk score used for horizon selection",
labelnames=("embodiment", "model_id", "policy_slot"),
registry=REGISTRY,
)
tether_rtc_adaptive_replan_threshold_ratio = Gauge(
"tether_rtc_adaptive_replan_threshold_ratio",
"Latest adaptive RTC replan threshold ratio",
labelnames=("embodiment", "model_id", "policy_slot"),
registry=REGISTRY,
)
tether_rtc_adaptive_guard_margin = Gauge(
"tether_rtc_adaptive_guard_margin",
"Latest ActionGuard safety margin feeding adaptive RTC chunking",
labelnames=("embodiment", "model_id", "policy_slot"),
registry=REGISTRY,
)
tether_rtc_adaptive_correction_magnitude = Gauge(
"tether_rtc_adaptive_correction_magnitude",
"Latest A2C2 correction magnitude feeding adaptive RTC chunking",
labelnames=("embodiment", "model_id", "policy_slot"),
registry=REGISTRY,
)
tether_rtc_adaptive_uncertainty = Gauge(
"tether_rtc_adaptive_uncertainty",
"Latest model uncertainty feeding adaptive RTC chunking",
labelnames=("embodiment", "model_id", "policy_slot"),
registry=REGISTRY,
)
tether_rtc_adaptive_action_delta = Gauge(
"tether_rtc_adaptive_action_delta",
"Latest overlap delta between previous and current RTC action chunks",
labelnames=("embodiment", "model_id", "policy_slot"),
registry=REGISTRY,
)


# ---------------------------------------------------------------------------
# Helpers — typed call-sites keep the surface searchable
Expand Down Expand Up @@ -268,6 +322,101 @@ def inc_inference_executor_rejected(
).inc()


_RTC_ADAPTIVE_REASONS = frozenset({
"disabled",
"no_signal",
"stable",
"stable_high_latency",
"uncertainty",
"guard_margin",
"correction",
"action_delta",
"unknown",
})


def observe_rtc_adaptive_chunking(
*,
embodiment: str,
model_id: str,
policy_slot: str = "prod",
decision: Mapping[str, Any] | None = None,
signal: Mapping[str, Any] | None = None,
last_action_delta: float | None = None,
) -> None:
"""Emit bounded metrics for the latest adaptive RTC chunking snapshot."""
labels = {
"embodiment": embodiment,
"model_id": model_id,
"policy_slot": policy_slot,
}

if decision:
reason = _bounded_rtc_reason(decision.get("reason"))
tether_rtc_adaptive_decisions_total.labels(reason=reason).inc()
_set_gauge_if_float(
tether_rtc_adaptive_horizon,
labels,
decision.get("horizon"),
)
_set_gauge_if_float(
tether_rtc_adaptive_risk_score,
labels,
decision.get("risk_score"),
)
_set_gauge_if_float(
tether_rtc_adaptive_replan_threshold_ratio,
labels,
decision.get("replan_threshold_ratio"),
)

if signal:
_set_gauge_if_float(
tether_rtc_adaptive_guard_margin,
labels,
signal.get("guard_margin"),
)
_set_gauge_if_float(
tether_rtc_adaptive_correction_magnitude,
labels,
signal.get("correction_magnitude"),
)
_set_gauge_if_float(
tether_rtc_adaptive_uncertainty,
labels,
signal.get("uncertainty"),
)
signal_delta = _metric_float(signal.get("action_delta"))
else:
signal_delta = None

if signal_delta is None:
signal_delta = _metric_float(last_action_delta)
if signal_delta is not None:
tether_rtc_adaptive_action_delta.labels(**labels).set(signal_delta)


def _bounded_rtc_reason(value: Any) -> str:
reason = str(value) if value is not None else "unknown"
return reason if reason in _RTC_ADAPTIVE_REASONS else "unknown"


def _metric_float(value: Any) -> float | None:
if value is None:
return None
try:
out = float(value)
except (TypeError, ValueError):
return None
return out if math.isfinite(out) else None


def _set_gauge_if_float(gauge: Gauge, labels: dict[str, str], value: Any) -> None:
out = _metric_float(value)
if out is not None:
gauge.labels(**labels).set(out)


def inc_action_skip() -> None:
tether_action_skip_total.inc()

Expand Down
Loading
Loading