4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

## 0.6.1 (2026-05-12)

### Fixed — Barge-in during firstMessage in pipeline mode now aborts the TTS synthesize stream and unblocks subsequent LLM turn dispatch

When the user interrupted the agent during the firstMessage in pipeline mode (Deepgram STT + LLM + ElevenLabs WS TTS), `cancelSpeaking` / `_do_cancel_for_barge_in` flipped `isSpeaking` / `_is_speaking` to `false`, but the `for await` / `async for` loop consuming `tts.synthesizeStream` / `tts.synthesize` stayed suspended on the next-frame wait (`ws.recv()` on the provider socket). The check at the top of the loop body never re-ran, the provider WS sat idle until `FRAME_TIMEOUT_MS` (30 s on ElevenLabs WS TTS), and the "speaking lock" was never released. As a result, subsequent user transcripts were captured by Deepgram but the LLM dispatch path never fired, and the call went silent.

Fix: added `firstMessageAbort` (TS, `AbortController`) / `_first_message_abort` (Py, `asyncio.Event`), raced against the iterator's `next()` / `__anext__`, plus an optional `cancel()` hook on the TTS adapter that closes the in-flight WS (`activeSocket` / `_active_socket`) so the next-frame wait unblocks within one event-loop tick. The generator's `finally` then runs cleanly and the call resumes normally.

Files: `libraries/typescript/src/stream-handler.ts`, `libraries/typescript/src/provider-factory.ts`, `libraries/typescript/src/providers/elevenlabs-ws-tts.ts`, `libraries/python/getpatter/stream_handler.py`, `libraries/python/getpatter/providers/elevenlabs_ws_tts.py`.
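
The race between the iterator's `__anext__` and the abort event can be sketched in isolation. This is a minimal illustration under stated assumptions, not the code from `stream_handler.py`: `slow_frames`, `consume_until_abort`, and `demo` are hypothetical names, and the 30-second sleep only stands in for a stalled `ws.recv()`.

```python
import asyncio
from typing import AsyncGenerator, List


async def slow_frames() -> AsyncGenerator[bytes, None]:
    """Hypothetical TTS stream: one frame, then a long stall."""
    yield b"frame-1"
    await asyncio.sleep(30)  # stands in for a pending ws.recv()
    yield b"frame-2"


async def consume_until_abort(
    frames: AsyncGenerator[bytes, None], abort: asyncio.Event
) -> List[bytes]:
    """Collect frames until the stream ends or the abort event is set."""
    received: List[bytes] = []
    try:
        while not abort.is_set():
            next_task = asyncio.ensure_future(frames.__anext__())
            abort_task = asyncio.ensure_future(abort.wait())
            done, _ = await asyncio.wait(
                {next_task, abort_task}, return_when=asyncio.FIRST_COMPLETED
            )
            if next_task not in done:
                # Abort won the race: cancel the pending frame wait.
                next_task.cancel()
                try:
                    await next_task
                except (asyncio.CancelledError, StopAsyncIteration):
                    pass
                break
            # A frame (or end-of-stream) arrived: drop the unused abort waiter.
            abort_task.cancel()
            try:
                received.append(next_task.result())
            except StopAsyncIteration:
                break
    finally:
        # Run the generator's own cleanup; a no-op if it already finished.
        await frames.aclose()
    return received


async def demo() -> List[bytes]:
    abort = asyncio.Event()
    # Simulate a barge-in shortly after the first frame has been consumed.
    asyncio.get_running_loop().call_later(0.05, abort.set)
    return await consume_until_abort(slow_frames(), abort)
```

In this sketch the consumer returns shortly after the event is set instead of waiting out the stall, because the pending `__anext__` is cancelled rather than awaited to completion.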

### Changed — `StreamHandler` adopt-capability check now uses duck typing

The TS realtime adopt branch in `stream-handler.ts` previously relied on `this.adapter instanceof OpenAIRealtimeAdapter` to gate the prewarm-handoff path. Switched to a duck-type check (`typeof adapter.adoptWebSocket === 'function'`) so the generic stream-handler module stays provider-agnostic on this hot path and matches the Python handler's `getattr(self._adapter, "adopt_websocket", None)` shape. Files: `libraries/typescript/src/stream-handler.ts`.
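
A capability check of this shape might look like the following on the Python side. The adapter classes and `get_adopt_hook` are hypothetical names introduced for illustration; only the `getattr(..., "adopt_websocket", None)` pattern comes from the entry above.

```python
from typing import Any, Callable, Optional


class RealtimeAdapter:
    """Hypothetical adapter that supports the prewarm handoff."""

    def adopt_websocket(self, ws: Any) -> str:
        return f"adopted {ws}"


class PlainAdapter:
    """Hypothetical adapter without an adopt hook."""


def get_adopt_hook(adapter: Any) -> Optional[Callable[[Any], Any]]:
    """Return the adapter's adopt hook if it has one, else None."""
    hook = getattr(adapter, "adopt_websocket", None)
    return hook if callable(hook) else None
```

The caller never imports a concrete provider class, so a new adapter can opt in to the handoff path by defining the method.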
59 changes: 58 additions & 1 deletion libraries/python/getpatter/providers/elevenlabs_ws_tts.py
@@ -226,6 +226,15 @@ def __init__(
# send) instead of opening a fresh socket. The slot is
# consumed exactly once.
self._adopted_connection: Optional[ElevenLabsParkedWS] = None
# Currently in-flight WebSocket inside :meth:`synthesize`.
# Captured at the top of the generator and cleared in the
# ``finally`` block. :meth:`cancel` inspects this handle and
# closes the socket so a barge-in that occurs while the
# iterator is suspended on ``ws.recv()`` can unblock the loop
# within one event-loop tick. Without this, the wait stays
# pending until ``frame_timeout`` (30 s) and the call goes
# silent for the user. Mirrors TS ``activeSocket``.
self._active_socket = None

@property
def api_key(self) -> str:
@@ -426,6 +435,9 @@ async def synthesize(self, text: str) -> AsyncGenerator[bytes, None]:
),
timeout=self.open_timeout,
)
# Publish the active socket so :meth:`cancel` from outside can
# close it and unblock the next-frame wait. Cleared in finally.
self._active_socket = ws
try:
# Initial keep-alive packet establishes the session. Per the
# ElevenLabs docs the first message must contain a single space
@@ -443,7 +455,7 @@ async def synthesize(self, text: str) -> AsyncGenerator[bytes, None]:
# after the consumer drains, which serves as the EOS.
await ws.send(json.dumps({"text": text + " ", "flush": True}))

from websockets.exceptions import ConnectionClosedOK
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK

while True:
try:
@@ -455,6 +467,11 @@ async def synthesize(self, text: str) -> AsyncGenerator[bytes, None]:
except ConnectionClosedOK:
# Server closed cleanly — treat as end-of-stream.
return
except ConnectionClosed:
# Local :meth:`cancel` closed the socket from outside —
# treat as end-of-stream so the generator's ``finally``
# runs and the consumer exits within one tick.
return

# WebSocket frames may be text (JSON) or bytes (rare —
# some deployments may emit binary audio frames directly).
@@ -507,6 +524,11 @@ async def synthesize(self, text: str) -> AsyncGenerator[bytes, None]:
if msg.get("isFinal"):
return
finally:
# Clear the active-socket handle BEFORE closing so a racing
# :meth:`cancel` from a parallel coroutine doesn't try to
# close a socket we are already tearing down.
if self._active_socket is ws:
self._active_socket = None
# Best-effort: tell the server to stop synthesising any
# buffered text the consumer is no longer interested in.
# Failure to send is non-fatal — the socket close below
@@ -520,6 +542,41 @@ async def synthesize(self, text: str) -> AsyncGenerator[bytes, None]:
except Exception:
pass

def cancel(self) -> None:
"""Force-close the currently in-flight synthesis socket, if any.

Called by the stream handler on barge-in during a firstMessage
synth: the generator's frame wait is unblocked by the resulting
``ConnectionClosed`` exception, which then drives the loop into
its ``finally`` for a clean teardown. No-op when no synth is in
flight. Idempotent — safe to call repeatedly.

Implemented as a sync method (not a coroutine) so the cancel
path can be invoked from any context — including inside the
synchronous portion of :func:`_do_cancel_for_barge_in` where an
``await`` would change call ordering. The actual ``ws.close``
is scheduled as a task on the running event loop; the in-flight
generator's ``finally`` then performs its own best-effort close.
"""
ws = self._active_socket
if ws is None:
return
self._active_socket = None
try:
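# ``get_running_loop`` raises ``RuntimeError`` when no loop is
# running, which routes us to the transport fallback below.
# ``get_event_loop`` could instead hand back a loop that never
# runs, leaving the close task unscheduled.
loop = asyncio.get_running_loop()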
loop.create_task(ws.close())
except Exception:
# No running loop or other transient failure — try the
# synchronous low-level transport fallback. The TCP socket
# close still wakes the pending ``ws.recv()`` with a
# ``ConnectionClosed`` on the next event-loop tick.
try:
transport = getattr(ws, "transport", None)
if transport is not None:
transport.close()
except Exception:
pass

async def warmup(self) -> None:
"""Pre-call WebSocket warmup for the ElevenLabs ``/stream-input`` endpoint.

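The adapter-side half of the fix (publish the in-flight socket, let a synchronous `cancel()` close it, treat the resulting close as end-of-stream) can be exercised without a network. The sketch below is an assumption-laden stand-in: `FakeSocket`, `SocketClosed`, and `Synth` are hypothetical, with `SocketClosed` playing the role of the `ConnectionClosed` exception from `websockets`.

```python
import asyncio
from typing import AsyncGenerator, List, Optional, Set, Tuple


class SocketClosed(Exception):
    """Hypothetical stand-in for websockets' ConnectionClosed."""


class FakeSocket:
    """Hypothetical socket: recv() blocks until close() wakes it."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self.closed = False

    async def recv(self) -> bytes:
        item = await self._queue.get()
        if item is None:
            raise SocketClosed()
        return item

    async def close(self) -> None:
        if not self.closed:
            self.closed = True
            self._queue.put_nowait(None)  # wake any pending recv()


class Synth:
    def __init__(self) -> None:
        self._active_socket: Optional[FakeSocket] = None
        self._close_tasks: Set[asyncio.Task] = set()
        self.events: List[str] = []

    async def synthesize(self) -> AsyncGenerator[bytes, None]:
        ws = FakeSocket()
        self._active_socket = ws  # publish so cancel() can reach it
        try:
            while True:
                try:
                    frame = await ws.recv()
                except SocketClosed:
                    return  # closed from outside: treat as end-of-stream
                yield frame
        finally:
            if self._active_socket is ws:
                self._active_socket = None
            await ws.close()
            self.events.append("finally")

    def cancel(self) -> None:
        """Synchronous, idempotent: schedule a close of the in-flight socket."""
        ws = self._active_socket
        if ws is None:
            return
        self._active_socket = None
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return  # no running loop; nothing to schedule in this sketch
        task = loop.create_task(ws.close())
        # Hold a reference so the task is not garbage-collected early.
        self._close_tasks.add(task)
        task.add_done_callback(self._close_tasks.discard)


async def demo() -> Tuple[List[bytes], List[str]]:
    synth = Synth()
    asyncio.get_running_loop().call_later(0.05, synth.cancel)
    frames = [frame async for frame in synth.synthesize()]
    return frames, synth.events
```

Keeping `cancel()` synchronous means the caller does not yield to the event loop between flipping its own state and requesting the close, which is the ordering concern the docstring above raises.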
147 changes: 122 additions & 25 deletions libraries/python/getpatter/stream_handler.py
@@ -1902,6 +1902,17 @@ def __init__(
# generic ``audio_*`` marks the Realtime path sends so the two paths
# can coexist without name collisions.
self._first_message_mark_counter: int = 0
# Per-call abort event for the firstMessage ``tts.synthesize`` loop.
# Set by ``_do_cancel_for_barge_in`` so a barge-in DURING the agent's
# first utterance (pipeline mode) breaks out of the async iterator
# immediately, runs the generator's ``finally`` (which closes the
# provider WS), and frees the dispatch path for the user's next
# turn. Without this, the ``async for`` was suspended on
# ``ws.recv()`` while ``_is_speaking`` had already flipped to False
# — leaving the TTS WS in flight until ``frame_timeout`` (30 s on
# ElevenLabs WS TTS) and the call silent for the user. Mirrors TS
# ``firstMessageAbort``.
self._first_message_abort: asyncio.Event | None = None

async def start(self) -> None:
"""Initialize STT/TTS providers, hooks, and start the STT receive loop."""
@@ -2171,41 +2182,103 @@ async def _connect_stt() -> None:
except Exception as exc: # noqa: BLE001 - best-effort
logger.debug("pop_prewarm_audio raised: %s", exc)
prewarm_bytes = None
# Arm a fresh abort event so a barge-in during the firstMessage
# synth path can break the async iterator immediately rather
# than hanging on the next ``__anext__`` until
# ``frame_timeout`` (30 s on ElevenLabs WS TTS). Combined
# with ``tts.cancel()`` (closes the provider WS) the loop
# exits within one event-loop tick.
self._first_message_abort = asyncio.Event()
fm_abort = self._first_message_abort
try:
if prewarm_bytes:
if self.metrics is not None:
self.metrics.record_tts_first_byte()
first_chunk_sent = await self._stream_prewarm_bytes(prewarm_bytes)
else:
async for audio_chunk in self._tts.synthesize(
self.agent.first_message
):
if not self._is_speaking:
break # barge-in or test-hangup
if not first_chunk_sent:
first_chunk_sent = True
if self.metrics is not None:
self.metrics.record_tts_first_byte()
# BUG #128: route every TTS chunk through the paced
# sender so we never push more than
# ``_FIRST_MESSAGE_MARK_WINDOW`` chunks of audio
# ahead of carrier playback. The previous burst-send
# let Twilio's outbound buffer reach several
# seconds — a barge-in's send_clear race-lost
# against the queued media frames and the agent
# kept talking on the user's earpiece for up to
# ~2 s after the user spoke. The paced sender also
# drives the AEC far-end tap and
# ``_mark_first_audio_sent`` internally so this
# path stays parity-clean with
# ``_stream_prewarm_bytes``.
sent = await self._send_paced_first_message_bytes(audio_chunk)
if not sent:
break
# Use the manual async-iterator protocol so we can race
# ``__anext__`` against the abort event. ``async for``
# does not surface a hook to cancel a pending
# ``__anext__`` from outside.
agen = self._tts.synthesize(self.agent.first_message)
aiter = agen.__aiter__()
try:
while True:
if not self._is_speaking or fm_abort.is_set():
break
next_task = asyncio.ensure_future(aiter.__anext__())
abort_task = asyncio.ensure_future(fm_abort.wait())
done, pending = await asyncio.wait(
{next_task, abort_task},
return_when=asyncio.FIRST_COMPLETED,
)
if abort_task in done and next_task not in done:
# Cancel the pending ``__anext__`` so it
# stops blocking on ``ws.recv()``; the
# ``aclose`` in finally runs the
# generator's ``finally`` (closes WS).
next_task.cancel()
try:
await next_task
except (
asyncio.CancelledError,
StopAsyncIteration,
Exception, # noqa: BLE001 - best-effort drain
):
pass
break
# next_task settled — drain the still-pending
# abort_task so we don't leak a future per
# iteration.
if abort_task in pending:
abort_task.cancel()
try:
await abort_task
except (asyncio.CancelledError, Exception): # noqa: BLE001
pass
try:
audio_chunk = next_task.result()
except StopAsyncIteration:
break
if not self._is_speaking:
break # barge-in or test-hangup
if not first_chunk_sent:
first_chunk_sent = True
if self.metrics is not None:
self.metrics.record_tts_first_byte()
# BUG #128: route every TTS chunk through the paced
# sender so we never push more than
# ``_FIRST_MESSAGE_MARK_WINDOW`` chunks of audio
# ahead of carrier playback. The previous burst-send
# let Twilio's outbound buffer reach several
# seconds — a barge-in's send_clear race-lost
# against the queued media frames and the agent
# kept talking on the user's earpiece for up to
# ~2 s after the user spoke. The paced sender also
# drives the AEC far-end tap and
# ``_mark_first_audio_sent`` internally so this
# path stays parity-clean with
# ``_stream_prewarm_bytes``.
sent = await self._send_paced_first_message_bytes(
audio_chunk
)
if not sent:
break
finally:
# Always run the generator's ``finally`` so the
# provider WS closes promptly. Idempotent — an
# already-finalised generator just no-ops.
try:
await agen.aclose()
except Exception as exc: # noqa: BLE001 - best-effort
logger.debug("firstMessage tts.aclose raised: %s", exc)
finally:
# Drop any partial int16 byte to prevent cross-turn corruption
# if the stream threw before a complete sample was delivered.
self.audio_sender.reset_pcm_carry()
# Clear the abort event handle so the next turn does not
# see an already-set sentinel.
self._first_message_abort = None
# Flip back to not-speaking with grace so the ring
# buffer accumulated during the intro is flushed and
# the next user utterance is recognised cleanly.
@@ -2783,6 +2856,30 @@ async def _do_cancel_for_barge_in(self, transcript_text: str) -> None:
cancel_event = getattr(self, "_llm_cancel_event", None)
if cancel_event is not None:
cancel_event.set()
# Signal any in-flight firstMessage ``synthesize`` loop to abort
# immediately. The loop races ``__anext__`` with this event and,
# on set, calls ``aclose`` on the async generator so its
# ``finally`` runs and the provider WS is closed promptly.
fm_abort = getattr(self, "_first_message_abort", None)
if fm_abort is not None and not fm_abort.is_set():
fm_abort.set()
# Adapter-side teardown: WS-based TTS providers own a
# long-lived socket inside ``synthesize`` whose ``ws.recv()``
# wait will otherwise hang until ``frame_timeout`` (30 s).
# Calling ``cancel`` from here closes the socket so the
# generator unblocks within one event-loop tick. Best-effort
# — providers without a ``cancel`` method (HTTP streaming
# adapters) simply skip this.
tts = getattr(self, "_tts", None)
tts_cancel = getattr(tts, "cancel", None) if tts is not None else None
if callable(tts_cancel):
try:
tts_cancel()
except Exception as exc: # noqa: BLE001 - best-effort
logger.debug(
"tts.cancel during firstMessage barge-in raised: %s",
exc,
)
try:
await self.audio_sender.send_clear()
except Exception as exc:
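The handler-side barge-in signalling in the last hunk (set the abort event, then call the adapter's optional `cancel` hook without letting it raise) could be reduced to a small helper. Everything named here (`signal_barge_in`, `WsTTS`, `HttpTTS`, `FlakyTTS`, `demo`) is hypothetical and only mirrors the shape of `_do_cancel_for_barge_in`.

```python
import asyncio
import logging
from typing import Optional, Tuple

logger = logging.getLogger(__name__)


class HttpTTS:
    """Hypothetical adapter with no cancel() hook."""


class WsTTS:
    """Hypothetical WS adapter that counts cancel() calls."""

    def __init__(self) -> None:
        self.cancel_calls = 0

    def cancel(self) -> None:
        self.cancel_calls += 1


class FlakyTTS:
    """Hypothetical adapter whose cancel() fails."""

    def cancel(self) -> None:
        raise RuntimeError("socket already gone")


def signal_barge_in(abort: Optional[asyncio.Event], tts: object) -> bool:
    """Set the abort event, then run the optional cancel hook.

    Returns True only if a cancel hook existed and ran without raising.
    """
    if abort is not None and not abort.is_set():
        abort.set()
    hook = getattr(tts, "cancel", None)
    if not callable(hook):
        return False  # e.g. HTTP streaming adapters: nothing to close
    try:
        hook()
    except Exception as exc:  # best-effort: never break the barge-in path
        logger.debug("tts.cancel raised: %s", exc)
        return False
    return True


def demo() -> Tuple[bool, int, Tuple[bool, bool, bool]]:
    async def run() -> Tuple[bool, int, Tuple[bool, bool, bool]]:
        abort = asyncio.Event()
        ws_tts = WsTTS()
        results = (
            signal_barge_in(abort, ws_tts),
            signal_barge_in(abort, HttpTTS()),
            signal_barge_in(abort, FlakyTTS()),
        )
        return abort.is_set(), ws_tts.cancel_calls, results

    return asyncio.run(run())
```

The event and the hook are complementary: the event releases the consumer loop, while the hook releases the producer that is blocked on the socket.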