Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## Unreleased

### Fixed

- **Dashboard hydrate: hydrated calls no longer lose `cost` and `latency`** (Python + TypeScript parity, fully backward-compatible). `CallLogger.log_call_end` writes `cost`, `latency`, `duration_ms`, and `telephony_provider` as **top-level keys** of `metadata.json`, but `MetricsStore.hydrate` (`libraries/python/getpatter/dashboard/store.py:535`, `libraries/typescript/src/dashboard/store.ts:421-424`) read them only from `meta.metrics.cost` / `meta.metrics.latency`. Result: every call rebuilt from disk landed in the store with `metrics=null`, so the local dashboard rendered `$0.00` and `—` for cost/latency on all hydrated rows; only the in-flight call (which never goes through hydrate) showed real numbers. Caught during 0.6.0 acceptance against `releases/0.6.0/typescript/matrix/outbound-cartesia-cerebras-elevenlabs.ts` — 48 of 49 calls in the dashboard had blank P95/cost columns. Fix promotes the top-level fields into a synthesized `metrics` dict (`metrics_from_top_level` / `metricsFromTopLevel`) when `meta.metrics` is missing, mapping `latency.p95_ms` → `metrics.latency_avg.total_ms` so the existing UI fields populate. Explicit `meta.metrics` (legacy/future shape) is preserved untouched. New regressions: `tests/unit/test_metrics_store_hydrate.py::test_hydrate_lifts_top_level_cost_and_latency_into_metrics` + `test_hydrate_preserves_explicit_metrics_when_present` (Py); two `MetricsStore.hydrate` cases in `tests/dashboard-store.test.ts` (TS). Files: `libraries/python/getpatter/dashboard/store.py`, `libraries/typescript/src/dashboard/store.ts`.

- **Pipeline early barge-in: VAD self-cancellation before TTS first byte arrived** (Python + TypeScript parity, behavioural change for pipeline mode). Cloud TTS providers (ElevenLabs, Cartesia, …) take 200–700 ms to emit the first audio byte. The barge-in anti-flicker gate was anchored on `_speaking_started_at` / `speakingStartedAt` (set inside `_begin_speaking` / `beginSpeaking`), so a 250 ms gate without AEC expired BEFORE TTS produced any audio. VAD then picked up background noise, room ambience, or a "hello?" from the operator and triggered `[VAD] speech_start during TTS → BARGE-IN` → `cancelSpeaking` → `isSpeaking=false` → the `for await (chunk of tts.synthesizeStream(...))` loop exited at `if (!this.isSpeaking) break`, emitting **zero bytes**. From the SDK's perspective the agent "spoke" the first message; from the caller's perspective the line went silent until the next turn. Reproduced on `outbound-cartesia-cerebras-elevenlabs.ts` (call CAfca9c23b22144d4b1bb8ee737dd24016, 47 s — first message never reached the wire). Fix introduces `_first_audio_sent_at` / `firstAudioSentAt`, set in a new `_mark_first_audio_sent` / `markFirstAudioSent` helper invoked AFTER `audio_sender.send_audio` / `bridge.sendAudio` succeeds at all four pipeline emit sites (firstMessage, streaming response, regular response, WebSocket remote). `_can_barge_in` / `canBargeIn` now refuses to open the gate while `_first_audio_sent_at` is null — VAD speech_start before the first wire-time byte is suppressed regardless of how much wall-clock has elapsed since `_begin_speaking`. The 250 ms / 1000 ms gate values are unchanged — only the anchor moves. New regressions: `tests/unit/test_stream_handler_unit.py::test_barge_in_suppressed_before_first_audio_emitted` (Py); `canBargeIn() false before the first TTS chunk has hit the wire` in `tests/unit/stream-handler.test.ts` (TS). Existing `_handle_barge_in` / `handleBargeIn` tests updated to set both timestamps to reflect the new contract. Files: `libraries/python/getpatter/stream_handler.py`, `libraries/typescript/src/stream-handler.ts`.

## 0.6.0 (2026-05-08)

### Fixed
Expand Down
31 changes: 31 additions & 0 deletions libraries/python/getpatter/dashboard/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,35 @@ def _numeric_subdirs(parent):
yield entry


def _metrics_from_top_level(meta: dict[str, Any]) -> dict[str, Any] | None:
"""Build a ``metrics`` dict from top-level CallLogger fields.

``CallLogger.log_call_end`` writes ``cost`` / ``latency`` / ``duration_ms`` /
``telephony_provider`` as top-level keys in ``metadata.json``, but the
dashboard UI expects them under ``metrics``. Without this fallback every
hydrated call shows ``$0.00`` and ``—`` for cost and latency.
"""
cost = meta.get("cost") if isinstance(meta.get("cost"), dict) else None
latency = meta.get("latency") if isinstance(meta.get("latency"), dict) else None
duration_ms = meta.get("duration_ms")
telephony = meta.get("telephony_provider")
if cost is None and latency is None and duration_ms is None and not telephony:
return None
out: dict[str, Any] = {}
if cost is not None:
out["cost"] = cost
if latency is not None:
out["latency_avg"] = {
"total_ms": latency.get("p95_ms") or latency.get("p50_ms") or 0
}
out["latency"] = latency
if isinstance(duration_ms, (int, float)) and duration_ms > 0:
out["duration_seconds"] = float(duration_ms) / 1000.0
if telephony:
out["telephony_provider"] = telephony
return out or None


def _metadata_to_call_record(
call_id: str, meta: dict[str, Any]
) -> dict[str, Any] | None:
Expand All @@ -533,6 +562,8 @@ def _to_seconds(raw: Any) -> float | None:
return None
ended = _to_seconds(meta.get("ended_at"))
metrics = meta.get("metrics") if isinstance(meta.get("metrics"), dict) else None
if metrics is None:
metrics = _metrics_from_top_level(meta)
transcript = (
meta.get("transcript") if isinstance(meta.get("transcript"), list) else []
)
Expand Down
41 changes: 40 additions & 1 deletion libraries/python/getpatter/stream_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,17 @@ def __init__(
# the AEC filter is still converging (~500 ms warmup + safety
# margin).
self._speaking_started_at: float | None = None
# Wall-clock timestamp (``time.time()`` units) when the FIRST TTS
# audio chunk of the current turn actually reached the carrier wire
# — set by ``_mark_first_audio_sent`` after ``audio_sender.send_audio``
# succeeds, cleared by ``_begin_speaking`` / barge-in cancels. The
# barge-in gate is anchored to this timestamp instead of
# ``_speaking_started_at`` because cloud TTS providers (ElevenLabs,
# Cartesia, ...) take 200-700 ms to emit the first byte. A gate
# starting at ``_begin_speaking`` would expire on background noise
# before any audio went out, exit the TTS loop on
# ``_is_speaking=False``, and silently drop the agent's first turn.
self._first_audio_sent_at: float | None = None
# Monotonic counter incremented at every TTS-start. ``_end_speaking_with_grace``
# captures the value at scheduling time and only flips ``_is_speaking`` to
# False if no new turn started in the meantime. Prevents an in-flight grace
Expand Down Expand Up @@ -1845,6 +1856,7 @@ async def start(self) -> None:
if self._aec is not None:
self._aec.push_far_end(audio_chunk)
await self.audio_sender.send_audio(audio_chunk)
self._mark_first_audio_sent()
finally:
# Drop any partial int16 byte to prevent cross-turn corruption
# if the stream threw before a complete sample was delivered.
Expand Down Expand Up @@ -2107,6 +2119,7 @@ async def _synthesize_sentence(
if self._aec is not None:
self._aec.push_far_end(processed_audio)
await self.audio_sender.send_audio(processed_audio)
self._mark_first_audio_sent()
finally:
await gen.aclose()
_tts_span.__exit__(None, None, None)
Expand Down Expand Up @@ -2337,6 +2350,7 @@ async def _handle_barge_in(self, transcript) -> None:
):
self._is_speaking = False
self._speaking_started_at = None
self._first_audio_sent_at = None
# Record cancel timestamp so ``_begin_speaking`` can enforce
# a short drain window before the next TTS chunk lands on
# top of the cancelled turn's tail (avoids audible "doubled
Expand Down Expand Up @@ -2702,6 +2716,7 @@ async def on_audio_received(self, audio_bytes: bytes) -> None:
# pending grace-flip from the prior turn can't fight us.
self._is_speaking = False
self._speaking_started_at = None
self._first_audio_sent_at = None
self._speaking_generation += 1
# Record cancel timestamp so ``_begin_speaking``
# can enforce a short drain window before the
Expand Down Expand Up @@ -2813,10 +2828,22 @@ async def _begin_speaking(self) -> None:
self._speaking_generation += 1
self._is_speaking = True
self._speaking_started_at = time.time()
self._first_audio_sent_at = None
# Fresh turn — drop any stale pre-barge-in buffer from a previous
# turn so we never replay yesterday's audio to STT.
self._inbound_audio_ring = []

def _mark_first_audio_sent(self) -> None:
"""Record that the first TTS chunk of the current turn hit the wire.

Idempotent within a turn: only the first call sets the timestamp.
Must be invoked AFTER the underlying ``audio_sender.send_audio`` so
the gate is anchored to "audio actually went out", not "we asked
the carrier to send it". Mirrors TS ``markFirstAudioSent``.
"""
if self._first_audio_sent_at is None:
self._first_audio_sent_at = time.time()

def _can_barge_in(self) -> bool:
"""Whether barge-in is allowed to fire right now.

Expand All @@ -2832,7 +2859,17 @@ def _can_barge_in(self) -> bool:
started_at = getattr(self, "_speaking_started_at", None)
if started_at is None:
return True
elapsed = time.time() - started_at
# Anchor the gate on "first audio actually emitted", not on
# ``_begin_speaking`` (which fires before the TTS provider's
# first-byte latency has elapsed). Without this guard, background
# noise picked up by VAD ~250 ms after ``_begin_speaking`` triggers
# a self-cancel BEFORE any TTS chunk has reached the wire — the
# agent's first turn becomes silence even though the SDK believes
# it spoke. Mirrors TS ``canBargeIn``.
first_audio_at = getattr(self, "_first_audio_sent_at", None)
if first_audio_at is None:
return False
elapsed = time.time() - first_audio_at
gate = (
MIN_AGENT_SPEAKING_S_BEFORE_BARGE_IN_AEC
if getattr(self, "_aec", None) is not None
Expand Down Expand Up @@ -2869,6 +2906,7 @@ async def _end_speaking_with_grace(self) -> None:
if grace_ms <= 0:
self._is_speaking = False
self._speaking_started_at = None
self._first_audio_sent_at = None
return

gen = self._speaking_generation
Expand All @@ -2881,6 +2919,7 @@ async def _flip_after_grace() -> None:
if self._speaking_generation == gen:
self._is_speaking = False
self._speaking_started_at = None
self._first_audio_sent_at = None
except asyncio.CancelledError: # pragma: no cover
raise
except Exception as exc: # pragma: no cover - defensive
Expand Down
97 changes: 80 additions & 17 deletions libraries/python/tests/unit/test_metrics_store_hydrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from __future__ import annotations

import json
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from pathlib import Path

import pytest
Expand Down Expand Up @@ -67,19 +67,15 @@ def test_rebuilds_call_list_from_disk(tmp_path: Path) -> None:


def test_idempotent_on_re_hydrate(tmp_path: Path) -> None:
_build_fixture(
tmp_path, [{"id": "CA-1", "iso": "2026-04-26T15:00:00.000Z"}]
)
_build_fixture(tmp_path, [{"id": "CA-1", "iso": "2026-04-26T15:00:00.000Z"}])
store = MetricsStore()
assert store.hydrate(str(tmp_path)) == 1
assert store.hydrate(str(tmp_path)) == 0
assert store.call_count == 1


def test_tolerates_corrupt_metadata(tmp_path: Path) -> None:
_build_fixture(
tmp_path, [{"id": "CA-good", "iso": "2026-04-26T15:00:00.000Z"}]
)
_build_fixture(tmp_path, [{"id": "CA-good", "iso": "2026-04-26T15:00:00.000Z"}])
bad_dir = tmp_path / "calls" / "2026" / "04" / "26" / "CA-bad"
bad_dir.mkdir(parents=True, exist_ok=True)
(bad_dir / "metadata.json").write_text("{ not valid json", encoding="utf-8")
Expand All @@ -92,10 +88,7 @@ def test_tolerates_corrupt_metadata(tmp_path: Path) -> None:
def test_respects_max_calls(tmp_path: Path) -> None:
_build_fixture(
tmp_path,
[
{"id": f"CA-{i}", "iso": f"2026-04-26T15:0{i}:00.000Z"}
for i in range(7)
],
[{"id": f"CA-{i}", "iso": f"2026-04-26T15:0{i}:00.000Z"} for i in range(7)],
)
store = MetricsStore(max_calls=3)
assert store.hydrate(str(tmp_path)) == 7
Expand All @@ -108,9 +101,7 @@ def test_respects_max_calls(tmp_path: Path) -> None:
@pytest.mark.parametrize("invalid_name", ["not_numeric", ".DS_Store"])
def test_skips_non_numeric_directory_layers(tmp_path: Path, invalid_name: str) -> None:
"""Stray non-numeric YYYY/MM/DD entries must not break the walk."""
_build_fixture(
tmp_path, [{"id": "CA-only", "iso": "2026-04-26T15:00:00.000Z"}]
)
_build_fixture(tmp_path, [{"id": "CA-only", "iso": "2026-04-26T15:00:00.000Z"}])
(tmp_path / "calls" / invalid_name).mkdir(parents=True, exist_ok=True)
store = MetricsStore()
assert store.hydrate(str(tmp_path)) == 1
Expand All @@ -119,9 +110,7 @@ def test_skips_non_numeric_directory_layers(tmp_path: Path, invalid_name: str) -
def test_skips_records_with_unparseable_started_at(tmp_path: Path) -> None:
"""A malformed ``started_at`` must NOT land in the store as epoch 0,
which would corrupt every sort/range query that depends on it."""
_build_fixture(
tmp_path, [{"id": "CA-good", "iso": "2026-04-26T15:00:00.000Z"}]
)
_build_fixture(tmp_path, [{"id": "CA-good", "iso": "2026-04-26T15:00:00.000Z"}])
bad_dir = tmp_path / "calls" / "2026" / "04" / "26" / "CA-bad"
bad_dir.mkdir(parents=True, exist_ok=True)
(bad_dir / "metadata.json").write_text(
Expand Down Expand Up @@ -168,3 +157,77 @@ def test_accepts_numeric_unix_seconds_timestamps(tmp_path: Path) -> None:
listed = store.get_calls()
assert listed[0]["started_at"] == 1745683200.0
assert listed[0]["ended_at"] == 1745683230.0


def test_hydrate_lifts_top_level_cost_and_latency_into_metrics(tmp_path: Path) -> None:
"""``CallLogger.log_call_end`` writes ``cost`` / ``latency`` / ``duration_ms``
at the top of metadata.json (no ``metrics`` key). The hydrate path must
promote those into ``metrics`` so the dashboard renders cost and latency
instead of ``$0.00`` / ``—`` for hydrated calls.
"""
call_dir = tmp_path / "calls" / "2026" / "05" / "08" / "CA-real-shape"
call_dir.mkdir(parents=True, exist_ok=True)
(call_dir / "metadata.json").write_text(
json.dumps(
{
"schema_version": "1.0",
"call_id": "CA-real-shape",
"started_at": "2026-05-08T23:33:00.000Z",
"ended_at": "2026-05-08T23:33:57.000Z",
"duration_ms": 57400,
"status": "completed",
"caller": "",
"callee": "",
"telephony_provider": "twilio",
"provider_mode": "pipeline",
"agent": {"provider": "pipeline", "language": "en"},
"turns": 9,
"cost": {
"stt": 0.001526,
"tts": 0.02988,
"llm": 0.000406,
"telephony": 0.0085,
"total": 0.040312,
},
"latency": {"p50_ms": 2127.7, "p95_ms": 3461.7, "p99_ms": 3640.1},
"error": None,
}
),
encoding="utf-8",
)

store = MetricsStore()
assert store.hydrate(str(tmp_path)) == 1
rec = store.get_calls()[0]
metrics = rec["metrics"]
assert metrics is not None
assert metrics["cost"]["total"] == pytest.approx(0.040312)
assert metrics["latency"]["p95_ms"] == pytest.approx(3461.7)
assert metrics["latency_avg"]["total_ms"] == pytest.approx(3461.7)
assert metrics["duration_seconds"] == pytest.approx(57.4)
assert metrics["telephony_provider"] == "twilio"


def test_hydrate_preserves_explicit_metrics_when_present(tmp_path: Path) -> None:
"""If a metadata.json already has ``metrics`` (legacy or future shape) we
must NOT overwrite it with the top-level fallback.
"""
call_dir = tmp_path / "calls" / "2026" / "05" / "08" / "CA-explicit"
call_dir.mkdir(parents=True, exist_ok=True)
(call_dir / "metadata.json").write_text(
json.dumps(
{
"call_id": "CA-explicit",
"started_at": "2026-05-08T10:00:00Z",
"metrics": {"cost": {"total": 0.999}, "marker": "kept"},
"cost": {"total": 0.001},
"latency": {"p95_ms": 9999},
}
),
encoding="utf-8",
)
store = MetricsStore()
assert store.hydrate(str(tmp_path)) == 1
metrics = store.get_calls()[0]["metrics"]
assert metrics["marker"] == "kept"
assert metrics["cost"]["total"] == pytest.approx(0.999)
Loading
Loading