From 9aa31362e84d9fd68a60cb32eb40bd28eab4d80b Mon Sep 17 00:00:00 2001 From: nicolotognoni Date: Tue, 12 May 2026 20:01:12 +0200 Subject: [PATCH] fix(0.6.1): pipeline firstMessage barge-in aborts TTS stream + unblocks LLM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the user interrupted the agent during the firstMessage in pipeline mode (Deepgram STT + LLM + ElevenLabs WS TTS), the existing barge-in cancel flipped ``isSpeaking`` / ``_is_speaking`` to ``False`` but the ``for await`` / ``async for`` consuming ``tts.synthesizeStream`` / ``tts.synthesize`` stayed suspended on the next-frame wait (``ws.recv()``). The check at the top of the loop body never re-ran, the provider WS sat idle until ``FRAME_TIMEOUT_MS`` (30 s on ElevenLabs WS TTS), and the "speaking lock" was never released — subsequent Deepgram finals were captured but the LLM dispatch path never fired, leaving the call silent for the user. Fix (parity Py/TS): * Add ``firstMessageAbort`` (TS, ``AbortController``) / ``_first_message_abort`` (Py, ``asyncio.Event``) raced against the iterator's ``next()`` / ``__anext__``. * Add an optional ``cancel()`` hook on the TTS adapter interface. Implementation in ``ElevenLabsWebSocketTTS`` (both SDKs) closes the in-flight WS (``activeSocket`` / ``_active_socket``) so the next-frame wait unblocks via ``ConnectionClosed`` within one event-loop tick. * Use the manual iterator protocol in the firstMessage loop so we can race ``next()`` with the abort signal and call ``iter.return()`` / ``agen.aclose()`` on abort, ensuring the generator's ``finally`` runs and closes the WS. * Regression tests in both SDKs: standalone iterator race + ``_handle_barge_in`` must invoke ``tts.cancel`` when available and set the abort event. CHANGELOG entry added under ``## 0.6.1 (2026-05-12)``. --- CHANGELOG.md | 4 + .../getpatter/providers/elevenlabs_ws_tts.py | 59 +++++- libraries/python/getpatter/stream_handler.py | 147 ++++++++++++--- .../tests/unit/test_stream_handler_unit.py | 176 ++++++++++++++++++ libraries/typescript/src/provider-factory.ts | 11 ++ .../src/providers/elevenlabs-ws-tts.ts | 42 +++++ libraries/typescript/src/stream-handler.ts | 132 +++++++++++-- .../tests/unit/stream-handler.test.ts | 108 +++++++++++ 8 files changed, 635 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a1a9088..bb0fd51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## 0.6.1 (2026-05-12) +### Fixed — Barge-in during firstMessage in pipeline mode now aborts the TTS synthesize stream and unblocks subsequent LLM turn dispatch + +When the user interrupted the agent during the firstMessage in pipeline mode (Deepgram STT + LLM + ElevenLabs WS TTS), `cancelSpeaking` / `_do_cancel_for_barge_in` flipped `isSpeaking` / `_is_speaking` to `false` but the `for await` / `async for` consuming `tts.synthesizeStream` / `tts.synthesize` stayed suspended on the next-frame wait (`ws.recv()` on the provider socket). The check at the top of the loop body never re-ran, the provider WS sat idle until `FRAME_TIMEOUT_MS` (30 s on ElevenLabs WS TTS), and the "speaking lock" was never released — so subsequent user transcripts were captured by Deepgram but the LLM dispatch path never fired and the call went silent. Fix: added `firstMessageAbort` (TS, `AbortController`) / `_first_message_abort` (Py, `asyncio.Event`) raced against the iterator's `next()` / `__anext__`, plus an optional `cancel()` hook on the TTS adapter that closes the in-flight WS (`activeSocket` / `_active_socket`) so the next-frame wait unblocks within one event-loop tick. The generator's `finally` then runs cleanly and the call resumes normally. Files: `libraries/typescript/src/stream-handler.ts`, `libraries/typescript/src/provider-factory.ts`, `libraries/typescript/src/providers/elevenlabs-ws-tts.ts`, `libraries/python/getpatter/stream_handler.py`, `libraries/python/getpatter/providers/elevenlabs_ws_tts.py`. + ### Changed — `StreamHandler` adopt-capability check now uses duck typing The TS realtime adopt branch in `stream-handler.ts` previously relied on `this.adapter instanceof OpenAIRealtimeAdapter` to gate the prewarm-handoff path. Switched to a duck-type check (`typeof adapter.adoptWebSocket === 'function'`) so the generic stream-handler module stays provider-agnostic on this hot path and matches the Python handler's `getattr(self._adapter, "adopt_websocket", None)` shape. Files: `libraries/typescript/src/stream-handler.ts`. diff --git a/libraries/python/getpatter/providers/elevenlabs_ws_tts.py b/libraries/python/getpatter/providers/elevenlabs_ws_tts.py index a5c3e6e..3ac347c 100644 --- a/libraries/python/getpatter/providers/elevenlabs_ws_tts.py +++ b/libraries/python/getpatter/providers/elevenlabs_ws_tts.py @@ -226,6 +226,15 @@ def __init__( # send) instead of opening a fresh socket. The slot is # consumed exactly once. self._adopted_connection: Optional[ElevenLabsParkedWS] = None + # Currently in-flight WebSocket inside :meth:`synthesize`. + # Captured at the top of the generator and cleared in the + # ``finally`` block. :meth:`cancel` inspects this handle and + # closes the socket so a barge-in that occurs while the + # iterator is suspended on ``ws.recv()`` can unblock the loop + # within one event-loop tick. Without this, the wait stays + # pending until ``frame_timeout`` (30 s) and the call goes + # silent for the user. Mirrors TS ``activeSocket``. + self._active_socket = None @property def api_key(self) -> str: @@ -426,6 +435,9 @@ async def synthesize(self, text: str) -> AsyncGenerator[bytes, None]: ), timeout=self.open_timeout, ) + # Publish the active socket so :meth:`cancel` from outside can + # close it and unblock the next-frame wait. Cleared in finally. + self._active_socket = ws try: # Initial keep-alive packet establishes the session. Per the # ElevenLabs docs the first message must contain a single space @@ -443,7 +455,7 @@ async def synthesize(self, text: str) -> AsyncGenerator[bytes, None]: # after the consumer drains, which serves as the EOS. await ws.send(json.dumps({"text": text + " ", "flush": True})) - from websockets.exceptions import ConnectionClosedOK + from websockets.exceptions import ConnectionClosed, ConnectionClosedOK while True: try: @@ -455,6 +467,11 @@ async def synthesize(self, text: str) -> AsyncGenerator[bytes, None]: except ConnectionClosedOK: # Server closed cleanly — treat as end-of-stream. return + except ConnectionClosed: + # Local :meth:`cancel` closed the socket from outside — + # treat as end-of-stream so the generator's ``finally`` + # runs and the consumer exits within one tick. + return # WebSocket frames may be text (JSON) or bytes (rare — # some deployments may emit binary audio frames directly). @@ -507,6 +524,11 @@ async def synthesize(self, text: str) -> AsyncGenerator[bytes, None]: if msg.get("isFinal"): return finally: + # Clear the active-socket handle BEFORE closing so a racing + # :meth:`cancel` from a parallel coroutine doesn't try to + # close a socket we are already tearing down. + if self._active_socket is ws: + self._active_socket = None # Best-effort: tell the server to stop synthesising any # buffered text the consumer is no longer interested in. # Failure to send is non-fatal — the socket close below @@ -520,6 +542,41 @@ async def synthesize(self, text: str) -> AsyncGenerator[bytes, None]: except Exception: pass + def cancel(self) -> None: + """Force-close the currently in-flight synthesis socket, if any. + + Called by the stream handler on barge-in during a firstMessage + synth: the generator's frame wait is unblocked by the resulting + ``ConnectionClosed`` exception, which then drives the loop into + its ``finally`` for a clean teardown. No-op when no synth is in + flight. Idempotent — safe to call repeatedly. + + Implemented as a sync method (not a coroutine) so the cancel + path can be invoked from any context — including inside the + synchronous portion of :func:`_do_cancel_for_barge_in` where an + ``await`` would change call ordering. The actual ``ws.close`` + is scheduled on the running event loop and awaited best-effort + by the in-flight generator's ``finally``. + """ + ws = self._active_socket + if ws is None: + return + self._active_socket = None + try: + loop = asyncio.get_event_loop() + loop.create_task(ws.close()) + except Exception: + # No running loop or other transient failure — try the + # synchronous low-level transport fallback. The TCP socket + # close still wakes the pending ``ws.recv()`` with a + # ``ConnectionClosed`` on the next event-loop tick. + try: + transport = getattr(ws, "transport", None) + if transport is not None: + transport.close() + except Exception: + pass + async def warmup(self) -> None: """Pre-call WebSocket warmup for the ElevenLabs ``/stream-input`` endpoint. diff --git a/libraries/python/getpatter/stream_handler.py b/libraries/python/getpatter/stream_handler.py index 520fc44..f436d50 100644 --- a/libraries/python/getpatter/stream_handler.py +++ b/libraries/python/getpatter/stream_handler.py @@ -1902,6 +1902,17 @@ def __init__( # generic ``audio_*`` marks the Realtime path sends so the two paths # can coexist without name collisions. self._first_message_mark_counter: int = 0 + # Per-call abort event for the firstMessage ``tts.synthesize`` loop. + # Set by ``_do_cancel_for_barge_in`` so a barge-in DURING the agent's + # first utterance (pipeline mode) breaks out of the async iterator + # immediately, runs the generator's ``finally`` (which closes the + # provider WS), and frees the dispatch path for the user's next + # turn. Without this, the ``async for`` was suspended on + # ``ws.recv()`` while ``_is_speaking`` had already flipped to False + # — leaving the TTS WS in flight until ``frame_timeout`` (30 s on + # ElevenLabs WS TTS) and the call silent for the user. Mirrors TS + # ``firstMessageAbort``. + self._first_message_abort: asyncio.Event | None = None async def start(self) -> None: """Initialize STT/TTS providers, hooks, and start the STT receive loop.""" @@ -2171,41 +2182,103 @@ async def _connect_stt() -> None: except Exception as exc: # noqa: BLE001 - best-effort logger.debug("pop_prewarm_audio raised: %s", exc) prewarm_bytes = None + # Arm a fresh abort event so a barge-in during the firstMessage + # synth path can break the async iterator immediately rather + # than hanging on the next ``__anext__`` until + # ``frame_timeout`` (30 s on ElevenLabs WS TTS). Combined + # with ``tts.cancel()`` (closes the provider WS) the loop + # exits within one event-loop tick. + self._first_message_abort = asyncio.Event() + fm_abort = self._first_message_abort try: if prewarm_bytes: if self.metrics is not None: self.metrics.record_tts_first_byte() first_chunk_sent = await self._stream_prewarm_bytes(prewarm_bytes) else: - async for audio_chunk in self._tts.synthesize( - self.agent.first_message - ): - if not self._is_speaking: - break # barge-in or test-hangup - if not first_chunk_sent: - first_chunk_sent = True - if self.metrics is not None: - self.metrics.record_tts_first_byte() - # BUG #128: route every TTS chunk through the paced - # sender so we never push more than - # ``_FIRST_MESSAGE_MARK_WINDOW`` chunks of audio - # ahead of carrier playback. The previous burst-send - # let Twilio's outbound buffer reach several - # seconds — a barge-in's send_clear race-lost - # against the queued media frames and the agent - # kept talking on the user's earpiece for up to - # ~2 s after the user spoke. The paced sender also - # drives the AEC far-end tap and - # ``_mark_first_audio_sent`` internally so this - # path stays parity-clean with - # ``_stream_prewarm_bytes``. - sent = await self._send_paced_first_message_bytes(audio_chunk) - if not sent: - break + # Use the manual async-iterator protocol so we can race + # ``__anext__`` against the abort event. ``async for`` + # does not surface a hook to cancel a pending + # ``__anext__`` from outside. + agen = self._tts.synthesize(self.agent.first_message) + aiter = agen.__aiter__() + try: + while True: + if not self._is_speaking or fm_abort.is_set(): + break + next_task = asyncio.ensure_future(aiter.__anext__()) + abort_task = asyncio.ensure_future(fm_abort.wait()) + done, pending = await asyncio.wait( + {next_task, abort_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + if abort_task in done and next_task not in done: + # Cancel the pending ``__anext__`` so it + # stops blocking on ``ws.recv()``; the + # ``aclose`` in finally runs the + # generator's ``finally`` (closes WS). + next_task.cancel() + try: + await next_task + except ( + asyncio.CancelledError, + StopAsyncIteration, + Exception, # noqa: BLE001 - best-effort drain + ): + pass + break + # next_task settled — drain the still-pending + # abort_task so we don't leak a future per + # iteration. + if abort_task in pending: + abort_task.cancel() + try: + await abort_task + except (asyncio.CancelledError, Exception): # noqa: BLE001 + pass + try: + audio_chunk = next_task.result() + except StopAsyncIteration: + break + if not self._is_speaking: + break # barge-in or test-hangup + if not first_chunk_sent: + first_chunk_sent = True + if self.metrics is not None: + self.metrics.record_tts_first_byte() + # BUG #128: route every TTS chunk through the paced + # sender so we never push more than + # ``_FIRST_MESSAGE_MARK_WINDOW`` chunks of audio + # ahead of carrier playback. The previous burst-send + # let Twilio's outbound buffer reach several + # seconds — a barge-in's send_clear race-lost + # against the queued media frames and the agent + # kept talking on the user's earpiece for up to + # ~2 s after the user spoke. The paced sender also + # drives the AEC far-end tap and + # ``_mark_first_audio_sent`` internally so this + # path stays parity-clean with + # ``_stream_prewarm_bytes``. + sent = await self._send_paced_first_message_bytes( + audio_chunk + ) + if not sent: + break + finally: + # Always run the generator's ``finally`` so the + # provider WS closes promptly. Idempotent — an + # already-finalised generator just no-ops. + try: + await agen.aclose() + except Exception as exc: # noqa: BLE001 - best-effort + logger.debug("firstMessage tts.aclose raised: %s", exc) finally: # Drop any partial int16 byte to prevent cross-turn corruption # if the stream threw before a complete sample was delivered. self.audio_sender.reset_pcm_carry() + # Clear the abort event handle so the next turn does not + # see an already-set sentinel. + self._first_message_abort = None # Flip back to not-speaking with grace so the ring # buffer accumulated during the intro is flushed and # the next user utterance is recognised cleanly. @@ -2783,6 +2856,30 @@ async def _do_cancel_for_barge_in(self, transcript_text: str) -> None: cancel_event = getattr(self, "_llm_cancel_event", None) if cancel_event is not None: cancel_event.set() + # Signal any in-flight firstMessage ``synthesize`` loop to abort + # immediately. The loop races ``__anext__`` with this event and, + # on set, calls ``aclose`` on the async generator so its + # ``finally`` runs and the provider WS is closed promptly. + fm_abort = getattr(self, "_first_message_abort", None) + if fm_abort is not None and not fm_abort.is_set(): + fm_abort.set() + # Adapter-side teardown: WS-based TTS providers own a + # long-lived socket inside ``synthesize`` whose ``ws.recv()`` + # wait will otherwise hang until ``frame_timeout`` (30 s). + # Calling ``cancel`` from here closes the socket so the + # generator unblocks within one event-loop tick. Best-effort + # — providers without a ``cancel`` method (HTTP streaming + # adapters) simply skip this. + tts = getattr(self, "_tts", None) + tts_cancel = getattr(tts, "cancel", None) if tts is not None else None + if callable(tts_cancel): + try: + tts_cancel() + except Exception as exc: # noqa: BLE001 - best-effort + logger.debug( + "tts.cancel during firstMessage barge-in raised: %s", + exc, + ) try: await self.audio_sender.send_clear() except Exception as exc: diff --git a/libraries/python/tests/unit/test_stream_handler_unit.py b/libraries/python/tests/unit/test_stream_handler_unit.py index 8d2f732..cff537e 100644 --- a/libraries/python/tests/unit/test_stream_handler_unit.py +++ b/libraries/python/tests/unit/test_stream_handler_unit.py @@ -647,3 +647,179 @@ async def consume() -> None: # Without cancellation we would have 50 tokens; the cancel must bound it. assert len(consumed) < 50 assert len(consumed) <= 4 # one extra possible due to ordering + + +# --------------------------------------------------------------------------- +# 0.6.1 fix — Barge-in during firstMessage aborts the TTS synthesize stream +# +# Pre-fix: when the user spoke over the agent's firstMessage in pipeline mode +# (Deepgram STT + Cerebras LLM + ElevenLabs WS TTS), the cancel path flipped +# ``_is_speaking=False`` BUT the ``async for`` consuming ``tts.synthesize`` +# was suspended on ``ws.recv()`` waiting for the next frame. The check at +# the top of the loop body never re-ran. The provider WS sat idle for up +# to ``frame_timeout`` (30 s on ElevenLabs WS) before raising — by then +# every subsequent user utterance was transcribed but no LLM dispatch +# fired because the "speaking lock" path had never released. The call +# was silent for the user. +# +# Post-fix: ``_do_cancel_for_barge_in`` sets ``_first_message_abort`` +# whose ``wait()`` is raced against ``__anext__`` inside the firstMessage +# loop AND calls ``tts.cancel()`` (closes the provider WS). The +# generator's ``finally`` runs and the loop exits within one event-loop +# tick rather than after ``frame_timeout``. +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestBargeInDuringFirstMessageAbortsTtsStream: + async def test_barge_in_sets_first_message_abort_event(self) -> None: + """``_do_cancel_for_barge_in`` must set ``_first_message_abort`` + so the firstMessage synth loop breaks immediately.""" + from getpatter.stream_handler import PipelineStreamHandler + from getpatter.providers.base import Transcript + + handler = object.__new__(PipelineStreamHandler) + handler._is_speaking = True + handler.metrics = None + handler.call_id = "test-call" + handler.audio_sender = MagicMock() + handler.audio_sender.send_clear = AsyncMock() + handler._llm_cancel_event = asyncio.Event() + handler._first_message_abort = asyncio.Event() + assert not handler._first_message_abort.is_set() + + await handler._handle_barge_in( + Transcript(text="hold on", is_final=True, speech_final=True) + ) + + assert handler._first_message_abort.is_set(), ( + "barge-in must set the firstMessage abort event so the " + "synthesize loop unblocks immediately" + ) + + async def test_barge_in_invokes_tts_cancel_when_available(self) -> None: + """``_do_cancel_for_barge_in`` must invoke ``tts.cancel()`` when the + configured TTS adapter exposes the optional hook (WS-based + providers). HTTP-streaming adapters without ``cancel`` are + silently skipped.""" + from getpatter.stream_handler import PipelineStreamHandler + from getpatter.providers.base import Transcript + + handler = object.__new__(PipelineStreamHandler) + handler._is_speaking = True + handler.metrics = None + handler.call_id = "test-call" + handler.audio_sender = MagicMock() + handler.audio_sender.send_clear = AsyncMock() + handler._llm_cancel_event = asyncio.Event() + handler._first_message_abort = asyncio.Event() + cancel_mock = MagicMock() + tts_stub = MagicMock() + tts_stub.cancel = cancel_mock + handler._tts = tts_stub + + await handler._handle_barge_in( + Transcript(text="hold on", is_final=True, speech_final=True) + ) + + cancel_mock.assert_called_once() + + async def test_cancel_safe_when_first_message_abort_unset(self) -> None: + """When no firstMessage is in flight ``_first_message_abort`` is + ``None`` — the cancel path must not raise.""" + from getpatter.stream_handler import PipelineStreamHandler + from getpatter.providers.base import Transcript + + handler = object.__new__(PipelineStreamHandler) + handler._is_speaking = True + handler.metrics = None + handler.call_id = "test-call" + handler.audio_sender = MagicMock() + handler.audio_sender.send_clear = AsyncMock() + handler._llm_cancel_event = asyncio.Event() + handler._first_message_abort = None + handler._tts = None # no TTS adapter — cancel path skipped + + # Must not raise even though the abort handle and TTS are None. + await handler._handle_barge_in( + Transcript(text="hold on", is_final=True, speech_final=True) + ) + assert handler._llm_cancel_event.is_set() + + async def test_manual_aiter_race_unblocks_stalled_generator(self) -> None: + """End-to-end async-iterator race: a stalled async generator + (mimics ElevenLabs WS sitting on ``ws.recv()``) must be broken + out of within one tick of (abort_event.set() + tts.cancel()), and + the generator's ``finally`` must run (closes the provider WS).""" + finally_ran = {"value": False} + recv_event = asyncio.Event() # Resolved when adapter.cancel() fires. + + async def stalled_synth(): + try: + yield b"\x01\x02\x03\x04" + await recv_event.wait() # Mirrors ws.recv() suspension. + yield b"\x05\x06\x07\x08" + finally: + finally_ran["value"] = True + + adapter_cancel_calls = {"value": 0} + + def adapter_cancel() -> None: + adapter_cancel_calls["value"] += 1 + recv_event.set() + + abort = asyncio.Event() + agen = stalled_synth() + aiter = agen.__aiter__() + chunks: list[bytes] = [] + + async def consume() -> None: + try: + while True: + if abort.is_set(): + break + next_task = asyncio.ensure_future(aiter.__anext__()) + abort_task = asyncio.ensure_future(abort.wait()) + done, pending = await asyncio.wait( + {next_task, abort_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + if abort_task in done and next_task not in done: + adapter_cancel() + next_task.cancel() + try: + await next_task + except (asyncio.CancelledError, StopAsyncIteration, Exception): + pass + break + if abort_task in pending: + abort_task.cancel() + try: + await abort_task + except (asyncio.CancelledError, Exception): + pass + try: + chunks.append(next_task.result()) + except StopAsyncIteration: + break + finally: + try: + await agen.aclose() + except Exception: + pass + + consumer_task = asyncio.create_task(consume()) + await asyncio.sleep(0.01) + abort.set() + # Bounded — must finish well under the imagined ``frame_timeout``. + await asyncio.wait_for(consumer_task, timeout=2.0) + + assert chunks == [b"\x01\x02\x03\x04"], ( + "first chunk should land before the abort fires" + ) + assert adapter_cancel_calls["value"] == 1, ( + "adapter.cancel() must be invoked exactly once on abort" + ) + assert finally_ran["value"], ( + "generator finally must run so the provider WS closes promptly" + ) diff --git a/libraries/typescript/src/provider-factory.ts b/libraries/typescript/src/provider-factory.ts index c5f0cc0..f31623e 100644 --- a/libraries/typescript/src/provider-factory.ts +++ b/libraries/typescript/src/provider-factory.ts @@ -78,6 +78,17 @@ export interface TTSAdapter { * Default behaviour is a no-op. Failures must never abort the call. */ warmup?(): Promise; + /** + * Optional hook for adapters that hold a long-lived connection during + * ``synthesizeStream`` (e.g. WS-based providers). The stream handler + * calls this on barge-in during a firstMessage so the in-flight + * generator unblocks immediately instead of waiting for the next + * frame timeout. Implementations should be idempotent and best-effort + * — a missing or no-op ``cancel`` MUST NOT prevent the call from + * continuing. HTTP-streaming adapters with no persistent socket can + * omit it entirely. + */ + cancel?(): void; } /** diff --git a/libraries/typescript/src/providers/elevenlabs-ws-tts.ts b/libraries/typescript/src/providers/elevenlabs-ws-tts.ts index 30fe67c..12b43b5 100644 --- a/libraries/typescript/src/providers/elevenlabs-ws-tts.ts +++ b/libraries/typescript/src/providers/elevenlabs-ws-tts.ts @@ -160,6 +160,16 @@ export class ElevenLabsWebSocketTTS implements TTSAdapter { */ private adoptedConnection: ElevenLabsParkedWS | null = null; + /** + * Currently in-flight WebSocket inside ``synthesizeStream``. Captured at + * the top of the generator and cleared in ``finally``. ``cancel()`` + * inspects this handle and closes the socket so a barge-in that occurs + * while the iterator is suspended on the next-frame wait can unblock + * the loop within one event-loop tick. Without this, the wait stays + * pending until ``FRAME_TIMEOUT_MS`` (30 s) and the call goes silent. + */ + private activeSocket: WebSocket | null = null; + /** * The wire format requested over the ElevenLabs WS. Initially set from * the constructor; ``setTelephonyCarrier`` may auto-flip it to the @@ -311,6 +321,9 @@ export class ElevenLabsWebSocketTTS implements TTSAdapter { headers: { 'xi-api-key': this.apiKey }, }); } + // Publish the active socket so ``cancel()`` from outside can close it + // and unblock the next-frame wait. Cleared in finally. + this.activeSocket = ws; const queue: Buffer[] = []; let done = false; @@ -462,6 +475,12 @@ export class ElevenLabsWebSocketTTS implements TTSAdapter { } } finally { if (connectTimer) clearTimeout(connectTimer); + // Clear the active-socket handle BEFORE closing so a racing + // ``cancel()`` from a parallel coroutine doesn't try to close a + // socket we are already tearing down. + if (this.activeSocket === ws) { + this.activeSocket = null; + } // Best-effort EOS so the server stops billing for unconsumed audio. try { if (ws.readyState === WebSocket.OPEN) { @@ -625,10 +644,33 @@ export class ElevenLabsWebSocketTTS implements TTSAdapter { } } + /** + * Force-close the currently in-flight synthesis socket, if any. Used by + * the stream handler on barge-in during a firstMessage synth: the + * generator's frame wait is unblocked by the resulting ``close`` event, + * which then drives the loop into its ``finally`` for a clean teardown. + * No-op when no synth is in flight. Idempotent. + */ + cancel(): void { + const ws = this.activeSocket; + if (!ws) return; + this.activeSocket = null; + try { + if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) { + ws.close(); + } + } catch { + /* best-effort */ + } + } + /** No-op — connections are per-utterance and torn down inside synthesizeStream. */ async close(): Promise { // Drop any orphaned parked WS so we never leak it past close. this.discardAdoptedConnection(); + // Also fold in the cancel() behaviour so callers who only know + // ``close()`` still tear down an in-flight synth cleanly. + this.cancel(); } } diff --git a/libraries/typescript/src/stream-handler.ts b/libraries/typescript/src/stream-handler.ts index 0030038..d281674 100644 --- a/libraries/typescript/src/stream-handler.ts +++ b/libraries/typescript/src/stream-handler.ts @@ -321,6 +321,18 @@ export class StreamHandler { * earlier. Mirrors Python ``_llm_cancel_event``. */ private llmAbort: AbortController | null = null; + /** + * AbortController for the current firstMessage ``synthesizeStream`` loop. + * Aborted by ``cancelSpeaking`` so a barge-in DURING the agent's first + * utterance (pipeline mode) breaks out of the iterator immediately, + * invokes the TTS generator's ``finally`` (which closes the WS), and + * unblocks the LLM dispatch path for the user's next turn. Without + * this, the ``for await`` was suspended on the next chunk while + * ``isSpeaking`` had already flipped to false — leaving the TTS WS + * in flight until ``FRAME_TIMEOUT_MS`` (30 s) and the call silent + * for the user. Mirrors Python ``_first_message_abort``. + */ + private firstMessageAbort: AbortController | null = null; /** * Wall-clock timestamp of the most recent ``cancelSpeaking`` call, or @@ -467,6 +479,13 @@ export class StreamHandler { // No-op — abort() throws nothing in modern runtimes, but be defensive. } } + if (this.firstMessageAbort !== null && !this.firstMessageAbort.signal.aborted) { + try { + this.firstMessageAbort.abort(); + } catch { + // No-op — defensive against ancient runtimes. + } + } } /** @@ -1698,31 +1717,105 @@ export class StreamHandler { getLogger().debug(`popPrewarmAudio raised: ${String(err)}`); } } + // Arm an abort controller so a barge-in during the firstMessage + // synth path can break the iterator immediately rather than + // hanging on the next ``await iter.next()`` until ``FRAME_TIMEOUT_MS`` + // (30 s on ElevenLabs WS TTS) — see field doc above. Combined + // with ``tts.cancel?.()`` (closes the provider WS) the loop + // exits within one event-loop tick. + this.firstMessageAbort = new AbortController(); + const fmAbortSignal = this.firstMessageAbort.signal; try { if (prewarmBytes) { this.metricsAcc.recordTtsFirstByte(); await this.emitAudioOut(); firstChunkSent = await this.streamPrewarmBytes(prewarmBytes); } else { - for await (const chunk of this.tts.synthesizeStream(this.deps.agent.firstMessage)) { - if (!this.isSpeaking) break; // barge-in or test-hangup - if (!firstChunkSent) { - firstChunkSent = true; - this.metricsAcc.recordTtsFirstByte(); - await this.emitAudioOut(); + const stream = this.tts.synthesizeStream(this.deps.agent.firstMessage); + // Use the manual iterator protocol so we can race ``.next()`` + // with the abort signal. ``for await`` does not surface a hook + // to cancel a pending ``.next()`` from outside. + const iter = (stream as AsyncIterable)[Symbol.asyncIterator](); + try { + while (true) { + if (!this.isSpeaking || fmAbortSignal.aborted) break; + const nextP = iter.next(); + let abortFired = false; + const abortP = new Promise>((resolve) => { + const onAbort = () => { + abortFired = true; + resolve({ value: undefined, done: true }); + }; + if (fmAbortSignal.aborted) onAbort(); + else fmAbortSignal.addEventListener('abort', onAbort, { once: true }); + }); + const result = await Promise.race([nextP, abortP]); + if (abortFired || result.done) { + if (abortFired) { + // Adapter-side teardown: WS-based providers (e.g. + // ElevenLabs WS) own a long-lived socket inside + // ``synthesizeStream`` and the next-frame wait will + // otherwise hang until ``FRAME_TIMEOUT_MS`` (30 s). + // Closing it externally unblocks the wait so the + // generator can run its ``finally`` and the + // ``iter.return()`` below resolves promptly. + // HTTP-streaming adapters with no persistent socket + // simply have no ``cancel`` method and the call here + // is a no-op. + try { + this.tts?.cancel?.(); + } catch (err) { + getLogger().debug( + `tts.cancel during firstMessage barge-in raised: ${String(err)}`, + ); + } + // Hand the in-flight ``.next()`` back to the generator + // so its ``finally`` runs (closes the TTS WS, + // releases the billing-side connection). Errors here + // are best-effort — the call must continue regardless. + try { + await iter.return?.(undefined); + } catch (err) { + getLogger().debug( + `firstMessage iter.return after abort raised: ${String(err)}`, + ); + } + } + break; + } + const chunk = result.value; + if (!this.isSpeaking) break; // barge-in or test-hangup + if (!firstChunkSent) { + firstChunkSent = true; + this.metricsAcc.recordTtsFirstByte(); + await this.emitAudioOut(); + } + // BUG #128: route every TTS chunk through the paced sender so + // we never push more than ``FIRST_MESSAGE_MARK_WINDOW`` chunks + // of audio ahead of carrier playback. The previous burst-send + // (sendAudio in a tight loop with no marks) let Twilio's + // outbound buffer reach several seconds — a barge-in's + // sendClear race-lost against the queued media frames and the + // agent kept talking on the user's earpiece for up to ~2 s + // after the user spoke. The paced sender also drives the AEC + // far-end tap and ``markFirstAudioSent`` internally so this + // path stays parity-clean with ``streamPrewarmBytes``. + const sent = await this.sendPacedFirstMessageBytes(chunk); + if (!sent) break; + } + } finally { + // Best-effort: if we exited via ``isSpeaking=false`` (not the + // abort branch) or via ``sendPacedFirstMessageBytes`` returning + // false, the generator's ``finally`` still needs to run. + // Idempotent: a generator already finalised by an earlier + // ``return()`` just no-ops here. + try { + await iter.return?.(undefined); + } catch (err) { + getLogger().debug( + `firstMessage iter.return on cleanup raised: ${String(err)}`, + ); } - // BUG #128: route every TTS chunk through the paced sender so - // we never push more than ``FIRST_MESSAGE_MARK_WINDOW`` chunks - // of audio ahead of carrier playback. The previous burst-send - // (sendAudio in a tight loop with no marks) let Twilio's - // outbound buffer reach several seconds — a barge-in's - // sendClear race-lost against the queued media frames and the - // agent kept talking on the user's earpiece for up to ~2 s - // after the user spoke. The paced sender also drives the AEC - // far-end tap and ``markFirstAudioSent`` internally so this - // path stays parity-clean with ``streamPrewarmBytes``. - const sent = await this.sendPacedFirstMessageBytes(chunk); - if (!sent) break; } } } catch (e) { @@ -1731,6 +1824,9 @@ export class StreamHandler { // Drop any partial int16 byte to prevent cross-turn corruption // if the stream threw before a complete sample was delivered. this.resetTtsCarry(); + // Release the abort controller so the next turn does not observe + // an already-aborted signal. + this.firstMessageAbort = null; // Flip back to not-speaking with grace so the ring buffer // accumulated during the intro is flushed and the next user // utterance is recognised cleanly. diff --git a/libraries/typescript/tests/unit/stream-handler.test.ts b/libraries/typescript/tests/unit/stream-handler.test.ts index d685dc2..1c7b6b9 100644 --- a/libraries/typescript/tests/unit/stream-handler.test.ts +++ b/libraries/typescript/tests/unit/stream-handler.test.ts @@ -396,6 +396,114 @@ describe('StreamHandler', () => { }); }); + // ------------------------------------------------------------------------- + // 0.6.1 fix — Barge-in during firstMessage aborts the TTS synthesize stream + // + // Pre-fix: when the user spoke over the agent's firstMessage in pipeline + // mode (Deepgram + Cerebras + ElevenLabs WS TTS), ``cancelSpeaking`` + // flipped ``isSpeaking=false`` BUT the ``for await`` consuming + // ``tts.synthesizeStream`` was suspended on ``await iter.next()`` + // waiting for the next WS frame. The check at the top of the loop body + // never re-ran. The provider WS sat idle for up to ``FRAME_TIMEOUT_MS`` + // (30 s on ElevenLabs WS) before raising — by then every subsequent + // user utterance was transcribed but no LLM dispatch fired because + // the "speaking lock" path had never released. The call was silent. + // + // Post-fix: ``cancelSpeaking`` aborts ``firstMessageAbort`` whose + // signal is raced against ``iter.next()`` inside the firstMessage + // loop. On abort the loop calls ``tts.cancel?.()`` (closes the + // provider WS) and ``iter.return()`` so the generator's ``finally`` + // runs and exits within one event-loop tick rather than after + // ``FRAME_TIMEOUT_MS``. + // ------------------------------------------------------------------------- + describe('barge-in during firstMessage aborts TTS stream', () => { + it('aborts firstMessageAbort on cancelSpeaking', () => { + const deps = makeDeps(); + const ws = makeMockWs(); + const handler = new StreamHandler(deps, ws, '+15551111111', '+15552222222'); + const controller = new AbortController(); + (handler as unknown as { firstMessageAbort: AbortController | null }).firstMessageAbort = + controller; + (handler as unknown as { isSpeaking: boolean }).isSpeaking = true; + + expect(controller.signal.aborted).toBe(false); + (handler as unknown as { cancelSpeaking: () => void }).cancelSpeaking(); + expect(controller.signal.aborted).toBe(true); + }); + + it('manual iterator race breaks within one tick of abort and calls adapter.cancel()', async () => { + // Sentinel async-generator that mimics ElevenLabs WS sitting on + // ``ws.recv()`` after the first chunk: yields one chunk, then + // ``await``s a promise that is ONLY resolved when the adapter's + // ``cancel()`` fires (mirroring ``ws.on('close', wakeWaiter)`` in + // ``elevenlabs-ws-tts.ts``). The test verifies that on abort the + // consumer (a) closes the adapter so the wait unblocks, (b) the + // generator's ``finally`` runs, and (c) the consumer exits + // promptly — NOT after a 30 s frame timeout. + const finallyRan = { value: false }; + let resolveStall!: () => void; + const stallPromise = new Promise((r) => { + resolveStall = r; + }); + const stalled = (async function* () { + try { + yield Buffer.from([1, 2, 3, 4]); + await stallPromise; // Resolved by adapter.cancel() below. + yield Buffer.from([5, 6, 7, 8]); + } finally { + finallyRan.value = true; + } + })(); + + // Mock TTS adapter exposing the optional ``cancel()`` hook — the + // production ElevenLabsWebSocketTTS shape. + const adapterCancel = vi.fn(() => resolveStall()); + + // Replicate the race exactly as stream-handler does — iterator + // protocol + AbortController + adapter.cancel() + iter.return(). + const ctrl = new AbortController(); + const iter = (stalled as AsyncIterable)[Symbol.asyncIterator](); + const chunks: Buffer[] = []; + const consumer = (async () => { + while (true) { + if (ctrl.signal.aborted) break; + const nextP = iter.next(); + let aborted = false; + const abortP = new Promise>((resolve) => { + const onAbort = () => { + aborted = true; + resolve({ value: undefined, done: true }); + }; + if (ctrl.signal.aborted) onAbort(); + else ctrl.signal.addEventListener('abort', onAbort, { once: true }); + }); + const result = await Promise.race([nextP, abortP]); + if (aborted || result.done) { + if (aborted) { + adapterCancel(); + await iter.return?.(undefined); + } + break; + } + chunks.push(result.value); + } + })(); + + // Let the first chunk land, then trigger the barge-in. + await new Promise((r) => setTimeout(r, 10)); + ctrl.abort(); + // Bounded await — the consumer MUST finish within a few ms once + // the abort flow closes the adapter. If we ever regress and + // forget to call ``adapter.cancel()`` this test will time out + // at the vitest 15 s default well below FRAME_TIMEOUT_MS=30 s. + await consumer; + + expect(chunks.length).toBe(1); // First chunk delivered before abort + expect(adapterCancel).toHaveBeenCalledTimes(1); + expect(finallyRan.value).toBe(true); // Generator finally observed + }); + }); + // ------------------------------------------------------------------------- // canBargeIn() — adaptive gate on minimum speaking duration. With AEC on // it covers the filter's ~1 s warmup window; with AEC off it is just a