From db5d58b4173462f0287b19d67b278eff4d848f45 Mon Sep 17 00:00:00 2001
From: nicolotognoni
Date: Sat, 9 May 2026 02:20:43 +0200
Subject: [PATCH] fix(dashboard,pipeline): hydrate cost/latency from top-level + barge-in gate from first audio
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two bugs caught during 0.6.0 acceptance against `releases/0.6.0/typescript/matrix/outbound-cartesia-cerebras-elevenlabs.ts`:

1. **Dashboard hydrate schema mismatch**: `CallLogger.log_call_end` writes `cost`/`latency`/`duration_ms`/`telephony_provider` as top-level keys of `metadata.json`, but `MetricsStore.hydrate` looked for them under `meta.metrics.cost`/`meta.metrics.latency`. Every hydrated row landed with `metrics=null`, so cost/latency rendered as `$0.00`/`—` for all on-disk calls (only the in-flight call had real numbers). Fix synthesizes a `metrics` dict from the top-level fields when `meta.metrics` is absent while preserving any explicit `meta.metrics` payload untouched.

2. **Early barge-in self-cancellation**: cloud TTS first-byte latency is 200–700 ms; the 250 ms anti-flicker gate (no-AEC PSTN default) was anchored on `_speaking_started_at`/`speakingStartedAt` and expired BEFORE TTS produced audio. VAD then picked up background noise and self-cancelled the agent's first turn — 0 bytes emitted, line silent. Fix anchors the gate on a new `_first_audio_sent_at`/`firstAudioSentAt` set AFTER `bridge.sendAudio` / `audio_sender.send_audio` succeeds at the four pipeline emit sites (firstMessage, streaming, regular, WebSocket remote). `_can_barge_in`/`canBargeIn` returns false while the marker is null. Gate values (250 ms / 1000 ms) unchanged — only the anchor moves.

Tests:
- Py 1717/1717, TS 1394/1394 green; lint clean.
- New regressions: `test_hydrate_lifts_top_level_cost_and_latency_into_metrics`, `test_hydrate_preserves_explicit_metrics_when_present`, `test_barge_in_suppressed_before_first_audio_emitted` (Py) + parity TS cases in `tests/dashboard-store.test.ts` and `tests/unit/stream-handler.test.ts`.
- Existing `_handle_barge_in`/`handleBargeIn` tests updated to set both timestamps for the new contract.
---
 CHANGELOG.md                                  |  6 ++
 libraries/python/getpatter/dashboard/store.py | 31 ++++++
 libraries/python/getpatter/stream_handler.py  | 41 +++++++-
 .../tests/unit/test_metrics_store_hydrate.py  | 97 +++++++++++++++----
 .../tests/unit/test_stream_handler_unit.py    | 54 +++++++++--
 libraries/typescript/src/dashboard/store.ts   | 44 ++++++++-
 libraries/typescript/src/stream-handler.ts    | 41 +++++++-
 .../typescript/tests/dashboard-store.test.ts  | 70 +++++++++++++
 .../tests/unit/stream-handler.test.ts         | 24 ++++-
 9 files changed, 381 insertions(+), 27 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 39a5e30e..c88cea20 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,12 @@
 
 ## Unreleased
 
+### Fixed
+
+- **Dashboard hydrate: hydrated calls no longer lose `cost` and `latency`** (Python + TypeScript parity, fully backward-compatible). `CallLogger.log_call_end` writes `cost`, `latency`, `duration_ms`, and `telephony_provider` as **top-level keys** of `metadata.json`, but `MetricsStore.hydrate` (`libraries/python/getpatter/dashboard/store.py:535`, `libraries/typescript/src/dashboard/store.ts:421-424`) read them only from `meta.metrics.cost` / `meta.metrics.latency`. Result: every call rebuilt from disk landed in the store with `metrics=null`, so the local dashboard rendered `$0.00` and `—` for cost/latency on all hydrated rows; only the in-flight call (which never goes through hydrate) showed real numbers. Caught during 0.6.0 acceptance against `releases/0.6.0/typescript/matrix/outbound-cartesia-cerebras-elevenlabs.ts` — 48 of 49 calls in the dashboard had blank P95/cost columns.
Fix promotes the top-level fields into a synthesized `metrics` dict (`metrics_from_top_level` / `metricsFromTopLevel`) when `meta.metrics` is missing, mapping `latency.p95_ms` → `metrics.latency_avg.total_ms` so the existing UI fields populate. Explicit `meta.metrics` (legacy/future shape) is preserved untouched. New regressions: `tests/unit/test_metrics_store_hydrate.py::test_hydrate_lifts_top_level_cost_and_latency_into_metrics` + `test_hydrate_preserves_explicit_metrics_when_present` (Py); two `MetricsStore.hydrate` cases in `tests/dashboard-store.test.ts` (TS). Files: `libraries/python/getpatter/dashboard/store.py`, `libraries/typescript/src/dashboard/store.ts`. + +- **Pipeline early barge-in: VAD self-cancellation before TTS first byte arrived** (Python + TypeScript parity, behavioural change for pipeline mode). Cloud TTS providers (ElevenLabs, Cartesia, …) take 200–700 ms to emit the first audio byte. The barge-in anti-flicker gate was anchored on `_speaking_started_at` / `speakingStartedAt` (set inside `_begin_speaking` / `beginSpeaking`), so a 250 ms gate without AEC expired BEFORE TTS produced any audio. VAD then picked up background noise, room ambience, or a "hello?" from the operator and triggered `[VAD] speech_start during TTS → BARGE-IN` → `cancelSpeaking` → `isSpeaking=false` → the `for await (chunk of tts.synthesizeStream(...))` loop exited at `if (!this.isSpeaking) break`, emitting **zero bytes**. From the SDK's perspective the agent "spoke" the first message; from the caller's perspective the line went silent until the next turn. Reproduced on `outbound-cartesia-cerebras-elevenlabs.ts` (call CAfca9c23b22144d4b1bb8ee737dd24016, 47 s — first message never reached the wire). 
Fix introduces `_first_audio_sent_at` / `firstAudioSentAt`, set in a new `_mark_first_audio_sent` / `markFirstAudioSent` helper invoked AFTER `audio_sender.send_audio` / `bridge.sendAudio` succeeds at all four pipeline emit sites (firstMessage, streaming response, regular response, WebSocket remote). `_can_barge_in` / `canBargeIn` now refuses to open the gate while `_first_audio_sent_at` is null — VAD speech_start before the first wire-time byte is suppressed regardless of how much wall-clock has elapsed since `_begin_speaking`. The 250 ms / 1000 ms gate values are unchanged — only the anchor moves. New regressions: `tests/unit/test_stream_handler_unit.py::test_barge_in_suppressed_before_first_audio_emitted` (Py); `canBargeIn() false before the first TTS chunk has hit the wire` in `tests/unit/stream-handler.test.ts` (TS). Existing `_handle_barge_in` / `handleBargeIn` tests updated to set both timestamps to reflect the new contract. Files: `libraries/python/getpatter/stream_handler.py`, `libraries/typescript/src/stream-handler.ts`. + ## 0.6.0 (2026-05-08) ### Fixed diff --git a/libraries/python/getpatter/dashboard/store.py b/libraries/python/getpatter/dashboard/store.py index 25ae1b7c..e8921763 100644 --- a/libraries/python/getpatter/dashboard/store.py +++ b/libraries/python/getpatter/dashboard/store.py @@ -507,6 +507,35 @@ def _numeric_subdirs(parent): yield entry +def _metrics_from_top_level(meta: dict[str, Any]) -> dict[str, Any] | None: + """Build a ``metrics`` dict from top-level CallLogger fields. + + ``CallLogger.log_call_end`` writes ``cost`` / ``latency`` / ``duration_ms`` / + ``telephony_provider`` as top-level keys in ``metadata.json``, but the + dashboard UI expects them under ``metrics``. Without this fallback every + hydrated call shows ``$0.00`` and ``—`` for cost and latency. 
+ """ + cost = meta.get("cost") if isinstance(meta.get("cost"), dict) else None + latency = meta.get("latency") if isinstance(meta.get("latency"), dict) else None + duration_ms = meta.get("duration_ms") + telephony = meta.get("telephony_provider") + if cost is None and latency is None and duration_ms is None and not telephony: + return None + out: dict[str, Any] = {} + if cost is not None: + out["cost"] = cost + if latency is not None: + out["latency_avg"] = { + "total_ms": latency.get("p95_ms") or latency.get("p50_ms") or 0 + } + out["latency"] = latency + if isinstance(duration_ms, (int, float)) and duration_ms > 0: + out["duration_seconds"] = float(duration_ms) / 1000.0 + if telephony: + out["telephony_provider"] = telephony + return out or None + + def _metadata_to_call_record( call_id: str, meta: dict[str, Any] ) -> dict[str, Any] | None: @@ -533,6 +562,8 @@ def _to_seconds(raw: Any) -> float | None: return None ended = _to_seconds(meta.get("ended_at")) metrics = meta.get("metrics") if isinstance(meta.get("metrics"), dict) else None + if metrics is None: + metrics = _metrics_from_top_level(meta) transcript = ( meta.get("transcript") if isinstance(meta.get("transcript"), list) else [] ) diff --git a/libraries/python/getpatter/stream_handler.py b/libraries/python/getpatter/stream_handler.py index a1d49cce..d1dbde18 100644 --- a/libraries/python/getpatter/stream_handler.py +++ b/libraries/python/getpatter/stream_handler.py @@ -1641,6 +1641,17 @@ def __init__( # the AEC filter is still converging (~500 ms warmup + safety # margin). self._speaking_started_at: float | None = None + # Wall-clock timestamp (``time.time()`` units) when the FIRST TTS + # audio chunk of the current turn actually reached the carrier wire + # — set by ``_mark_first_audio_sent`` after ``audio_sender.send_audio`` + # succeeds, cleared by ``_begin_speaking`` / barge-in cancels. 
The + # barge-in gate is anchored to this timestamp instead of + # ``_speaking_started_at`` because cloud TTS providers (ElevenLabs, + # Cartesia, ...) take 200-700 ms to emit the first byte. A gate + # starting at ``_begin_speaking`` would expire on background noise + # before any audio went out, exit the TTS loop on + # ``_is_speaking=False``, and silently drop the agent's first turn. + self._first_audio_sent_at: float | None = None # Monotonic counter incremented at every TTS-start. ``_end_speaking_with_grace`` # captures the value at scheduling time and only flips ``_is_speaking`` to # False if no new turn started in the meantime. Prevents an in-flight grace @@ -1845,6 +1856,7 @@ async def start(self) -> None: if self._aec is not None: self._aec.push_far_end(audio_chunk) await self.audio_sender.send_audio(audio_chunk) + self._mark_first_audio_sent() finally: # Drop any partial int16 byte to prevent cross-turn corruption # if the stream threw before a complete sample was delivered. @@ -2107,6 +2119,7 @@ async def _synthesize_sentence( if self._aec is not None: self._aec.push_far_end(processed_audio) await self.audio_sender.send_audio(processed_audio) + self._mark_first_audio_sent() finally: await gen.aclose() _tts_span.__exit__(None, None, None) @@ -2337,6 +2350,7 @@ async def _handle_barge_in(self, transcript) -> None: ): self._is_speaking = False self._speaking_started_at = None + self._first_audio_sent_at = None # Record cancel timestamp so ``_begin_speaking`` can enforce # a short drain window before the next TTS chunk lands on # top of the cancelled turn's tail (avoids audible "doubled @@ -2702,6 +2716,7 @@ async def on_audio_received(self, audio_bytes: bytes) -> None: # pending grace-flip from the prior turn can't fight us. 
self._is_speaking = False self._speaking_started_at = None + self._first_audio_sent_at = None self._speaking_generation += 1 # Record cancel timestamp so ``_begin_speaking`` # can enforce a short drain window before the @@ -2813,10 +2828,22 @@ async def _begin_speaking(self) -> None: self._speaking_generation += 1 self._is_speaking = True self._speaking_started_at = time.time() + self._first_audio_sent_at = None # Fresh turn — drop any stale pre-barge-in buffer from a previous # turn so we never replay yesterday's audio to STT. self._inbound_audio_ring = [] + def _mark_first_audio_sent(self) -> None: + """Record that the first TTS chunk of the current turn hit the wire. + + Idempotent within a turn: only the first call sets the timestamp. + Must be invoked AFTER the underlying ``audio_sender.send_audio`` so + the gate is anchored to "audio actually went out", not "we asked + the carrier to send it". Mirrors TS ``markFirstAudioSent``. + """ + if self._first_audio_sent_at is None: + self._first_audio_sent_at = time.time() + def _can_barge_in(self) -> bool: """Whether barge-in is allowed to fire right now. @@ -2832,7 +2859,17 @@ def _can_barge_in(self) -> bool: started_at = getattr(self, "_speaking_started_at", None) if started_at is None: return True - elapsed = time.time() - started_at + # Anchor the gate on "first audio actually emitted", not on + # ``_begin_speaking`` (which fires before the TTS provider's + # first-byte latency has elapsed). Without this guard, background + # noise picked up by VAD ~250 ms after ``_begin_speaking`` triggers + # a self-cancel BEFORE any TTS chunk has reached the wire — the + # agent's first turn becomes silence even though the SDK believes + # it spoke. Mirrors TS ``canBargeIn``. 
+ first_audio_at = getattr(self, "_first_audio_sent_at", None) + if first_audio_at is None: + return False + elapsed = time.time() - first_audio_at gate = ( MIN_AGENT_SPEAKING_S_BEFORE_BARGE_IN_AEC if getattr(self, "_aec", None) is not None @@ -2869,6 +2906,7 @@ async def _end_speaking_with_grace(self) -> None: if grace_ms <= 0: self._is_speaking = False self._speaking_started_at = None + self._first_audio_sent_at = None return gen = self._speaking_generation @@ -2881,6 +2919,7 @@ async def _flip_after_grace() -> None: if self._speaking_generation == gen: self._is_speaking = False self._speaking_started_at = None + self._first_audio_sent_at = None except asyncio.CancelledError: # pragma: no cover raise except Exception as exc: # pragma: no cover - defensive diff --git a/libraries/python/tests/unit/test_metrics_store_hydrate.py b/libraries/python/tests/unit/test_metrics_store_hydrate.py index f54c0cba..447a7eeb 100644 --- a/libraries/python/tests/unit/test_metrics_store_hydrate.py +++ b/libraries/python/tests/unit/test_metrics_store_hydrate.py @@ -7,7 +7,7 @@ from __future__ import annotations import json -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from pathlib import Path import pytest @@ -67,9 +67,7 @@ def test_rebuilds_call_list_from_disk(tmp_path: Path) -> None: def test_idempotent_on_re_hydrate(tmp_path: Path) -> None: - _build_fixture( - tmp_path, [{"id": "CA-1", "iso": "2026-04-26T15:00:00.000Z"}] - ) + _build_fixture(tmp_path, [{"id": "CA-1", "iso": "2026-04-26T15:00:00.000Z"}]) store = MetricsStore() assert store.hydrate(str(tmp_path)) == 1 assert store.hydrate(str(tmp_path)) == 0 @@ -77,9 +75,7 @@ def test_idempotent_on_re_hydrate(tmp_path: Path) -> None: def test_tolerates_corrupt_metadata(tmp_path: Path) -> None: - _build_fixture( - tmp_path, [{"id": "CA-good", "iso": "2026-04-26T15:00:00.000Z"}] - ) + _build_fixture(tmp_path, [{"id": "CA-good", "iso": "2026-04-26T15:00:00.000Z"}]) bad_dir = tmp_path / 
"calls" / "2026" / "04" / "26" / "CA-bad" bad_dir.mkdir(parents=True, exist_ok=True) (bad_dir / "metadata.json").write_text("{ not valid json", encoding="utf-8") @@ -92,10 +88,7 @@ def test_tolerates_corrupt_metadata(tmp_path: Path) -> None: def test_respects_max_calls(tmp_path: Path) -> None: _build_fixture( tmp_path, - [ - {"id": f"CA-{i}", "iso": f"2026-04-26T15:0{i}:00.000Z"} - for i in range(7) - ], + [{"id": f"CA-{i}", "iso": f"2026-04-26T15:0{i}:00.000Z"} for i in range(7)], ) store = MetricsStore(max_calls=3) assert store.hydrate(str(tmp_path)) == 7 @@ -108,9 +101,7 @@ def test_respects_max_calls(tmp_path: Path) -> None: @pytest.mark.parametrize("invalid_name", ["not_numeric", ".DS_Store"]) def test_skips_non_numeric_directory_layers(tmp_path: Path, invalid_name: str) -> None: """Stray non-numeric YYYY/MM/DD entries must not break the walk.""" - _build_fixture( - tmp_path, [{"id": "CA-only", "iso": "2026-04-26T15:00:00.000Z"}] - ) + _build_fixture(tmp_path, [{"id": "CA-only", "iso": "2026-04-26T15:00:00.000Z"}]) (tmp_path / "calls" / invalid_name).mkdir(parents=True, exist_ok=True) store = MetricsStore() assert store.hydrate(str(tmp_path)) == 1 @@ -119,9 +110,7 @@ def test_skips_non_numeric_directory_layers(tmp_path: Path, invalid_name: str) - def test_skips_records_with_unparseable_started_at(tmp_path: Path) -> None: """A malformed ``started_at`` must NOT land in the store as epoch 0, which would corrupt every sort/range query that depends on it.""" - _build_fixture( - tmp_path, [{"id": "CA-good", "iso": "2026-04-26T15:00:00.000Z"}] - ) + _build_fixture(tmp_path, [{"id": "CA-good", "iso": "2026-04-26T15:00:00.000Z"}]) bad_dir = tmp_path / "calls" / "2026" / "04" / "26" / "CA-bad" bad_dir.mkdir(parents=True, exist_ok=True) (bad_dir / "metadata.json").write_text( @@ -168,3 +157,77 @@ def test_accepts_numeric_unix_seconds_timestamps(tmp_path: Path) -> None: listed = store.get_calls() assert listed[0]["started_at"] == 1745683200.0 assert listed[0]["ended_at"] 
== 1745683230.0 + + +def test_hydrate_lifts_top_level_cost_and_latency_into_metrics(tmp_path: Path) -> None: + """``CallLogger.log_call_end`` writes ``cost`` / ``latency`` / ``duration_ms`` + at the top of metadata.json (no ``metrics`` key). The hydrate path must + promote those into ``metrics`` so the dashboard renders cost and latency + instead of ``$0.00`` / ``—`` for hydrated calls. + """ + call_dir = tmp_path / "calls" / "2026" / "05" / "08" / "CA-real-shape" + call_dir.mkdir(parents=True, exist_ok=True) + (call_dir / "metadata.json").write_text( + json.dumps( + { + "schema_version": "1.0", + "call_id": "CA-real-shape", + "started_at": "2026-05-08T23:33:00.000Z", + "ended_at": "2026-05-08T23:33:57.000Z", + "duration_ms": 57400, + "status": "completed", + "caller": "", + "callee": "", + "telephony_provider": "twilio", + "provider_mode": "pipeline", + "agent": {"provider": "pipeline", "language": "en"}, + "turns": 9, + "cost": { + "stt": 0.001526, + "tts": 0.02988, + "llm": 0.000406, + "telephony": 0.0085, + "total": 0.040312, + }, + "latency": {"p50_ms": 2127.7, "p95_ms": 3461.7, "p99_ms": 3640.1}, + "error": None, + } + ), + encoding="utf-8", + ) + + store = MetricsStore() + assert store.hydrate(str(tmp_path)) == 1 + rec = store.get_calls()[0] + metrics = rec["metrics"] + assert metrics is not None + assert metrics["cost"]["total"] == pytest.approx(0.040312) + assert metrics["latency"]["p95_ms"] == pytest.approx(3461.7) + assert metrics["latency_avg"]["total_ms"] == pytest.approx(3461.7) + assert metrics["duration_seconds"] == pytest.approx(57.4) + assert metrics["telephony_provider"] == "twilio" + + +def test_hydrate_preserves_explicit_metrics_when_present(tmp_path: Path) -> None: + """If a metadata.json already has ``metrics`` (legacy or future shape) we + must NOT overwrite it with the top-level fallback. 
+ """ + call_dir = tmp_path / "calls" / "2026" / "05" / "08" / "CA-explicit" + call_dir.mkdir(parents=True, exist_ok=True) + (call_dir / "metadata.json").write_text( + json.dumps( + { + "call_id": "CA-explicit", + "started_at": "2026-05-08T10:00:00Z", + "metrics": {"cost": {"total": 0.999}, "marker": "kept"}, + "cost": {"total": 0.001}, + "latency": {"p95_ms": 9999}, + } + ), + encoding="utf-8", + ) + store = MetricsStore() + assert store.hydrate(str(tmp_path)) == 1 + metrics = store.get_calls()[0]["metrics"] + assert metrics["marker"] == "kept" + assert metrics["cost"]["total"] == pytest.approx(0.999) diff --git a/libraries/python/tests/unit/test_stream_handler_unit.py b/libraries/python/tests/unit/test_stream_handler_unit.py index 45350558..8d2f7328 100644 --- a/libraries/python/tests/unit/test_stream_handler_unit.py +++ b/libraries/python/tests/unit/test_stream_handler_unit.py @@ -470,9 +470,13 @@ async def test_barge_in_suppressed_during_aec_warmup(self) -> None: handler._aec = object() # Emulate ``_begin_speaking`` having just run — agent has been # speaking for less than the gate. - handler._speaking_started_at = time.time() - ( - MIN_AGENT_SPEAKING_S_BEFORE_BARGE_IN_AEC / 2 - ) + # First audio chunk reached the wire at ``half_gate`` ago — so the + # gate measured from ``_first_audio_sent_at`` is still inside the + # warmup window (post-fix anchor; was anchored on + # ``_speaking_started_at`` pre-0.6.2). 
+ half_gate = MIN_AGENT_SPEAKING_S_BEFORE_BARGE_IN_AEC / 2 + handler._speaking_started_at = time.time() - half_gate + handler._first_audio_sent_at = time.time() - half_gate await handler._handle_barge_in( Transcript(text="hold on", is_final=True, speech_final=True) @@ -503,9 +507,9 @@ async def test_barge_in_fires_after_warmup_window(self) -> None: handler.audio_sender.send_clear = AsyncMock() handler._llm_cancel_event = asyncio.Event() handler._aec = object() - handler._speaking_started_at = time.time() - ( - MIN_AGENT_SPEAKING_S_BEFORE_BARGE_IN_AEC + 0.1 - ) + past_gate = MIN_AGENT_SPEAKING_S_BEFORE_BARGE_IN_AEC + 0.1 + handler._speaking_started_at = time.time() - past_gate + handler._first_audio_sent_at = time.time() - past_gate await handler._handle_barge_in( Transcript(text="hold on", is_final=True, speech_final=True) @@ -535,6 +539,7 @@ async def test_barge_in_fires_at_400ms_when_aec_off(self) -> None: # AEC OFF (PSTN default) — gate is 0.25 s. handler._aec = None handler._speaking_started_at = time.time() - 0.4 + handler._first_audio_sent_at = time.time() - 0.4 await handler._handle_barge_in( Transcript(text="stop", is_final=True, speech_final=True) @@ -563,6 +568,7 @@ async def test_barge_in_suppressed_within_anti_flicker_when_aec_off( handler._llm_cancel_event = asyncio.Event() handler._aec = None handler._speaking_started_at = time.time() - 0.1 + handler._first_audio_sent_at = time.time() - 0.1 await handler._handle_barge_in( Transcript(text="stop", is_final=True, speech_final=True) @@ -573,6 +579,42 @@ async def test_barge_in_suppressed_within_anti_flicker_when_aec_off( ) assert handler._is_speaking is True + async def test_barge_in_suppressed_before_first_audio_emitted(self) -> None: + """0.6.2 fix: ElevenLabs (and other cloud TTS) take 200-700 ms to + emit the first byte. 
While ``_begin_speaking`` has fired but the + first chunk has NOT yet hit the wire, VAD picking up background + noise ("hello?", breath, room ambience) must NOT trigger a + self-cancel. Pre-fix, a 250 ms anti-flicker gate measured from + ``_begin_speaking`` expired BEFORE TTS emitted any audio, + cancelling the agent's first turn before a single byte left. + """ + from getpatter.stream_handler import PipelineStreamHandler + from getpatter.providers.base import Transcript + import time + + handler = object.__new__(PipelineStreamHandler) + handler._is_speaking = True + handler.metrics = None + handler.call_id = "test-call" + handler.audio_sender = MagicMock() + handler.audio_sender.send_clear = AsyncMock() + handler._llm_cancel_event = asyncio.Event() + handler._aec = None # PSTN default, no AEC warmup gate + # ``_begin_speaking`` ran 500 ms ago — well past the 250 ms gate + # that pre-fix would let barge-in through. But TTS still hasn't + # emitted the first chunk (cloud provider first-byte latency). + handler._speaking_started_at = time.time() - 0.5 + handler._first_audio_sent_at = None + + await handler._handle_barge_in( + Transcript(text="hello?", is_final=True, speech_final=True) + ) + + assert not handler._llm_cancel_event.is_set(), ( + "barge-in must be suppressed until at least one TTS chunk has hit the wire" + ) + assert handler._is_speaking is True + async def test_consume_loop_breaks_when_cancel_event_set_mid_stream( self, ) -> None: diff --git a/libraries/typescript/src/dashboard/store.ts b/libraries/typescript/src/dashboard/store.ts index 8af06e74..24ca22cb 100644 --- a/libraries/typescript/src/dashboard/store.ts +++ b/libraries/typescript/src/dashboard/store.ts @@ -404,6 +404,48 @@ export class MetricsStore extends EventEmitter { } } +/** + * Build a ``metrics`` object from top-level CallLogger fields. 
``CallLogger`` + * writes ``cost`` / ``latency`` / ``duration_ms`` / ``telephony_provider`` at + * the top of ``metadata.json``, but the dashboard UI reads them from + * ``metrics``. Without this fallback every hydrated call shows ``$0.00`` and + * ``—`` for cost and latency. + */ +function metricsFromTopLevel( + meta: Record<string, unknown>, +): Record<string, unknown> | null { + const cost = + meta.cost && typeof meta.cost === 'object' + ? (meta.cost as Record<string, unknown>) + : null; + const latency = + meta.latency && typeof meta.latency === 'object' + ? (meta.latency as Record<string, unknown>) + : null; + const durationMs = meta.duration_ms; + const telephony = meta.telephony_provider; + if (cost === null && latency === null && durationMs == null && !telephony) { + return null; + } + const out: Record<string, unknown> = {}; + if (cost !== null) out.cost = cost; + if (latency !== null) { + const totalMs = + (typeof latency.p95_ms === 'number' && latency.p95_ms) || + (typeof latency.p50_ms === 'number' && latency.p50_ms) || + 0; + out.latency_avg = { total_ms: totalMs }; + out.latency = latency; + } + if (typeof durationMs === 'number' && durationMs > 0) { + out.duration_seconds = durationMs / 1000; + } + if (typeof telephony === 'string' && telephony) { + out.telephony_provider = telephony; + } + return Object.keys(out).length > 0 ? out : null; +} + /** * Translate a CallLogger ``metadata.json`` payload into a ``CallRecord``. * Returns ``null`` when ``started_at`` is missing or unparseable — the record @@ -421,7 +463,7 @@ function metadataToCallRecord( const metrics = meta.metrics && typeof meta.metrics === 'object' ? (meta.metrics as Record<string, unknown>) - : null; + : metricsFromTopLevel(meta); const transcript = Array.isArray(meta.transcript) ? 
(meta.transcript as CallRecord['transcript']) : []; diff --git a/libraries/typescript/src/stream-handler.ts b/libraries/typescript/src/stream-handler.ts index d6fc4cfe..240d190a 100644 --- a/libraries/typescript/src/stream-handler.ts +++ b/libraries/typescript/src/stream-handler.ts @@ -250,6 +250,17 @@ export class StreamHandler { * sentence. */ private speakingStartedAt: number | null = null; + /** + * Wall-clock (ms) when the FIRST TTS audio chunk actually reached the + * carrier wire — set in ``markFirstAudioSent`` after ``bridge.sendAudio`` + * succeeds, cleared by ``beginSpeaking`` / ``cancelSpeaking``. The barge-in + * gate measures elapsed from this instant, NOT from ``speakingStartedAt``, + * because ElevenLabs (and other cloud TTS) take 200-700 ms to emit the + * first byte. A gate anchored to ``beginSpeaking`` would expire on + * background noise before any audio went out, exit the TTS loop on + * ``isSpeaking=false``, and silently cut the agent's first turn. + */ + private firstAudioSentAt: number | null = null; /** * Minimum wall-clock duration (ms) the agent must have been speaking * before barge-in is allowed to fire when AEC is active. Covers the @@ -311,11 +322,25 @@ export class StreamHandler { this.speakingGeneration++; this.isSpeaking = true; this.speakingStartedAt = Date.now(); + this.firstAudioSentAt = null; // Fresh turn — drop any stale pre-barge-in buffer from a previous turn // so we never replay yesterday's audio to STT. this.inboundAudioRing = []; } + /** + * Record that the first TTS audio chunk of the current turn has hit the + * carrier wire. Idempotent within a turn — only the first call sets the + * timestamp; later chunks are no-ops. Must be invoked AFTER the underlying + * ``bridge.sendAudio`` resolves so the gate is anchored to "audio actually + * went out", not "we asked the carrier to send it". 
+ */ + private markFirstAudioSent(): void { + if (this.firstAudioSentAt === null) { + this.firstAudioSentAt = Date.now(); + } + } + /** * Atomically end speaking AND invalidate any pending grace timer. * Use instead of ``this.isSpeaking = false`` at barge-in sites. @@ -327,6 +352,7 @@ export class StreamHandler { this.speakingGeneration++; // invalidates pending grace timers this.isSpeaking = false; this.speakingStartedAt = null; + this.firstAudioSentAt = null; this.lastCancelAt = Date.now(); if (this.llmAbort !== null) { try { @@ -372,11 +398,13 @@ export class StreamHandler { if (this.speakingGeneration === gen) { this.isSpeaking = false; this.speakingStartedAt = null; + this.firstAudioSentAt = null; } }, grace); } else { this.isSpeaking = false; this.speakingStartedAt = null; + this.firstAudioSentAt = null; } } @@ -387,7 +415,14 @@ export class StreamHandler { */ private canBargeIn(): boolean { if (this.speakingStartedAt === null) return true; - const elapsed = Date.now() - this.speakingStartedAt; + // Anchor the gate on "first audio actually emitted", not on + // ``beginSpeaking`` (which fires before the TTS provider's first-byte + // latency has elapsed). Without this guard, background noise picked up + // by VAD ~250 ms after ``beginSpeaking`` triggers a self-cancel BEFORE + // any TTS chunk has reached the wire — the agent's first turn becomes + // silence even though the SDK believes it spoke. + if (this.firstAudioSentAt === null) return false; + const elapsed = Date.now() - this.firstAudioSentAt; const gate = this.aec ? 
StreamHandler.MIN_AGENT_SPEAKING_MS_BEFORE_BARGE_IN_AEC : StreamHandler.MIN_AGENT_SPEAKING_MS_BEFORE_BARGE_IN_NO_AEC; @@ -1236,6 +1271,7 @@ export class StreamHandler { } const encoded = this.encodePipelineAudio(chunk); this.deps.bridge.sendAudio(this.ws, encoded, this.streamSid); + this.markFirstAudioSent(); } } catch (e) { getLogger().error(`First message TTS error (${label}):`, e); @@ -1364,6 +1400,7 @@ export class StreamHandler { } const encoded = this.encodePipelineAudio(processedAudio); this.deps.bridge.sendAudio(this.ws, encoded, this.streamSid); + this.markFirstAudioSent(); } } catch (e) { getLogger().error(`TTS streaming error (${this.deps.bridge.label}):`, e); @@ -1796,6 +1833,7 @@ export class StreamHandler { if (!wsTtsStarted) { wsTtsStarted = true; this.metricsAcc.recordTtsFirstByte(); await this.emitAudioOut(); } const encoded = this.encodePipelineAudio(audioChunk); this.deps.bridge.sendAudio(this.ws, encoded, this.streamSid); + this.markFirstAudioSent(); } } } @@ -1975,6 +2013,7 @@ export class StreamHandler { // reusing it on the outbound path corrupts both directions. const outAudio = eventData; this.deps.bridge.sendAudio(this.ws, outAudio.toString('base64'), this.streamSid); + this.markFirstAudioSent(); // Send mark for barge-in accuracy. 
this.chunkCount++; this.deps.bridge.sendMark(this.ws, `audio_${this.chunkCount}`, this.streamSid); diff --git a/libraries/typescript/tests/dashboard-store.test.ts b/libraries/typescript/tests/dashboard-store.test.ts index 62125da8..5c9e5825 100644 --- a/libraries/typescript/tests/dashboard-store.test.ts +++ b/libraries/typescript/tests/dashboard-store.test.ts @@ -326,4 +326,74 @@ describe('MetricsStore.hydrate', () => { fs.rmSync(root, { recursive: true, force: true }); } }); + + it('lifts top-level cost/latency/duration into metrics (CallLogger schema)', () => { + // CallLogger.logCallEnd writes cost/latency/duration_ms/telephony_provider + // at the top of metadata.json — without this fallback hydrated calls show + // $0.00 / "—" in the dashboard because the UI reads from metrics.cost etc. + const root = fs.mkdtempSync(`${os.tmpdir()}/patter-store-test-`); + try { + const callDir = `${root}/calls/2026/05/08/CA-real-shape`; + fs.mkdirSync(callDir, { recursive: true }); + fs.writeFileSync( + `${callDir}/metadata.json`, + JSON.stringify({ + schema_version: '1.0', + call_id: 'CA-real-shape', + started_at: '2026-05-08T23:33:00.000Z', + ended_at: '2026-05-08T23:33:57.000Z', + duration_ms: 57400, + status: 'completed', + telephony_provider: 'twilio', + provider_mode: 'pipeline', + turns: 9, + cost: { + stt: 0.001526, + tts: 0.02988, + llm: 0.000406, + telephony: 0.0085, + total: 0.040312, + }, + latency: { p50_ms: 2127.7, p95_ms: 3461.7, p99_ms: 3640.1 }, + }), + ); + const store = new MetricsStore(); + expect(store.hydrate(root)).toBe(1); + const rec = store.getCalls()[0]; + expect(rec.metrics).not.toBeNull(); + const m = rec.metrics as Record<string, unknown>; + expect((m.cost as Record<string, unknown>).total).toBeCloseTo(0.040312, 6); + expect((m.latency as Record<string, unknown>).p95_ms).toBeCloseTo(3461.7); + expect((m.latency_avg as Record<string, unknown>).total_ms).toBeCloseTo(3461.7); + expect(m.duration_seconds).toBeCloseTo(57.4); + 
expect(m.telephony_provider).toBe('twilio'); + } finally { + fs.rmSync(root, { recursive: 
true, force: true }); + } + }); + + it('preserves explicit metrics when present (does not overwrite with top-level)', () => { + const root = fs.mkdtempSync(`${os.tmpdir()}/patter-store-test-`); + try { + const callDir = `${root}/calls/2026/05/08/CA-explicit`; + fs.mkdirSync(callDir, { recursive: true }); + fs.writeFileSync( + `${callDir}/metadata.json`, + JSON.stringify({ + call_id: 'CA-explicit', + started_at: '2026-05-08T10:00:00Z', + metrics: { cost: { total: 0.999 }, marker: 'kept' }, + cost: { total: 0.001 }, + latency: { p95_ms: 9999 }, + }), + ); + const store = new MetricsStore(); + expect(store.hydrate(root)).toBe(1); + const m = store.getCalls()[0].metrics as Record; + expect(m.marker).toBe('kept'); + expect((m.cost as Record).total).toBeCloseTo(0.999); + } finally { + fs.rmSync(root, { recursive: true, force: true }); + } + }); }); diff --git a/libraries/typescript/tests/unit/stream-handler.test.ts b/libraries/typescript/tests/unit/stream-handler.test.ts index 1cb34cfb..3cd32c3d 100644 --- a/libraries/typescript/tests/unit/stream-handler.test.ts +++ b/libraries/typescript/tests/unit/stream-handler.test.ts @@ -406,6 +406,7 @@ describe('StreamHandler', () => { return h as unknown as { isSpeaking: boolean; speakingStartedAt: number | null; + firstAudioSentAt: number | null; aec: unknown; canBargeIn: () => boolean; handleBargeIn: (t: { text?: string }) => boolean; @@ -421,6 +422,20 @@ describe('StreamHandler', () => { expect(p.canBargeIn()).toBe(true); }); + it('canBargeIn() false before the first TTS chunk has hit the wire', () => { + // 0.6.2 fix: ElevenLabs first-byte latency is hundreds of ms. Pre-fix + // a 250 ms gate measured from beginSpeaking expired before any audio + // went out, letting background noise self-cancel the agent's first + // turn. Post-fix the gate is anchored on firstAudioSentAt — if that's + // null we are still waiting for the TTS provider's first byte. 
+      const h = new StreamHandler(makeDeps(), makeMockWs(), '+15551111111', '+15552222222');
+      const p = priv(h);
+      p.aec = null;
+      p.speakingStartedAt = Date.now() - 5000; // long past the 250 ms gate
+      p.firstAudioSentAt = null; // but no audio has gone out yet
+      expect(p.canBargeIn()).toBe(false);
+    });
+
     // -----------------------------------------------------------------------
     // AEC OFF (default — PSTN deployments). Gate is 250 ms.
     // -----------------------------------------------------------------------
@@ -430,6 +445,7 @@ describe('StreamHandler', () => {
       const p = priv(h);
       p.aec = null;
       p.speakingStartedAt = Date.now() - 100;
+      p.firstAudioSentAt = Date.now() - 100;
       expect(p.canBargeIn()).toBe(false);
     });

@@ -437,7 +453,8 @@ describe('StreamHandler', () => {
       const h = new StreamHandler(makeDeps(), makeMockWs(), '+15551111111', '+15552222222');
       const p = priv(h);
       p.aec = null;
-      p.speakingStartedAt = Date.now() - 400; // 400 ms — past 250 ms, under 1 s
+      p.speakingStartedAt = Date.now() - 400;
+      p.firstAudioSentAt = Date.now() - 400; // 400 ms — past 250 ms, under 1 s
       expect(p.canBargeIn()).toBe(true);
     });

@@ -448,6 +465,7 @@ describe('StreamHandler', () => {
       p.aec = null;
       p.isSpeaking = true;
       p.speakingStartedAt = Date.now() - 400;
+      p.firstAudioSentAt = Date.now() - 400;
       const result = p.handleBargeIn({ text: 'stop' });
       expect(result).toBe(true);
       expect(p.isSpeaking).toBe(false);
@@ -467,6 +485,7 @@ describe('StreamHandler', () => {
       const p = priv(h);
       p.aec = aecSentinel;
       p.speakingStartedAt = Date.now() - 400; // would PASS with AEC off
+      p.firstAudioSentAt = Date.now() - 400;
       expect(p.canBargeIn()).toBe(false);
     });

@@ -475,6 +494,7 @@ describe('StreamHandler', () => {
       const p = priv(h);
       p.aec = aecSentinel;
       p.speakingStartedAt = Date.now() - 1200;
+      p.firstAudioSentAt = Date.now() - 1200;
       expect(p.canBargeIn()).toBe(true);
     });

@@ -484,6 +504,7 @@ describe('StreamHandler', () => {
       p.aec = aecSentinel;
       p.isSpeaking = true;
       p.speakingStartedAt = Date.now() - 400;
+      p.firstAudioSentAt = Date.now() - 400;
       const result = p.handleBargeIn({ text: 'stop' });
       expect(result).toBe(false);
       expect(p.isSpeaking).toBe(true);
@@ -495,6 +516,7 @@ describe('StreamHandler', () => {
       p.aec = aecSentinel;
       p.isSpeaking = true;
       p.speakingStartedAt = Date.now() - 1500;
+      p.firstAudioSentAt = Date.now() - 1500;
       const result = p.handleBargeIn({ text: 'stop' });
       expect(result).toBe(true);
       expect(p.isSpeaking).toBe(false);