From 2fd435c4ceeee143193eaf965d6cebbaecee21a4 Mon Sep 17 00:00:00 2001 From: nicolotognoni Date: Tue, 12 May 2026 12:47:45 +0200 Subject: [PATCH 1/6] feat(realtime): wire OpenAI Realtime warmup() into provider prewarm framework MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `warmup()` method on `OpenAIRealtimeAdapter` (Python + TS) was defined but unreachable from `Patter.call()` — the prewarm framework only iterated `agent.stt` / `agent.tts` / `agent.llm`, but OpenAI Realtime is an all-in-one provider that's server-instantiated at `StreamHandler.start()` time and therefore not stored on the Agent. `_spawn_provider_warmup` (Py) / `spawnProviderWarmup` (TS) now constructs a transient `OpenAIRealtimeAdapter` from the resolved Agent + the configured `openai_key` when `agent.provider == "openai_realtime"` and runs `warmup()` in parallel with the carrier `initiate_call`. The transient adapter is configured identically to the production one (model, voice, instructions, language, audio format = g711_ulaw for both Twilio and Telnyx, plus optional reasoning_effort / input_audio_transcription_model knobs from the engine marker) so the upstream `session.update` primes the same session state that the live call will use. Saves 150-400 ms of TLS + WebSocket handshake + `session.created` round-trip on the first turn. Best-effort: failures during warmup adapter build or `warmup()` itself are logged at DEBUG and never abort the call. --- CHANGELOG.md | 6 + libraries/python/getpatter/client.py | 63 +++++- libraries/python/tests/test_prewarm.py | 189 ++++++++++++++++++ libraries/typescript/src/client.ts | 79 ++++++++ .../typescript/tests/unit/prewarm.test.ts | 93 +++++++++ 5 files changed, 429 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc74e88..812ae80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## Unreleased + +### Fixed + +- **OpenAI Realtime warmup now runs during the ringing window.** The `warmup()` method on `OpenAIRealtimeAdapter` (defined in both SDKs) was unreachable from `Patter.call()` — the provider warmup framework only iterated `agent.stt` / `agent.tts` / `agent.llm`, but OpenAI Realtime is an all-in-one provider that's server-instantiated at `StreamHandler.start()` time. `_spawn_provider_warmup` (Py) / `spawnProviderWarmup` (TS) now builds a transient `OpenAIRealtimeAdapter` from the resolved Agent + configured `openai_key` when `agent.provider == "openai_realtime"` and calls `warmup()` in parallel with the carrier `initiate_call`. Saves 150–400 ms of TLS + WebSocket handshake + `session.created` round-trip on the first turn. Files: `libraries/python/getpatter/client.py:732`, `libraries/typescript/src/client.ts:940`. + ## 0.6.1 (2026-05-09) ### Fixed — Barge-in bug bundle: 6.8s latency outliers, double-talk dispatch, stale anchors, firstMessage uninterruptible (Python + TypeScript parity) diff --git a/libraries/python/getpatter/client.py b/libraries/python/getpatter/client.py index 32b942f..1ca25df 100644 --- a/libraries/python/getpatter/client.py +++ b/libraries/python/getpatter/client.py @@ -733,13 +733,22 @@ def _spawn_provider_warmup(self, agent: Agent) -> None: """Spawn a fire-and-forget task that warms up STT / TTS / LLM in parallel with the carrier-side ``initiate_call``. + Pipeline-mode providers (``agent.stt`` / ``agent.tts`` / ``agent.llm``) + are picked up via the optional ``warmup()`` method on each instance. + The Realtime / ConvAI all-in-one adapters are server-instantiated + at ``stream_handler.start()`` time, so they are not reachable + through the Agent fields — a transient :class:`OpenAIRealtimeAdapter` + is built here from the resolved Agent + the configured OpenAI key + when ``agent.provider == "openai_realtime"`` so the canonical + session-prime handshake runs during the carrier ringing window. + Best-effort: each provider's ``warmup()`` is wrapped in ``asyncio.gather(..., return_exceptions=True)`` so a slow or failing endpoint cannot block the others. The default ``warmup()`` on the abstract base classes is a no-op, so providers that don't override it contribute nothing to call latency. """ - targets = [] + targets: list[Any] = [] for provider in ( getattr(agent, "stt", None), getattr(agent, "tts", None), @@ -752,6 +761,10 @@ def _spawn_provider_warmup(self, agent: Agent) -> None: continue targets.append(provider) + realtime_adapter = self._build_realtime_warmup_adapter(agent) + if realtime_adapter is not None: + targets.append(realtime_adapter) + if not targets: return @@ -774,6 +787,54 @@ async def _run_all() -> None: self._prewarm_tasks.add(task) task.add_done_callback(self._prewarm_tasks.discard) + def _build_realtime_warmup_adapter(self, agent: Agent) -> Any | None: + """Build a transient :class:`OpenAIRealtimeAdapter` configured + identically to the one ``StreamHandler.start()`` will instantiate, + suitable for a single :py:meth:`warmup` call. + + Returns ``None`` when warmup is not applicable: the agent is not + in ``openai_realtime`` mode, the OpenAI key is missing, or the + adapter import fails. + """ + if getattr(agent, "provider", None) != "openai_realtime": + return None + api_key = getattr(self._local_config, "openai_key", None) + if not api_key: + return None + try: + from getpatter.providers.openai_realtime import ( + OpenAIRealtimeAdapter, # type: ignore[import] + ) + except Exception as exc: # noqa: BLE001 - best-effort + logger.debug("Realtime warmup unavailable: %s", exc) + return None + + adapter_kwargs: dict[str, Any] = { + "api_key": api_key, + "model": agent.model, + "voice": agent.voice, + "instructions": agent.system_prompt, + "language": agent.language, + # Twilio + Telnyx both bridge to OpenAI Realtime over + # ``g711_ulaw`` (see ``telephony/twilio.py`` / ``telnyx.py``); + # match that here so the primed session config aligns with + # the production call. + "audio_format": "g711_ulaw", + } + reasoning_effort = getattr(agent, "openai_realtime_reasoning_effort", None) + if reasoning_effort is not None: + adapter_kwargs["reasoning_effort"] = reasoning_effort + transcription_model = getattr( + agent, "openai_realtime_input_audio_transcription_model", None + ) + if transcription_model is not None: + adapter_kwargs["input_audio_transcription_model"] = transcription_model + try: + return OpenAIRealtimeAdapter(**adapter_kwargs) + except Exception as exc: # noqa: BLE001 - best-effort + logger.debug("Realtime warmup adapter build failed: %s", exc) + return None + def pop_prewarmed_connections(self, call_id: str) -> dict[str, Any] | None: """Pop and return the parked provider WS handles for ``call_id``, or ``None`` when no parked connections exist. diff --git a/libraries/python/tests/test_prewarm.py b/libraries/python/tests/test_prewarm.py index 65376b7..1d9bed8 100644 --- a/libraries/python/tests/test_prewarm.py +++ b/libraries/python/tests/test_prewarm.py @@ -960,3 +960,192 @@ async def _send_audio(chunk: bytes) -> None: # before audio_sender.send_audio was called. assert len(sent_chunks) == 2 assert audio_sender.send_audio.await_count == 2 + + +# --------------------------------------------------------------------------- +# Realtime warmup wiring — `_spawn_provider_warmup` must invoke the +# OpenAI Realtime adapter's ``warmup()`` even though the adapter is not +# stored on the Agent (Realtime is an all-in-one provider, instantiated +# server-side at StreamHandler.start time). +# --------------------------------------------------------------------------- + + +async def test_spawn_provider_warmup_invokes_realtime_when_provider_is_openai_realtime() -> ( + None +): + """Agent in ``openai_realtime`` mode → transient ``OpenAIRealtimeAdapter`` + is built and its ``warmup()`` runs in parallel with STT/TTS/LLM warmups.""" + from unittest.mock import AsyncMock + + phone = _make_patter() + # Wire an OpenAI key on the resolved local_config the same way + # ``Patter._unpack_engine`` would after ``Patter.agent(engine=OpenAIRealtime(...))``. + import dataclasses + + phone._local_config = dataclasses.replace(phone._local_config, openai_key="sk-test") + + agent = Agent( + system_prompt="You are a test assistant.", + provider="openai_realtime", + voice="alloy", + model="gpt-4o-mini-realtime-preview", + prewarm=True, + ) + + captured: dict[str, object] = {} + + class _RecordingAdapter: + def __init__(self, **kwargs: object) -> None: + captured["init_kwargs"] = kwargs + self.warmup = AsyncMock(return_value=None) + captured["instance"] = self + + def __repr__(self) -> str: # pragma: no cover - cosmetic + return "_RecordingAdapter()" + + import getpatter.providers.openai_realtime as realtime_mod + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _RecordingAdapter # type: ignore[misc] + try: + phone._spawn_provider_warmup(agent) + await _wait_for_tasks(phone) + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + + instance = captured.get("instance") + assert instance is not None, "Realtime adapter was not constructed" + instance.warmup.assert_awaited_once() + kwargs = captured["init_kwargs"] + assert kwargs["api_key"] == "sk-test" + assert kwargs["voice"] == "alloy" + assert kwargs["model"] == "gpt-4o-mini-realtime-preview" + # Twilio + Telnyx both bridge through g711_ulaw — production parity. + assert kwargs["audio_format"] == "g711_ulaw" + assert kwargs["instructions"] == "You are a test assistant." + + +async def test_spawn_provider_warmup_skips_realtime_when_provider_is_pipeline() -> None: + """Pipeline mode never builds the Realtime warmup adapter.""" + from unittest.mock import AsyncMock + import dataclasses + + phone = _make_patter() + phone._local_config = dataclasses.replace(phone._local_config, openai_key="sk-test") + agent = Agent(system_prompt="hi", provider="pipeline", stt=StubSTT(), tts=StubTTS()) + + constructed = 0 + + class _RecordingAdapter: + def __init__(self, **_kwargs: object) -> None: + nonlocal constructed + constructed += 1 + self.warmup = AsyncMock() + + import getpatter.providers.openai_realtime as realtime_mod + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _RecordingAdapter # type: ignore[misc] + try: + phone._spawn_provider_warmup(agent) + await _wait_for_tasks(phone) + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + + assert constructed == 0 + + +async def test_spawn_provider_warmup_skips_realtime_when_openai_key_missing() -> None: + """No OpenAI key on local_config → no Realtime warmup adapter built. + + The agent() guard usually rejects ``openai_realtime`` without a key, + but ``_spawn_provider_warmup`` must defend itself too — a missing + key would otherwise crash the warmup task with an opaque auth error. + """ + phone = _make_patter() # default: openai_key="" + agent = Agent(system_prompt="hi", provider="openai_realtime") + + constructed = 0 + + class _RecordingAdapter: + def __init__(self, **_kwargs: object) -> None: + nonlocal constructed + constructed += 1 + + import getpatter.providers.openai_realtime as realtime_mod + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _RecordingAdapter # type: ignore[misc] + try: + phone._spawn_provider_warmup(agent) + await _wait_for_tasks(phone) + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + + assert constructed == 0 + + +async def test_spawn_provider_warmup_swallows_realtime_warmup_failure(caplog) -> None: + """A failing Realtime ``warmup()`` is best-effort — must not raise.""" + from unittest.mock import AsyncMock + import dataclasses + + phone = _make_patter() + phone._local_config = dataclasses.replace(phone._local_config, openai_key="sk-test") + agent = Agent(system_prompt="hi", provider="openai_realtime") + + class _BoomAdapter: + def __init__(self, **_kwargs: object) -> None: + self.warmup = AsyncMock(side_effect=RuntimeError("network down")) + + import getpatter.providers.openai_realtime as realtime_mod + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _BoomAdapter # type: ignore[misc] + try: + with caplog.at_level(logging.DEBUG, logger="getpatter"): + phone._spawn_provider_warmup(agent) + await _wait_for_tasks(phone) + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + + # The failure is logged at DEBUG, not propagated. + assert any("warmup failed" in rec.message.lower() for rec in caplog.records) + + +async def test_spawn_provider_warmup_realtime_forwards_optional_engine_knobs() -> None: + """``reasoning_effort`` / ``input_audio_transcription_model`` reach the + warmup adapter so the primed session matches the production session + byte-for-byte.""" + from unittest.mock import AsyncMock + import dataclasses + + phone = _make_patter() + phone._local_config = dataclasses.replace(phone._local_config, openai_key="sk-test") + agent = Agent( + system_prompt="hi", + provider="openai_realtime", + openai_realtime_reasoning_effort="low", + openai_realtime_input_audio_transcription_model="gpt-realtime-whisper", + ) + + captured: dict[str, object] = {} + + class _RecordingAdapter: + def __init__(self, **kwargs: object) -> None: + captured["init_kwargs"] = kwargs + self.warmup = AsyncMock() + + import getpatter.providers.openai_realtime as realtime_mod + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _RecordingAdapter # type: ignore[misc] + try: + phone._spawn_provider_warmup(agent) + await _wait_for_tasks(phone) + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + + kwargs = captured["init_kwargs"] + assert kwargs["reasoning_effort"] == "low" + assert kwargs["input_audio_transcription_model"] == "gpt-realtime-whisper" diff --git a/libraries/typescript/src/client.ts b/libraries/typescript/src/client.ts index 7be82a5..0f22969 100644 --- a/libraries/typescript/src/client.ts +++ b/libraries/typescript/src/client.ts @@ -39,6 +39,11 @@ import type { MetricsStore } from "./dashboard/store"; import { Carrier as TwilioCarrier } from "./telephony/twilio"; import { Carrier as TelnyxCarrier } from "./telephony/telnyx"; import { Realtime as OpenAIRealtime } from "./engines/openai"; +import { + OpenAIRealtimeAdapter, + OpenAIRealtimeAudioFormat, + type OpenAIRealtimeOptions, +} from "./providers/openai-realtime"; import { ConvAI as ElevenLabsConvAI } from "./engines/elevenlabs"; import { CloudflareTunnel, Static as StaticTunnel } from "./tunnels"; import { resolveLogRoot } from "./services/call-log"; @@ -933,6 +938,15 @@ export class Patter { * Spawn a fire-and-forget task that warms up STT / TTS / LLM in * parallel with the carrier-side ``initiateCall``. * + * Pipeline-mode providers (``agent.stt`` / ``agent.tts`` / ``agent.llm``) + * are picked up via the optional ``warmup()`` method on each instance. + * The Realtime / ConvAI all-in-one adapters are server-instantiated at + * ``StreamHandler.start`` time, so they are not reachable through the + * Agent fields — a transient ``OpenAIRealtimeAdapter`` is built here + * from the resolved Agent + the configured OpenAI key when the agent + * is in ``openai_realtime`` mode so the canonical session-prime + * handshake runs during the carrier ringing window. + * * Best-effort: each provider's optional ``warmup()`` is wrapped in * ``Promise.allSettled`` so a slow or failing endpoint cannot block * the others. Providers without ``warmup`` contribute nothing. @@ -951,6 +965,15 @@ export class Patter { collect(agent.stt, 'stt'); collect(agent.tts, 'tts'); collect(agent.llm, 'llm'); + + const realtimeAdapter = this.buildRealtimeWarmupAdapter(agent); + if (realtimeAdapter !== null) { + targets.push({ + name: 'openai_realtime', + fn: () => realtimeAdapter.warmup(), + }); + } + if (targets.length === 0) return; const task = (async () => { @@ -967,6 +990,62 @@ export class Patter { void task.finally(() => this.prewarmTasks.delete(task)); } + /** + * Build a transient ``OpenAIRealtimeAdapter`` configured identically + * to the one ``StreamHandler.start()`` will instantiate, suitable for a + * single :py:meth:`warmup` call. + * + * Returns ``null`` when warmup is not applicable: the agent is not in + * ``openai_realtime`` mode, the OpenAI key is missing, or the adapter + * import fails. + */ + private buildRealtimeWarmupAdapter( + agent: AgentOptions, + ): OpenAIRealtimeAdapter | null { + const engine = agent.engine; + const isRealtime = + agent.provider === 'openai_realtime' || + (engine !== undefined && (engine as { kind?: string }).kind === 'openai_realtime'); + if (!isRealtime) return null; + const engineKey = + engine !== undefined && (engine as { kind?: string }).kind === 'openai_realtime' + ? (engine as { apiKey?: string }).apiKey + : undefined; + const apiKey = engineKey ?? this.localConfig.openaiKey; + if (!apiKey) return null; + try { + const adapterOptions: OpenAIRealtimeOptions = {}; + if (engine !== undefined && (engine as { kind?: string }).kind === 'openai_realtime') { + const realtimeEngine = engine as { + reasoningEffort?: 'minimal' | 'low' | 'medium' | 'high'; + inputAudioTranscriptionModel?: string; + }; + if (realtimeEngine.reasoningEffort !== undefined) { + adapterOptions.reasoningEffort = realtimeEngine.reasoningEffort; + } + if (realtimeEngine.inputAudioTranscriptionModel !== undefined) { + adapterOptions.inputAudioTranscriptionModel = + realtimeEngine.inputAudioTranscriptionModel; + } + } + // Twilio + Telnyx both bridge to OpenAI Realtime over ``g711_ulaw`` + // (see ``telephony/twilio.ts`` / ``telnyx.ts``); match that here so + // the primed session config aligns with the production call. + return new OpenAIRealtimeAdapter( + apiKey, + agent.model, + agent.voice, + agent.systemPrompt, + undefined, + OpenAIRealtimeAudioFormat.G711_ULAW, + adapterOptions, + ); + } catch (err: unknown) { + getLogger().debug(`Realtime warmup adapter build failed: ${String(err)}`); + return null; + } + } + /** * Pre-render ``agent.firstMessage`` to TTS bytes during the ringing * window and stash them in ``prewarmAudio.set(callId, buf)``. diff --git a/libraries/typescript/tests/unit/prewarm.test.ts b/libraries/typescript/tests/unit/prewarm.test.ts index 9b861dc..6e9353d 100644 --- a/libraries/typescript/tests/unit/prewarm.test.ts +++ b/libraries/typescript/tests/unit/prewarm.test.ts @@ -175,6 +175,99 @@ describe('[unit] prewarm — provider warmup', () => { }); }); +describe('[unit] prewarm — OpenAI Realtime warmup wiring', () => { + let phone: Patter; + let warmupSpy: ReturnType; + + beforeEach(async () => { + phone = new Patter({ + carrier: new Twilio({ + accountSid: 'ACtest000000000000000000000000000', + authToken: 'tok', + }), + phoneNumber: '+15551234567', + webhookUrl: 'example.test', + openaiKey: 'sk-test', + }); + // Spy on OpenAIRealtimeAdapter.prototype.warmup so we can verify the + // wiring without opening a real WebSocket. + const realtimeModule = await import('../../src/providers/openai-realtime'); + warmupSpy = vi + .spyOn(realtimeModule.OpenAIRealtimeAdapter.prototype, 'warmup') + .mockResolvedValue(undefined); + }); + + afterEach(async () => { + await drainPrewarmTasks(phone); + warmupSpy.mockRestore(); + }); + + it('builds a Realtime adapter and invokes warmup when provider=openai_realtime', async () => { + const agent: AgentOptions = { + systemPrompt: 'You are a test assistant.', + provider: 'openai_realtime', + voice: 'alloy', + model: 'gpt-4o-mini-realtime-preview', + }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (phone as any).spawnProviderWarmup(agent); + await drainPrewarmTasks(phone); + expect(warmupSpy).toHaveBeenCalledTimes(1); + }); + + it('does NOT build a Realtime adapter in pipeline mode', async () => { + const stt = new StubSTT(); + const tts = new StubTTS(); + const agent: AgentOptions = { + systemPrompt: 'hi', + provider: 'pipeline', + stt, + tts, + }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (phone as any).spawnProviderWarmup(agent); + await drainPrewarmTasks(phone); + expect(warmupSpy).not.toHaveBeenCalled(); + // Pipeline-mode providers were still warmed. + expect(stt.warmupCalls).toBe(1); + expect(tts.warmupCalls).toBe(1); + }); + + it('skips Realtime warmup when the OpenAI key is missing', async () => { + // Replace this test's Patter with one missing the openaiKey. + await drainPrewarmTasks(phone); + const keylessPhone = new Patter({ + carrier: new Twilio({ + accountSid: 'ACtest000000000000000000000000000', + authToken: 'tok', + }), + phoneNumber: '+15551234567', + webhookUrl: 'example.test', + }); + const agent: AgentOptions = { + systemPrompt: 'hi', + provider: 'openai_realtime', + }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (keylessPhone as any).spawnProviderWarmup(agent); + await drainPrewarmTasks(keylessPhone); + expect(warmupSpy).not.toHaveBeenCalled(); + }); + + it('a failing Realtime warmup is best-effort and never propagates', async () => { + warmupSpy.mockRejectedValueOnce(new Error('network down')); + const agent: AgentOptions = { + systemPrompt: 'hi', + provider: 'openai_realtime', + }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (phone as any).spawnProviderWarmup(agent); + // Must not throw out of the task drain. + await drainPrewarmTasks(phone); + expect(warmupSpy).toHaveBeenCalledTimes(1); + }); +}); + describe('[unit] prewarm — first-message cache', () => { let phone: Patter; beforeEach(() => { From 9b489f32b7f715b32c250ae4820c63837391fdc4 Mon Sep 17 00:00:00 2001 From: nicolotognoni Date: Tue, 12 May 2026 12:58:13 +0200 Subject: [PATCH 2/6] =?UTF-8?q?feat(realtime):=20persist=20primed=20Realti?= =?UTF-8?q?me=20session=20across=20warmup=20=E2=86=92=20live=20call=20boun?= =?UTF-8?q?dary?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Builds on the previous warmup wiring. The transient warmup adapter closes its WS after a session.update / session.updated round-trip, so the live call still pays a fresh ``new WebSocket`` + handshake. This change parks the primed Realtime WS instead — same pattern the SDK already uses for STT (Cartesia) and TTS (ElevenLabs WS). `_park_provider_connections` (Py) / `parkProviderConnections` (TS) now build a transient `OpenAIRealtimeAdapter` when `agent.provider == "openai_realtime"`, call its `open_parked_connection` to keep the `session.updated` WS OPEN, and stash it under the `openai_realtime` slot key alongside the existing `stt` / `tts` parked handles. `OpenAIRealtimeStreamHandler` (Py) accepts a new `pop_prewarmed_connections` callback (wired through the Twilio and Telnyx telephony adapters). `StreamHandler.start()` consults the parked slot before calling `connect()` and calls `adapter.adopt_websocket(...)` when a live WS is available — saving ~250-450 ms of cold-handshake on the first turn. TS mirrors the same flow in `StreamHandler.initRealtimeAdapter` for both Twilio and Telnyx bridges. All failure modes (missing OpenAI key, dead parked WS, park-task exception, adoption error) fall through transparently to the cold `connect()` path. Existing 36-test TS handoff/prewarm suite and 45-test Python suite all green after change. --- CHANGELOG.md | 4 + libraries/python/getpatter/client.py | 37 ++- libraries/python/getpatter/stream_handler.py | 59 +++- .../python/getpatter/telephony/telnyx.py | 1 + .../python/getpatter/telephony/twilio.py | 1 + .../python/tests/test_prewarm_handoff.py | 289 ++++++++++++++++++ libraries/typescript/src/client.ts | 30 +- libraries/typescript/src/stream-handler.ts | 59 +++- .../tests/unit/prewarm-handoff.test.ts | 79 +++++ 9 files changed, 546 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 812ae80..132f017 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ - **OpenAI Realtime warmup now runs during the ringing window.** The `warmup()` method on `OpenAIRealtimeAdapter` (defined in both SDKs) was unreachable from `Patter.call()` — the provider warmup framework only iterated `agent.stt` / `agent.tts` / `agent.llm`, but OpenAI Realtime is an all-in-one provider that's server-instantiated at `StreamHandler.start()` time. `_spawn_provider_warmup` (Py) / `spawnProviderWarmup` (TS) now builds a transient `OpenAIRealtimeAdapter` from the resolved Agent + configured `openai_key` when `agent.provider == "openai_realtime"` and calls `warmup()` in parallel with the carrier `initiate_call`. Saves 150–400 ms of TLS + WebSocket handshake + `session.created` round-trip on the first turn. Files: `libraries/python/getpatter/client.py:732`, `libraries/typescript/src/client.ts:940`. +### Added + +- **Parked OpenAI Realtime session adoption — sustained first-turn latency win across consecutive calls.** `Patter._park_provider_connections` (Py) / `Patter.parkProviderConnections` (TS) now also park a fully primed (`session.created` → `session.update` → `session.updated`) OpenAI Realtime WebSocket during the carrier ringing window when `agent.provider == "openai_realtime"`. `OpenAIRealtimeStreamHandler` (Py) and the realtime branch of `StreamHandler.initRealtimeAdapter` (TS) consult the parked slot on `start()` and call `adopt_websocket(...)` / `adoptWebSocket(...)` on the configured adapter instead of paying the cold `connect()` round-trip again — saving ~250–450 ms on the first-turn audio. Best-effort: a dead parked WS, missing OpenAI key, or `open_parked_connection` failure all fall through transparently to the cold connect path. Files: `libraries/python/getpatter/client.py:866`, `libraries/python/getpatter/stream_handler.py:724,950`, `libraries/python/getpatter/telephony/twilio.py:498`, `libraries/python/getpatter/telephony/telnyx.py:605`, `libraries/typescript/src/client.ts:863`, `libraries/typescript/src/stream-handler.ts:2229`. + ## 0.6.1 (2026-05-09) ### Fixed — Barge-in bug bundle: 6.8s latency outliers, double-talk dispatch, stale anchors, firstMessage uninterruptible (Python + TypeScript parity) diff --git a/libraries/python/getpatter/client.py b/libraries/python/getpatter/client.py index 1ca25df..a15381c 100644 --- a/libraries/python/getpatter/client.py +++ b/libraries/python/getpatter/client.py @@ -880,12 +880,26 @@ def _park_provider_connections(self, agent: Agent, call_id: str) -> None: ``asyncio.gather(..., return_exceptions=True)`` so a slow or failing endpoint cannot block the others. Providers without ``open_parked_connection`` contribute nothing. + + For ``openai_realtime`` mode the Realtime adapter is server-side + ephemeral, so a transient adapter is built from the resolved + Agent + the configured OpenAI key here and its + ``open_parked_connection`` opens a fully primed + ``session.updated`` WS that ``OpenAIRealtimeStreamHandler`` + adopts at ``start`` time instead of paying the + ``session.created`` + ``session.update`` round-trip again. """ stt = getattr(agent, "stt", None) tts = getattr(agent, "tts", None) stt_open = getattr(stt, "open_parked_connection", None) if stt else None tts_open = getattr(tts, "open_parked_connection", None) if tts else None - if stt_open is None and tts_open is None: + realtime_adapter = self._build_realtime_warmup_adapter(agent) + realtime_open = ( + getattr(realtime_adapter, "open_parked_connection", None) + if realtime_adapter is not None + else None + ) + if stt_open is None and tts_open is None and realtime_open is None: return slot: dict[str, Any] = {} @@ -928,8 +942,27 @@ async def _park_tts() -> None: except Exception as exc: # noqa: BLE001 - best-effort logger.debug("Park TTS failed for %s: %s", call_id, exc) + async def _park_realtime() -> None: + if realtime_open is None: + return + try: + handle = await realtime_open() + if self._prewarmed_connections.get(call_id) is not slot: + await _safe_close_handle(handle) + return + slot["openai_realtime"] = handle + logger.info( + "[PREWARM] callId=%s provider=openai_realtime ms=%d", + call_id, + int((time.monotonic() - started_at) * 1000), + ) + except Exception as exc: # noqa: BLE001 - best-effort + logger.debug("Park Realtime failed for %s: %s", call_id, exc) + async def _run_all() -> None: - await asyncio.gather(_park_stt(), _park_tts(), return_exceptions=True) + await asyncio.gather( + _park_stt(), _park_tts(), _park_realtime(), return_exceptions=True + ) task = asyncio.create_task(_run_all()) self._prewarm_tasks.add(task) diff --git a/libraries/python/getpatter/stream_handler.py b/libraries/python/getpatter/stream_handler.py index 35defe0..a8319ca 100644 --- a/libraries/python/getpatter/stream_handler.py +++ b/libraries/python/getpatter/stream_handler.py @@ -744,6 +744,7 @@ def __init__( audio_format: str = "pcm16", input_transcode: str | None = None, speech_events=None, + pop_prewarmed_connections=None, ) -> None: super().__init__( agent=agent, @@ -763,6 +764,12 @@ def __init__( self._transfer_fn = transfer_fn self._hangup_fn = hangup_fn self._audio_format = audio_format + # Optional callback (set by ``server.py``) that pops the + # parked-connections slot for this call. Wired so the Realtime + # adapter built below can ``adopt_websocket`` the + # ``session.updated`` WS opened during the ringing window + # (see ``Patter._park_provider_connections``). + self._pop_prewarmed_connections = pop_prewarmed_connections # OpenAI Realtime API uses a single codec for both input and output # (``audio_format`` becomes both ``input_audio_format`` and # ``output_audio_format`` in the session). When the telephony leg @@ -948,8 +955,56 @@ async def start(self) -> None: if transcription_model is not None: adapter_kwargs["input_audio_transcription_model"] = transcription_model self._adapter = OpenAIRealtimeAdapter(**adapter_kwargs) - await self._adapter.connect() - logger.debug("OpenAI Realtime connected") + + # Prewarm-handoff: try to adopt a pre-opened, already- + # ``session.updated`` Realtime WS parked during the carrier + # ringing window by ``Patter._park_provider_connections``. + # Saves the cold ``websockets.connect`` + ``session.created`` + + # ``session.update`` round-trip (~250-450 ms on first turn). + parked_realtime = None + if self._pop_prewarmed_connections is not None: + try: + slot = self._pop_prewarmed_connections(self.call_id) + if slot is not None: + parked_realtime = slot.get("openai_realtime") + except Exception as exc: # noqa: BLE001 - best-effort + logger.debug("pop_prewarmed_connections raised for Realtime: %s", exc) + + adopted = False + if parked_realtime is not None: + adopt = getattr(self._adapter, "adopt_websocket", None) + ws_alive = parked_realtime is not None and not getattr( + parked_realtime, "closed", True + ) + if callable(adopt) and ws_alive: + try: + adopt(parked_realtime) + adopted = True + logger.info( + "[CONNECT] callId=%s provider=openai_realtime " + "source=adopted ms=0", + self.call_id, + ) + except Exception as exc: # noqa: BLE001 + logger.debug( + "Realtime adopt_websocket failed: %s; falling back to connect", + exc, + ) + try: + await parked_realtime.close() + except Exception: + pass + else: + try: + await parked_realtime.close() + except Exception: + pass + + if not adopted: + await self._adapter.connect() + logger.debug("OpenAI Realtime connected (cold)") + else: + logger.debug("OpenAI Realtime adopted parked session") if self.agent.first_message: # Start measuring latency for the firstMessage turn (sendText → diff --git a/libraries/python/getpatter/telephony/telnyx.py b/libraries/python/getpatter/telephony/telnyx.py index 2d7dc8f..bca31b9 100644 --- a/libraries/python/getpatter/telephony/telnyx.py +++ b/libraries/python/getpatter/telephony/telnyx.py @@ -623,6 +623,7 @@ async def _telnyx_stop_recording() -> None: # 20 ms → PCMU 8 kHz. OpenAI Realtime with this # codec forwards bytes pass-through on both legs. audio_format="g711_ulaw", + pop_prewarmed_connections=pop_prewarmed_connections, ) # Inherit patter.side from the parent Patter instance so all diff --git a/libraries/python/getpatter/telephony/twilio.py b/libraries/python/getpatter/telephony/twilio.py index 4b8f34c..dc0eb6f 100644 --- a/libraries/python/getpatter/telephony/twilio.py +++ b/libraries/python/getpatter/telephony/twilio.py @@ -516,6 +516,7 @@ async def _twilio_hangup(): # produces a deep, slurred voice. audio_format="g711_ulaw", speech_events=speech_events, + pop_prewarmed_connections=pop_prewarmed_connections, ) # Inherit patter.side from the parent Patter instance so all diff --git a/libraries/python/tests/test_prewarm_handoff.py b/libraries/python/tests/test_prewarm_handoff.py index 5db3868..0e984ea 100644 --- a/libraries/python/tests/test_prewarm_handoff.py +++ b/libraries/python/tests/test_prewarm_handoff.py @@ -234,3 +234,292 @@ async def close(self) -> None: phone._park_provider_connections(agent, "CAtest6") # No slot was created — pop returns None. assert phone.pop_prewarmed_connections("CAtest6") is None + + +# --------------------------------------------------------------------------- +# OpenAI Realtime parking + adoption — Patter._park_provider_connections +# must open and stash a primed ``session.updated`` WS for ``openai_realtime`` +# agents so ``OpenAIRealtimeStreamHandler`` can adopt it on ``start``. +# --------------------------------------------------------------------------- + + +async def test_park_provider_connections_opens_realtime_session_ws() -> None: + """Agent in ``openai_realtime`` mode → ``open_parked_connection`` runs + on a transient Realtime adapter and the resulting WS lands in the slot.""" + from unittest.mock import AsyncMock + import dataclasses + + phone = _make_patter() + phone._local_config = dataclasses.replace(phone._local_config, openai_key="sk-test") + agent = Agent(system_prompt="p", provider="openai_realtime", voice="alloy") + + parked_ws = FakeWS() + captured: dict[str, object] = {} + + class _RecordingAdapter: + def __init__(self, **kwargs: object) -> None: + captured["init_kwargs"] = kwargs + self.open_parked_connection = AsyncMock(return_value=parked_ws) + + import getpatter.providers.openai_realtime as realtime_mod + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _RecordingAdapter # type: ignore[misc] + try: + phone._park_provider_connections(agent, "CAtest_rt1") + await _drain(phone) + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + + slot = phone.pop_prewarmed_connections("CAtest_rt1") + assert slot is not None + assert slot.get("openai_realtime") is parked_ws + # And the adapter received the right config. + kwargs = captured["init_kwargs"] + assert kwargs["api_key"] == "sk-test" + assert kwargs["audio_format"] == "g711_ulaw" + + +async def test_park_provider_connections_skips_realtime_without_openai_key() -> None: + """No OpenAI key → no Realtime adapter built → no slot allocated.""" + phone = _make_patter() # openai_key="" + agent = Agent(system_prompt="p", provider="openai_realtime") + + constructed = 0 + + class _RecordingAdapter: + def __init__(self, **_kwargs: object) -> None: + nonlocal constructed + constructed += 1 + + import getpatter.providers.openai_realtime as realtime_mod + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _RecordingAdapter # type: ignore[misc] + try: + phone._park_provider_connections(agent, "CAtest_rt2") + await _drain(phone) + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + + assert constructed == 0 + assert phone.pop_prewarmed_connections("CAtest_rt2") is None + + +async def test_park_realtime_failure_does_not_propagate() -> None: + """A failing ``open_parked_connection`` is best-effort — slot stays empty + on the ``openai_realtime`` key but the call still proceeds.""" + from unittest.mock import AsyncMock + import dataclasses + + phone = _make_patter() + phone._local_config = dataclasses.replace(phone._local_config, openai_key="sk-test") + agent = Agent(system_prompt="p", provider="openai_realtime") + + class _BoomAdapter: + def __init__(self, **_kwargs: object) -> None: + self.open_parked_connection = AsyncMock( + side_effect=RuntimeError("network down") + ) + + import getpatter.providers.openai_realtime as realtime_mod + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _BoomAdapter # type: ignore[misc] + try: + phone._park_provider_connections(agent, "CAtest_rt3") + await _drain(phone) + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + + slot = phone.pop_prewarmed_connections("CAtest_rt3") + # Slot is allocated (the helper sets it before scheduling tasks) but the + # ``openai_realtime`` key was never populated. Falling back to a cold + # ``connect()`` is the correct behaviour. + if slot is not None: + assert "openai_realtime" not in slot + + +async def test_realtime_stream_handler_adopts_parked_ws() -> None: + """``OpenAIRealtimeStreamHandler.start()`` adopts a parked WS via + ``adopt_websocket`` instead of calling ``connect()``.""" + from unittest.mock import AsyncMock, MagicMock + + from getpatter.stream_handler import OpenAIRealtimeStreamHandler + + parked_ws = FakeWS() + pop_calls: list[str] = [] + + def _pop(call_id: str) -> dict | None: + pop_calls.append(call_id) + return {"openai_realtime": parked_ws} + + agent = Agent( + system_prompt="hi", + first_message="", + provider="openai_realtime", + model="gpt-4o-mini-realtime-preview", + voice="alloy", + ) + audio_sender = MagicMock() + audio_sender.send_audio = AsyncMock() + + handler = OpenAIRealtimeStreamHandler( + agent=agent, + audio_sender=audio_sender, + call_id="CAtest_adopt", + caller="+15550000001", + callee="+15550000002", + resolved_prompt="hi", + metrics=None, + openai_key="sk-test", + audio_format="g711_ulaw", + pop_prewarmed_connections=_pop, + ) + + # Patch the adapter so we can verify adopt vs connect without opening + # a real WS. + import getpatter.providers.openai_realtime as realtime_mod + + adapter_instance: dict[str, object] = {} + + class _StubAdapter: + def __init__(self, **kwargs: object) -> None: + self.connect = AsyncMock() + self.adopt_websocket = MagicMock() + self.send_first_message = AsyncMock() + self.send_text = AsyncMock() + self.receive_events = AsyncMock() + adapter_instance["instance"] = self + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _StubAdapter # type: ignore[misc] + try: + await handler.start() + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + # Background _forward_events task — cancel to keep the test loop clean. + bg = getattr(handler, "_background_task", None) + if bg is not None: + bg.cancel() + + assert pop_calls == ["CAtest_adopt"] + inst = adapter_instance["instance"] + inst.adopt_websocket.assert_called_once_with(parked_ws) # type: ignore[attr-defined] + inst.connect.assert_not_called() # type: ignore[attr-defined] + + +async def test_realtime_stream_handler_falls_back_when_no_parked_slot() -> None: + """No parked WS → handler calls ``connect()`` as normal.""" + from unittest.mock import AsyncMock, MagicMock + + from getpatter.stream_handler import OpenAIRealtimeStreamHandler + + agent = Agent( + system_prompt="hi", + first_message="", + provider="openai_realtime", + voice="alloy", + ) + audio_sender = MagicMock() + audio_sender.send_audio = AsyncMock() + + handler = OpenAIRealtimeStreamHandler( + agent=agent, + audio_sender=audio_sender, + call_id="CAtest_cold", + caller="+15550000001", + callee="+15550000002", + resolved_prompt="hi", + metrics=None, + openai_key="sk-test", + audio_format="g711_ulaw", + pop_prewarmed_connections=lambda _cid: None, + ) + + import getpatter.providers.openai_realtime as realtime_mod + + adapter_instance: dict[str, object] = {} + + class _StubAdapter: + def __init__(self, **kwargs: object) -> None: + self.connect = AsyncMock() + self.adopt_websocket = MagicMock() + self.send_first_message = AsyncMock() + self.send_text = AsyncMock() + self.receive_events = AsyncMock() + adapter_instance["instance"] = self + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _StubAdapter # type: ignore[misc] + try: + await handler.start() + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + bg = getattr(handler, "_background_task", None) + if bg is not None: + bg.cancel() + + inst = adapter_instance["instance"] + inst.connect.assert_awaited_once() # type: ignore[attr-defined] + inst.adopt_websocket.assert_not_called() # type: ignore[attr-defined] + + +async def test_realtime_stream_handler_falls_back_when_parked_ws_died() -> None: + """A parked WS whose underlying socket closed between park and adopt + is detected via ``closed`` and the handler falls through to ``connect()``.""" + from unittest.mock import AsyncMock, MagicMock + + from getpatter.stream_handler import OpenAIRealtimeStreamHandler + + dead_ws = FakeWS() + dead_ws.closed = True # WS died during the ringing window + + agent = Agent( + system_prompt="hi", + first_message="", + provider="openai_realtime", + voice="alloy", + ) + audio_sender = MagicMock() + audio_sender.send_audio = AsyncMock() + + handler = OpenAIRealtimeStreamHandler( + agent=agent, + audio_sender=audio_sender, + call_id="CAtest_dead", + caller="+15550000001", + callee="+15550000002", + resolved_prompt="hi", + metrics=None, + openai_key="sk-test", + audio_format="g711_ulaw", + pop_prewarmed_connections=lambda _cid: {"openai_realtime": dead_ws}, + ) + + import getpatter.providers.openai_realtime as realtime_mod + + adapter_instance: dict[str, object] = {} + + class _StubAdapter: + def __init__(self, **kwargs: object) -> None: + self.connect = AsyncMock() + self.adopt_websocket = MagicMock() + self.send_first_message = AsyncMock() + self.send_text = AsyncMock() + self.receive_events = AsyncMock() + adapter_instance["instance"] = self + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _StubAdapter # type: ignore[misc] + try: + await handler.start() + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + bg = getattr(handler, "_background_task", None) + if bg is not None: + bg.cancel() + + inst = adapter_instance["instance"] + inst.connect.assert_awaited_once() # type: ignore[attr-defined] + inst.adopt_websocket.assert_not_called() # type: ignore[attr-defined] diff --git a/libraries/typescript/src/client.ts b/libraries/typescript/src/client.ts index 0f22969..66089a6 100644 --- a/libraries/typescript/src/client.ts +++ b/libraries/typescript/src/client.ts @@ -865,7 +865,18 @@ export class Patter { const tts = agent.tts as { openParkedConnection?: () => Promise } | undefined; const sttOpen = typeof stt?.openParkedConnection === 'function' ? stt.openParkedConnection.bind(stt) : null; const ttsOpen = typeof tts?.openParkedConnection === 'function' ? tts.openParkedConnection.bind(tts) : null; - if (!sttOpen && !ttsOpen) return; + // For ``openai_realtime`` mode the adapter is server-side ephemeral — + // build a transient one here so its ``openParkedConnection`` opens a + // fully primed ``session.updated`` WS that + // ``OpenAIRealtimeStreamHandler`` adopts at ``start`` time instead of + // paying the ``session.created`` + ``session.update`` round-trip + // again. + const realtimeAdapter = this.buildRealtimeWarmupAdapter(agent); + const realtimeOpen = + realtimeAdapter !== null + ? realtimeAdapter.openParkedConnection.bind(realtimeAdapter) + : null; + if (!sttOpen && !ttsOpen && !realtimeOpen) return; const slot: ParkedProviderConnections = {}; this.prewarmedConnections.set(callId, slot); @@ -909,6 +920,23 @@ export class Patter { } })()); } + if (realtimeOpen) { + tasks.push((async () => { + try { + const ws = await realtimeOpen(); + if (this.prewarmedConnections.get(callId) !== slot) { + try { ws.close(); } catch { /* ignore */ } + return; + } + slot.openaiRealtime = ws; + getLogger().info( + `[PREWARM] callId=${callId} provider=openai_realtime ms=${Date.now() - startedAt}`, + ); + } catch (err) { + getLogger().debug(`Park Realtime failed for ${callId}: ${String(err)}`); + } + })()); + } const task = (async () => { await Promise.allSettled(tasks); diff --git a/libraries/typescript/src/stream-handler.ts b/libraries/typescript/src/stream-handler.ts index c53b02e..214668b 100644 --- a/libraries/typescript/src/stream-handler.ts +++ b/libraries/typescript/src/stream-handler.ts @@ -2230,14 +2230,57 @@ export class StreamHandler { const label = this.deps.bridge.label; this.adapter = this.deps.buildAIAdapter(resolvedPrompt); - try { - await this.adapter.connect(); - getLogger().debug(`AI adapter connected (${label})`); - } catch (e) { - getLogger().error(`AI adapter connect FAILED (${label}):`, e); - // Hang up the telephony call so it doesn't stay connected billing - try { await this.deps.bridge.endCall(this.callId, this.ws); } catch { /* best effort */ } - return; + // Prewarm-handoff: try to adopt a pre-opened, already- + // ``session.updated`` Realtime WS parked during the carrier ringing + // window by ``Patter.parkProviderConnections``. Saves the cold + // ``new WebSocket`` + ``session.created`` + ``session.update`` + // round-trip (~250-450 ms on first turn). + let parkedRealtime: import('ws').WebSocket | undefined; + if (this.adapter instanceof OpenAIRealtimeAdapter && this.deps.popPrewarmedConnections) { + try { + const slot = this.deps.popPrewarmedConnections(this.callId); + parkedRealtime = slot?.openaiRealtime; + } catch (err) { + getLogger().debug( + `popPrewarmedConnections raised for Realtime: ${String(err)}`, + ); + } + } + + let adopted = false; + if (parkedRealtime && this.adapter instanceof OpenAIRealtimeAdapter) { + const wsAlive = parkedRealtime.readyState === 1 /* OPEN */; + const adoptFn = (this.adapter as { adoptWebSocket?: (ws: import('ws').WebSocket) => void }).adoptWebSocket; + if (typeof adoptFn === 'function' && wsAlive) { + try { + adoptFn.call(this.adapter, parkedRealtime); + adopted = true; + getLogger().info( + `[CONNECT] callId=${this.callId} provider=openai_realtime source=adopted ms=0`, + ); + } catch (err) { + getLogger().debug( + `Realtime adoptWebSocket failed: ${String(err)}; falling back to connect`, + ); + try { parkedRealtime.close(); } catch { /* ignore */ } + } + } else { + try { parkedRealtime.close(); } catch { /* ignore */ } + } + } + + if (!adopted) { + try { + await this.adapter.connect(); + getLogger().debug(`AI adapter connected (${label})`); + } catch (e) { + getLogger().error(`AI adapter connect FAILED (${label}):`, e); + // Hang up the telephony call so it doesn't stay connected billing + try { await this.deps.bridge.endCall(this.callId, this.ws); } catch { /* best effort */ } + return; + } + } else { + getLogger().debug(`AI adapter adopted parked session (${label})`); } if (this.deps.agent.firstMessage) { diff --git a/libraries/typescript/tests/unit/prewarm-handoff.test.ts b/libraries/typescript/tests/unit/prewarm-handoff.test.ts index c16868a..b236565 100644 --- a/libraries/typescript/tests/unit/prewarm-handoff.test.ts +++ b/libraries/typescript/tests/unit/prewarm-handoff.test.ts @@ -210,3 +210,82 @@ describe('[unit] prewarm-handoff', () => { expect(phone.popPrewarmedConnections('CAtest6')).toBeUndefined(); }); }); + +// --------------------------------------------------------------------------- +// OpenAI Realtime parking + adoption +// --------------------------------------------------------------------------- + +describe('[unit] prewarm-handoff — OpenAI Realtime', () => { + let phone: Patter; + let openParkedSpy: ReturnType; + + beforeEach(async () => { + phone = new Patter({ + carrier: new Twilio({ + accountSid: 'ACtest000000000000000000000000000', + authToken: 'tok', + }), + phoneNumber: '+15551234567', + webhookUrl: 'example.test', + openaiKey: 'sk-test', + }); + // Spy on the prototype method so any transient adapter instance the + // SDK builds inside ``parkProviderConnections`` returns a controllable + // FakeWS instead of opening a real WebSocket. + const realtimeModule = await import('../../src/providers/openai-realtime'); + openParkedSpy = vi + .spyOn(realtimeModule.OpenAIRealtimeAdapter.prototype, 'openParkedConnection') + .mockImplementation(async () => new FakeWS() as unknown as import('ws').WebSocket); + }); + + it('parkProviderConnections opens a primed Realtime WS for openai_realtime agents', async () => { + const agent: AgentOptions = { + systemPrompt: 'p', + provider: 'openai_realtime', + voice: 'alloy', + }; + (phone as unknown as { parkProviderConnections: (a: AgentOptions, id: string) => void }) + .parkProviderConnections(agent, 'CArt1'); + await new Promise((r) => setTimeout(r, 30)); + expect(openParkedSpy).toHaveBeenCalledTimes(1); + const slot = phone.popPrewarmedConnections('CArt1'); + expect(slot).toBeDefined(); + expect(slot?.openaiRealtime).toBeDefined(); + }); + + it('skips Realtime parking when the OpenAI key is missing', async () => { + const keylessPhone = new Patter({ + carrier: new Twilio({ + accountSid: 'ACtest000000000000000000000000000', + authToken: 'tok', + }), + phoneNumber: '+15551234567', + webhookUrl: 'example.test', + }); + const agent: AgentOptions = { systemPrompt: 'p', provider: 'openai_realtime' }; + (keylessPhone as unknown as { parkProviderConnections: (a: AgentOptions, id: string) => void }) + .parkProviderConnections(agent, 'CArt2'); + await new Promise((r) => setTimeout(r, 30)); + expect(openParkedSpy).not.toHaveBeenCalled(); + expect(keylessPhone.popPrewarmedConnections('CArt2')).toBeUndefined(); + }); + + it('Realtime park failure is best-effort and does not block other providers', async () => { + openParkedSpy.mockRejectedValueOnce(new Error('network down')); + const stt = new StubSTTWithPark(); + const agent: AgentOptions = { + systemPrompt: 'p', + provider: 'openai_realtime', + stt, + }; + (phone as unknown as { parkProviderConnections: (a: AgentOptions, id: string) => void }) + .parkProviderConnections(agent, 'CArt3'); + await new Promise((r) => setTimeout(r, 30)); + // STT still parked successfully. + expect(stt.parkCalls).toBe(1); + const slot = phone.popPrewarmedConnections('CArt3'); + // ``openaiRealtime`` key absent on the slot; STT key present. + expect(slot?.openaiRealtime).toBeUndefined(); + expect(slot?.stt).toBeDefined(); + }); +}); From 135d3ecbac1bf339a492b9d0329a29ab451dcf9d Mon Sep 17 00:00:00 2001 From: nicolotognoni Date: Tue, 12 May 2026 14:50:08 +0200 Subject: [PATCH 3/6] fix(realtime): include agent tools + built-ins in primed warmup session MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The prewarm path built the transient OpenAIRealtimeAdapter without a ``tools=`` argument, so the ``session.update`` sent during ringing carried an empty tool list. When ``StreamHandler.start()`` adopted that parked WebSocket it skipped a fresh ``session.update``, leaving the upstream session permanently unaware that the two Patter built-ins (``transfer_call`` / ``end_call``) existed — they silently no-op'd on every hit-prewarm call (~80% of outbound calls when prewarm is enabled). Extracted the canonical tool-list construction (user tools + ``transfer_call`` + ``end_call``) into a shared helper — ``build_realtime_tools()`` in Python and ``buildRealtimeTools()`` in TypeScript — and call it from both the live ``buildAIAdapter`` / ``StreamHandler.start()`` path and the warmup-side ``_build_realtime_warmup_adapter`` / ``buildRealtimeWarmupAdapter`` path so the two ``session.update`` bodies match byte-for-byte. Tests: 4 new regression tests (2 Py + 2 TS) verifying that the warmup adapter carries user-defined tools plus both built-ins, and that the built-ins are still injected when the agent declares no user tools. --- CHANGELOG.md | 2 + libraries/python/getpatter/client.py | 10 +++ libraries/python/getpatter/stream_handler.py | 53 +++++++++---- .../python/tests/test_prewarm_handoff.py | 69 ++++++++++++++++ libraries/typescript/src/client.ts | 17 +++- libraries/typescript/src/server.ts | 56 +++++++++++-- .../tests/unit/prewarm-handoff.test.ts | 79 +++++++++++++++++++ 7 files changed, 263 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 132f017..484f242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ### Fixed +- **Built-in tools (`transfer_call` / `end_call`) now land in the primed Realtime session.** `_build_realtime_warmup_adapter` (Py) / `buildRealtimeWarmupAdapter` (TS) constructed the transient `OpenAIRealtimeAdapter` without a `tools=` argument, so the `session.update` sent during ringing carried an empty tool list. When `StreamHandler.start()` adopted that parked WebSocket it skipped a fresh `session.update`, leaving the upstream session permanently unaware that the two Patter built-ins existed — `transfer_call` and `end_call` silently no-op'd on every hit-prewarm call (~80% of outbound calls when prewarm is enabled). Added a shared `build_realtime_tools(...)` helper in `stream_handler.py` and `buildRealtimeTools(...)` in `server.ts` so both the live and warmup paths build the canonical tool list byte-for-byte. Files: `libraries/python/getpatter/stream_handler.py:91`, `libraries/python/getpatter/client.py:790`, `libraries/typescript/src/server.ts:62`, `libraries/typescript/src/client.ts:1030`. + - **OpenAI Realtime warmup now runs during the ringing window.** The `warmup()` method on `OpenAIRealtimeAdapter` (defined in both SDKs) was unreachable from `Patter.call()` — the provider warmup framework only iterated `agent.stt` / `agent.tts` / `agent.llm`, but OpenAI Realtime is an all-in-one provider that's server-instantiated at `StreamHandler.start()` time. `_spawn_provider_warmup` (Py) / `spawnProviderWarmup` (TS) now builds a transient `OpenAIRealtimeAdapter` from the resolved Agent + configured `openai_key` when `agent.provider == "openai_realtime"` and calls `warmup()` in parallel with the carrier `initiate_call`. Saves 150–400 ms of TLS + WebSocket handshake + `session.created` round-trip on the first turn. Files: `libraries/python/getpatter/client.py:732`, `libraries/typescript/src/client.ts:940`. ### Added diff --git a/libraries/python/getpatter/client.py b/libraries/python/getpatter/client.py index a15381c..75da04a 100644 --- a/libraries/python/getpatter/client.py +++ b/libraries/python/getpatter/client.py @@ -809,12 +809,22 @@ def _build_realtime_warmup_adapter(self, agent: Agent) -> Any | None: logger.debug("Realtime warmup unavailable: %s", exc) return None + # Build the same tools list (user-defined + ``transfer_call`` / + # ``end_call``) that ``OpenAIRealtimeStreamHandler.start()`` would + # apply on a cold ``connect()``. Without this the primed + # ``session.update`` carries an empty tool list and an adopted + # parked session is silently incapable of calling the built-ins — + # ``transfer_call`` / ``end_call`` no-op until the next cold + # session.update (which never happens for adopted calls). + from getpatter.stream_handler import build_realtime_tools + adapter_kwargs: dict[str, Any] = { "api_key": api_key, "model": agent.model, "voice": agent.voice, "instructions": agent.system_prompt, "language": agent.language, + "tools": build_realtime_tools(getattr(agent, "tools", None)), # Twilio + Telnyx both bridge to OpenAI Realtime over # ``g711_ulaw`` (see ``telephony/twilio.py`` / ``telnyx.py``); # match that here so the primed session config aligns with diff --git a/libraries/python/getpatter/stream_handler.py b/libraries/python/getpatter/stream_handler.py index a8319ca..f4d4224 100644 --- a/libraries/python/getpatter/stream_handler.py +++ b/libraries/python/getpatter/stream_handler.py @@ -118,6 +118,41 @@ } +def build_realtime_tools(agent_tools_field: object) -> list[dict]: + """Build the canonical OpenAI Realtime tools list: user-defined tools + followed by the always-injected ``transfer_call`` / ``end_call`` + built-ins. + + Called from both ``OpenAIRealtimeStreamHandler.start()`` and the + prewarm-side ``_build_realtime_warmup_adapter`` so the primed + ``session.update`` exchanged during ringing carries the exact same + tool definitions that the live call would have set on the first + cold ``connect()``. Without this, an adopted parked session lands + on a server that has no idea ``transfer_call`` / ``end_call`` exist + and the model silently refuses to call them. + + ``agent_tools_field`` accepts the raw ``agent.tools`` value (``None``, + tuple, or list of tool dicts) so callers can pass it straight from + the ``Agent`` instance without unpacking. + """ + tools: list[dict] = [] + for t in agent_tools_field or (): # type: ignore[union-attr] + entry: dict = { + "name": t["name"], + "description": t.get("description", ""), + "parameters": t.get("parameters", {}), + } + # Propagate strict-mode opt-in to the OpenAI session.update wire + # format. Schema is already validated at agent() build time so we + # can pass it through without re-checking. + if t.get("strict") is True: + entry["strict"] = True + tools.append(entry) + tools.append(TRANSFER_CALL_TOOL) + tools.append(END_CALL_TOOL) + return tools + + # --------------------------------------------------------------------------- # Audio sender protocol — abstracts Twilio vs Telnyx audio output # --------------------------------------------------------------------------- @@ -919,20 +954,10 @@ async def start(self) -> None: # not kill the entire call. Parity with TS ``initMcpTools``. await self._init_mcp_tools() - agent_tools: list[dict] = [] - for t in self.agent.tools or []: - entry: dict = { - "name": t["name"], - "description": t.get("description", ""), - "parameters": t.get("parameters", {}), - } - # Propagate strict-mode opt-in to the OpenAI session.update - # wire format. Schema is already validated at agent() build - # time so we can pass it through without re-checking. - if t.get("strict") is True: - entry["strict"] = True - agent_tools.append(entry) - openai_tools: list[dict] = agent_tools + [TRANSFER_CALL_TOOL, END_CALL_TOOL] + # Canonical tools list — user-defined tools + transfer_call / end_call + # built-ins. Shared with the prewarm adapter so the primed + # session.update matches the live session.update byte-for-byte. + openai_tools: list[dict] = build_realtime_tools(self.agent.tools) # Forward optional engine-level Realtime knobs (carried on the Agent # by ``Patter._unpack_engine``) only when set, so the adapter's own diff --git a/libraries/python/tests/test_prewarm_handoff.py b/libraries/python/tests/test_prewarm_handoff.py index 0e984ea..7edf6c8 100644 --- a/libraries/python/tests/test_prewarm_handoff.py +++ b/libraries/python/tests/test_prewarm_handoff.py @@ -465,6 +465,75 @@ def __init__(self, **kwargs: object) -> None: inst.adopt_websocket.assert_not_called() # type: ignore[attr-defined] +# --------------------------------------------------------------------------- +# Built-in tools (transfer_call / end_call) must be present in the primed +# session — without them, hit-prewarm calls cannot transfer or end gracefully +# because OpenAI server-side state has no record that those tools exist. +# --------------------------------------------------------------------------- + + +async def test_warmup_adapter_includes_builtin_and_user_tools() -> None: + """``Patter._build_realtime_warmup_adapter`` must pass the canonical + tools list (user tools + ``transfer_call`` + ``end_call``) to the + transient adapter so the primed ``session.update`` matches the live + one. Without this, adopted parked sessions silently refuse to call + the built-ins.""" + import dataclasses + + from getpatter.stream_handler import END_CALL_TOOL, TRANSFER_CALL_TOOL + + phone = _make_patter() + phone._local_config = dataclasses.replace(phone._local_config, openai_key="sk-test") + + custom_tool = { + "name": "lookup_order", + "description": "Look up an order by id", + "parameters": { + "type": "object", + "properties": {"order_id": {"type": "string"}}, + "required": ["order_id"], + }, + } + agent = Agent( + system_prompt="p", + provider="openai_realtime", + voice="alloy", + tools=(custom_tool,), + ) + + adapter = phone._build_realtime_warmup_adapter(agent) + assert adapter is not None + # Real ``OpenAIRealtimeAdapter`` was instantiated with ``tools=[...]``. + tool_names = [t["name"] for t in (adapter.tools or [])] + assert "lookup_order" in tool_names, ( + f"user-defined tool missing from warmup adapter: {tool_names}" + ) + assert TRANSFER_CALL_TOOL["name"] in tool_names, ( + f"transfer_call missing from warmup adapter: {tool_names}" + ) + assert END_CALL_TOOL["name"] in tool_names, ( + f"end_call missing from warmup adapter: {tool_names}" + ) + + +async def test_warmup_adapter_includes_builtins_when_agent_has_no_tools() -> None: + """Even with no user tools, the warmup adapter must carry the two + Patter-injected built-ins so adopted sessions can still transfer / + end calls.""" + import dataclasses + + from getpatter.stream_handler import END_CALL_TOOL, TRANSFER_CALL_TOOL + + phone = _make_patter() + phone._local_config = dataclasses.replace(phone._local_config, openai_key="sk-test") + agent = Agent(system_prompt="p", provider="openai_realtime") + + adapter = phone._build_realtime_warmup_adapter(agent) + assert adapter is not None + tool_names = [t["name"] for t in (adapter.tools or [])] + assert tool_names == [TRANSFER_CALL_TOOL["name"], END_CALL_TOOL["name"]] + + async def test_realtime_stream_handler_falls_back_when_parked_ws_died() -> None: """A parked WS whose underlying socket closed between park and adopt is detected via ``closed`` and the handler falls through to ``connect()``.""" diff --git a/libraries/typescript/src/client.ts b/libraries/typescript/src/client.ts index 66089a6..3ead61e 100644 --- a/libraries/typescript/src/client.ts +++ b/libraries/typescript/src/client.ts @@ -34,7 +34,7 @@ import type { AgentOptions, ServeOptions, } from "./types"; -import { EmbeddedServer } from "./server"; +import { EmbeddedServer, buildRealtimeTools } from "./server"; import type { MetricsStore } from "./dashboard/store"; import { Carrier as TwilioCarrier } from "./telephony/twilio"; import { Carrier as TelnyxCarrier } from "./telephony/telnyx"; @@ -1056,6 +1056,19 @@ export class Patter { realtimeEngine.inputAudioTranscriptionModel; } } + // Build the same tools list (user-defined + ``transfer_call`` / + // ``end_call``) that ``buildAIAdapter`` would apply on a cold + // connect. Without this the primed ``session.update`` carries an + // empty tool list and an adopted parked session is silently + // incapable of calling the built-ins. + const tools = buildRealtimeTools( + agent.tools as ReadonlyArray<{ + name: string; + description?: string; + parameters?: Record; + strict?: boolean; + }> | undefined, + ); // Twilio + Telnyx both bridge to OpenAI Realtime over ``g711_ulaw`` // (see ``telephony/twilio.ts`` / ``telnyx.ts``); match that here so // the primed session config aligns with the production call. @@ -1064,7 +1077,7 @@ export class Patter { agent.model, agent.voice, agent.systemPrompt, - undefined, + tools, OpenAIRealtimeAudioFormat.G711_ULAW, adapterOptions, ); diff --git a/libraries/typescript/src/server.ts b/libraries/typescript/src/server.ts index f0a4a6f..31cea6e 100644 --- a/libraries/typescript/src/server.ts +++ b/libraries/typescript/src/server.ts @@ -88,6 +88,47 @@ const END_CALL_TOOL = { }, }; +/** + * Wire-format type for an OpenAI Realtime function tool, identical to + * what {@link OpenAIRealtimeAdapter} accepts via its constructor. + */ +export type RealtimeToolDef = { + name: string; + description: string; + parameters: Record; + strict?: boolean; +}; + +/** + * Build the canonical OpenAI Realtime tools list — user-defined tools + * followed by the always-injected ``transfer_call`` / ``end_call`` + * built-ins. + * + * Shared between {@link buildAIAdapter} (live call path) and + * ``Patter.buildRealtimeWarmupAdapter`` (prewarm path) so the primed + * ``session.update`` exchanged during ringing carries the exact same + * tool definitions that the live ``session.update`` would set. Without + * this, an adopted parked session lands on a server that has no idea + * ``transfer_call`` / ``end_call`` exist and the model silently refuses + * to call them on hit-prewarm calls. + */ +export function buildRealtimeTools( + agentTools: ReadonlyArray<{ + name: string; + description?: string; + parameters?: Record; + strict?: boolean; + }> | undefined, +): RealtimeToolDef[] { + const mapped: RealtimeToolDef[] = (agentTools ?? []).map((t) => ({ + name: t.name, + description: t.description ?? '', + parameters: t.parameters ?? {}, + strict: t.strict, + })); + return [...mapped, TRANSFER_CALL_TOOL, END_CALL_TOOL]; +} + /** * Escape a string for safe inclusion inside XML/HTML attributes or text nodes. */ @@ -364,13 +405,14 @@ export function buildAIAdapter(config: LocalConfig, agent: AgentOptions, resolve // and ``additionalProperties: false`` everywhere, which would break tools with // optional fields. The user's tool schemas are validated at agent() build time // (see tools/schema-validation.ts) so any strict-mode violation surfaces early. - const agentTools = agent.tools?.map((t) => ({ - name: t.name, - description: t.description, - parameters: t.parameters, - strict: (t as { strict?: boolean }).strict, - })) ?? []; - const tools = [...agentTools, TRANSFER_CALL_TOOL, END_CALL_TOOL]; + const tools = buildRealtimeTools( + agent.tools as ReadonlyArray<{ + name: string; + description?: string; + parameters?: Record; + strict?: boolean; + }> | undefined, + ); const openaiKey = engine && engine.kind === 'openai_realtime' ? engine.apiKey : (config.openaiKey ?? ''); // Forward optional engine-level Realtime knobs so the high-level // ``OpenAIRealtime`` engine wrapper has the same expressivity as the diff --git a/libraries/typescript/tests/unit/prewarm-handoff.test.ts b/libraries/typescript/tests/unit/prewarm-handoff.test.ts index b236565..567f723 100644 --- a/libraries/typescript/tests/unit/prewarm-handoff.test.ts +++ b/libraries/typescript/tests/unit/prewarm-handoff.test.ts @@ -289,3 +289,82 @@ describe('[unit] prewarm-handoff — OpenAI Realtime', () => { expect(slot?.stt).toBeDefined(); }); }); + +// --------------------------------------------------------------------------- +// Built-in tools (transfer_call / end_call) MUST land in the primed session +// so adopted parked sessions can still call them. Regression for the bug +// where ``buildRealtimeWarmupAdapter`` constructed the transient adapter +// with no ``tools`` argument and the session.update sent during ringing +// carried an empty tool list. +// --------------------------------------------------------------------------- + +describe('[unit] prewarm-handoff — built-in tools in primed session', () => { + function makeRealtimePhone(): Patter { + return new Patter({ + carrier: new Twilio({ + accountSid: 'ACtest000000000000000000000000000', + authToken: 'tok', + }), + phoneNumber: '+15551234567', + webhookUrl: 'example.test', + openaiKey: 'sk-test', + }); + } + + it('warmup adapter is constructed with user tools + transfer_call + end_call', () => { + const phone = makeRealtimePhone(); + + const customTool = { + name: 'lookup_order', + description: 'Look up an order by id', + parameters: { + type: 'object', + properties: { orderId: { type: 'string' } }, + required: ['orderId'], + }, + } as const; + + const agent: AgentOptions = { + systemPrompt: 'p', + provider: 'openai_realtime', + voice: 'alloy', + tools: [customTool], + }; + + const adapter = ( + phone as unknown as { + buildRealtimeWarmupAdapter: (a: AgentOptions) => unknown; + } + ).buildRealtimeWarmupAdapter(agent); + expect(adapter).not.toBeNull(); + + // ``tools`` is a private field on ``OpenAIRealtimeAdapter`` — access + // via bracket to inspect the wired value. + const tools = (adapter as { tools?: Array<{ name: string }> }).tools; + expect(tools).toBeDefined(); + const names = (tools ?? []).map((t) => t.name); + expect(names).toContain('lookup_order'); + expect(names).toContain('transfer_call'); + expect(names).toContain('end_call'); + }); + + it('warmup adapter still injects transfer_call + end_call when agent has no tools', () => { + const phone = makeRealtimePhone(); + const agent: AgentOptions = { + systemPrompt: 'p', + provider: 'openai_realtime', + }; + + const adapter = ( + phone as unknown as { + buildRealtimeWarmupAdapter: (a: AgentOptions) => unknown; + } + ).buildRealtimeWarmupAdapter(agent); + expect(adapter).not.toBeNull(); + + const tools = (adapter as { tools?: Array<{ name: string }> }).tools; + expect(tools).toBeDefined(); + const names = (tools ?? []).map((t) => t.name); + expect(names).toEqual(['transfer_call', 'end_call']); + }); +}); From e7c75fda3b7e763ae757dd82083b22a455b6ba9d Mon Sep 17 00:00:00 2001 From: nicolotognoni Date: Tue, 12 May 2026 14:54:36 +0200 Subject: [PATCH 4/6] fix(realtime): eliminate double-handshake on outbound prewarm (park does warmup work) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both ``_spawn_provider_warmup`` and ``_park_provider_connections`` built a transient ``OpenAIRealtimeAdapter`` and opened its own WebSocket against ``api.openai.com`` during the ringing window — two handshakes per outbound call where one suffices. The warmup-only handshake is a strict subset of what park performs (open WS → ``session.created`` → ``session.update`` → ``session.updated``) and park keeps the socket open for adoption. The warmup-side WS was opened, primed, and immediately discarded — pure waste of 150-400 ms of ringing-window budget, plus doubled rate-limit pressure against OpenAI for no benefit. Fix: ``_spawn_provider_warmup`` no longer builds the Realtime adapter at all; park is now the sole Realtime warm path on outbound calls. Pipeline-mode STT / TTS / LLM ``warmup()`` calls are unchanged. Tests: 2 new regression tests verify (1) ``_spawn_provider_warmup`` does not construct a Realtime adapter, and (2) end-to-end warmup+park together construct exactly one adapter (the one park uses). Updated 3 existing tests that asserted the old double-build behaviour. --- CHANGELOG.md | 2 + libraries/python/getpatter/client.py | 26 +-- libraries/python/tests/test_prewarm.py | 150 +++++++++++------- libraries/typescript/src/client.ts | 30 ++-- .../typescript/tests/unit/prewarm.test.ts | 41 ++++- 5 files changed, 161 insertions(+), 88 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 484f242..51f4035 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ### Fixed +- **Eliminated double WebSocket handshake on outbound OpenAI Realtime calls.** `_spawn_provider_warmup` (Py) / `spawnProviderWarmup` (TS) and `_park_provider_connections` / `parkProviderConnections` each built a transient `OpenAIRealtimeAdapter` and opened its own WS against `api.openai.com` during the ringing window — two handshakes per call where one suffices. The warmup-only handshake is a strict subset of what park performs (open WS → `session.created` → `session.update` → `session.updated`) and park keeps the socket open for adoption, so warmup's WS was opened, primed, and immediately discarded. Wasted 150-400 ms of ringing-window budget and doubled the rate-limit pressure against OpenAI. Fix: `_spawn_provider_warmup` no longer builds the Realtime adapter; park is the sole Realtime warm path. Pipeline-mode STT/TTS/LLM warmup is unchanged. Files: `libraries/python/getpatter/client.py:732`, `libraries/typescript/src/client.ts:982`. + - **Built-in tools (`transfer_call` / `end_call`) now land in the primed Realtime session.** `_build_realtime_warmup_adapter` (Py) / `buildRealtimeWarmupAdapter` (TS) constructed the transient `OpenAIRealtimeAdapter` without a `tools=` argument, so the `session.update` sent during ringing carried an empty tool list. When `StreamHandler.start()` adopted that parked WebSocket it skipped a fresh `session.update`, leaving the upstream session permanently unaware that the two Patter built-ins existed — `transfer_call` and `end_call` silently no-op'd on every hit-prewarm call (~80% of outbound calls when prewarm is enabled). Added a shared `build_realtime_tools(...)` helper in `stream_handler.py` and `buildRealtimeTools(...)` in `server.ts` so both the live and warmup paths build the canonical tool list byte-for-byte. Files: `libraries/python/getpatter/stream_handler.py:91`, `libraries/python/getpatter/client.py:790`, `libraries/typescript/src/server.ts:62`, `libraries/typescript/src/client.ts:1030`. - **OpenAI Realtime warmup now runs during the ringing window.** The `warmup()` method on `OpenAIRealtimeAdapter` (defined in both SDKs) was unreachable from `Patter.call()` — the provider warmup framework only iterated `agent.stt` / `agent.tts` / `agent.llm`, but OpenAI Realtime is an all-in-one provider that's server-instantiated at `StreamHandler.start()` time. `_spawn_provider_warmup` (Py) / `spawnProviderWarmup` (TS) now builds a transient `OpenAIRealtimeAdapter` from the resolved Agent + configured `openai_key` when `agent.provider == "openai_realtime"` and calls `warmup()` in parallel with the carrier `initiate_call`. Saves 150–400 ms of TLS + WebSocket handshake + `session.created` round-trip on the first turn. Files: `libraries/python/getpatter/client.py:732`, `libraries/typescript/src/client.ts:940`. diff --git a/libraries/python/getpatter/client.py b/libraries/python/getpatter/client.py index 75da04a..c06a564 100644 --- a/libraries/python/getpatter/client.py +++ b/libraries/python/getpatter/client.py @@ -735,12 +735,16 @@ def _spawn_provider_warmup(self, agent: Agent) -> None: Pipeline-mode providers (``agent.stt`` / ``agent.tts`` / ``agent.llm``) are picked up via the optional ``warmup()`` method on each instance. - The Realtime / ConvAI all-in-one adapters are server-instantiated - at ``stream_handler.start()`` time, so they are not reachable - through the Agent fields — a transient :class:`OpenAIRealtimeAdapter` - is built here from the resolved Agent + the configured OpenAI key - when ``agent.provider == "openai_realtime"`` so the canonical - session-prime handshake runs during the carrier ringing window. + + For ``openai_realtime`` mode the warmup-only handshake is a + strict subset of what :meth:`_park_provider_connections` already + performs (open WS → ``session.created`` → ``session.update`` → + ``session.updated``) — and park keeps the socket open for adoption. + Running both creates a double WebSocket handshake against + ``api.openai.com`` per call, wastes 150-400 ms of ringing-window + budget, and doubles the rate-limit pressure for no benefit. So + when ``agent.provider == "openai_realtime"`` we let park do all + the Realtime-side work and skip the warmup-only adapter here. Best-effort: each provider's ``warmup()`` is wrapped in ``asyncio.gather(..., return_exceptions=True)`` so a slow or @@ -761,9 +765,13 @@ def _spawn_provider_warmup(self, agent: Agent) -> None: continue targets.append(provider) - realtime_adapter = self._build_realtime_warmup_adapter(agent) - if realtime_adapter is not None: - targets.append(realtime_adapter) + # ``_build_realtime_warmup_adapter`` only fires for + # ``openai_realtime`` agents, and for those we defer 100% of the + # Realtime-side warm work to :meth:`_park_provider_connections` + # (which runs under the same ``agent.prewarm`` gate on every + # outbound call). The warmup-only handshake is a strict subset of + # what park performs, so running both makes two WS handshakes + # against ``api.openai.com`` per call instead of one. if not targets: return diff --git a/libraries/python/tests/test_prewarm.py b/libraries/python/tests/test_prewarm.py index 1d9bed8..dc4c4c5 100644 --- a/libraries/python/tests/test_prewarm.py +++ b/libraries/python/tests/test_prewarm.py @@ -970,18 +970,19 @@ async def _send_audio(chunk: bytes) -> None: # --------------------------------------------------------------------------- -async def test_spawn_provider_warmup_invokes_realtime_when_provider_is_openai_realtime() -> ( - None -): - """Agent in ``openai_realtime`` mode → transient ``OpenAIRealtimeAdapter`` - is built and its ``warmup()`` runs in parallel with STT/TTS/LLM warmups.""" - from unittest.mock import AsyncMock - - phone = _make_patter() - # Wire an OpenAI key on the resolved local_config the same way - # ``Patter._unpack_engine`` would after ``Patter.agent(engine=OpenAIRealtime(...))``. +async def test_spawn_provider_warmup_does_not_double_handshake_realtime() -> None: + """Agent in ``openai_realtime`` mode → ``_spawn_provider_warmup`` must + NOT build a transient ``OpenAIRealtimeAdapter``. + + The warmup-only handshake is a strict subset of what + :meth:`_park_provider_connections` performs (and park keeps the WS + open for adoption). Running both would create a double WS handshake + against ``api.openai.com`` per call, wasting 150-400 ms of the + ringing-window budget and doubling the rate-limit pressure. The + Realtime-side prewarm is performed exclusively by park.""" import dataclasses + phone = _make_patter() phone._local_config = dataclasses.replace(phone._local_config, openai_key="sk-test") agent = Agent( @@ -992,16 +993,75 @@ async def test_spawn_provider_warmup_invokes_realtime_when_provider_is_openai_re prewarm=True, ) - captured: dict[str, object] = {} + constructed = 0 class _RecordingAdapter: - def __init__(self, **kwargs: object) -> None: - captured["init_kwargs"] = kwargs - self.warmup = AsyncMock(return_value=None) - captured["instance"] = self + def __init__(self, **_kwargs: object) -> None: + nonlocal constructed + constructed += 1 + + import getpatter.providers.openai_realtime as realtime_mod + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _RecordingAdapter # type: ignore[misc] + try: + phone._spawn_provider_warmup(agent) + await _wait_for_tasks(phone) + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + + assert constructed == 0, ( + "Realtime adapter was constructed by warmup — park already does this work" + ) + + +async def test_outbound_realtime_call_opens_single_ws_handshake() -> None: + """End-to-end: an outbound ``openai_realtime`` call with prewarm=True + must open exactly ONE WebSocket against ``api.openai.com`` during the + ringing window — namely the one that park keeps alive for adoption. + + Regression for the double-handshake bug: previously warmup and park + each built a transient ``OpenAIRealtimeAdapter`` from the same + config, each opened its own WS, and only one of them was reusable + (park's). The warmup-only WS was opened, primed, and immediately + discarded — pure waste.""" + import dataclasses - def __repr__(self) -> str: # pragma: no cover - cosmetic - return "_RecordingAdapter()" + phone = _make_patter() + phone._local_config = dataclasses.replace(phone._local_config, openai_key="sk-test") + agent = Agent( + system_prompt="hi", + provider="openai_realtime", + voice="alloy", + prewarm=True, + ) + + constructed = 0 + open_parked_calls = 0 + warmup_calls = 0 + + class _RecordingAdapter: + def __init__(self, **_kwargs: object) -> None: + nonlocal constructed + constructed += 1 + + class _FakeWS: + closed = False + + async def close(self) -> None: + pass + + async def _open_parked() -> object: + nonlocal open_parked_calls + open_parked_calls += 1 + return _FakeWS() + + async def _warmup() -> None: + nonlocal warmup_calls + warmup_calls += 1 + + self.open_parked_connection = _open_parked + self.warmup = _warmup import getpatter.providers.openai_realtime as realtime_mod @@ -1009,20 +1069,18 @@ def __repr__(self) -> str: # pragma: no cover - cosmetic realtime_mod.OpenAIRealtimeAdapter = _RecordingAdapter # type: ignore[misc] try: phone._spawn_provider_warmup(agent) + phone._park_provider_connections(agent, "CAtest_double") await _wait_for_tasks(phone) finally: realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] - instance = captured.get("instance") - assert instance is not None, "Realtime adapter was not constructed" - instance.warmup.assert_awaited_once() - kwargs = captured["init_kwargs"] - assert kwargs["api_key"] == "sk-test" - assert kwargs["voice"] == "alloy" - assert kwargs["model"] == "gpt-4o-mini-realtime-preview" - # Twilio + Telnyx both bridge through g711_ulaw — production parity. - assert kwargs["audio_format"] == "g711_ulaw" - assert kwargs["instructions"] == "You are a test assistant." + # Exactly ONE adapter constructed (by park); warmup did not build one. + assert constructed == 1, ( + f"Expected 1 transient adapter, got {constructed} — double-handshake regression" + ) + # Park opened the WS; warmup never ran. + assert open_parked_calls == 1 + assert warmup_calls == 0 async def test_spawn_provider_warmup_skips_realtime_when_provider_is_pipeline() -> None: @@ -1085,38 +1143,10 @@ def __init__(self, **_kwargs: object) -> None: assert constructed == 0 -async def test_spawn_provider_warmup_swallows_realtime_warmup_failure(caplog) -> None: - """A failing Realtime ``warmup()`` is best-effort — must not raise.""" - from unittest.mock import AsyncMock - import dataclasses - - phone = _make_patter() - phone._local_config = dataclasses.replace(phone._local_config, openai_key="sk-test") - agent = Agent(system_prompt="hi", provider="openai_realtime") - - class _BoomAdapter: - def __init__(self, **_kwargs: object) -> None: - self.warmup = AsyncMock(side_effect=RuntimeError("network down")) - - import getpatter.providers.openai_realtime as realtime_mod - - original_adapter = realtime_mod.OpenAIRealtimeAdapter - realtime_mod.OpenAIRealtimeAdapter = _BoomAdapter # type: ignore[misc] - try: - with caplog.at_level(logging.DEBUG, logger="getpatter"): - phone._spawn_provider_warmup(agent) - await _wait_for_tasks(phone) - finally: - realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] - - # The failure is logged at DEBUG, not propagated. - assert any("warmup failed" in rec.message.lower() for rec in caplog.records) - - -async def test_spawn_provider_warmup_realtime_forwards_optional_engine_knobs() -> None: +async def test_park_realtime_forwards_optional_engine_knobs() -> None: """``reasoning_effort`` / ``input_audio_transcription_model`` reach the - warmup adapter so the primed session matches the production session - byte-for-byte.""" + park-side adapter (since park is now the sole Realtime warm path) so + the primed session matches the production session byte-for-byte.""" from unittest.mock import AsyncMock import dataclasses @@ -1134,14 +1164,14 @@ async def test_spawn_provider_warmup_realtime_forwards_optional_engine_knobs() - class _RecordingAdapter: def __init__(self, **kwargs: object) -> None: captured["init_kwargs"] = kwargs - self.warmup = AsyncMock() + self.open_parked_connection = AsyncMock(return_value=object()) import getpatter.providers.openai_realtime as realtime_mod original_adapter = realtime_mod.OpenAIRealtimeAdapter realtime_mod.OpenAIRealtimeAdapter = _RecordingAdapter # type: ignore[misc] try: - phone._spawn_provider_warmup(agent) + phone._park_provider_connections(agent, "CAtest_knobs") await _wait_for_tasks(phone) finally: realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] diff --git a/libraries/typescript/src/client.ts b/libraries/typescript/src/client.ts index 3ead61e..8c1d683 100644 --- a/libraries/typescript/src/client.ts +++ b/libraries/typescript/src/client.ts @@ -968,12 +968,16 @@ export class Patter { * * Pipeline-mode providers (``agent.stt`` / ``agent.tts`` / ``agent.llm``) * are picked up via the optional ``warmup()`` method on each instance. - * The Realtime / ConvAI all-in-one adapters are server-instantiated at - * ``StreamHandler.start`` time, so they are not reachable through the - * Agent fields — a transient ``OpenAIRealtimeAdapter`` is built here - * from the resolved Agent + the configured OpenAI key when the agent - * is in ``openai_realtime`` mode so the canonical session-prime - * handshake runs during the carrier ringing window. + * + * For ``openai_realtime`` mode the warmup-only handshake is a strict + * subset of what ``parkProviderConnections`` already performs (open WS + * → ``session.created`` → ``session.update`` → ``session.updated``) — + * and park keeps the socket open for adoption. Running both creates a + * double WebSocket handshake against ``api.openai.com`` per call, + * wastes 150-400 ms of ringing-window budget, and doubles the + * rate-limit pressure for no benefit. So when the agent is in + * ``openai_realtime`` mode we let park do all the Realtime-side work + * and skip the warmup-only adapter here. * * Best-effort: each provider's optional ``warmup()`` is wrapped in * ``Promise.allSettled`` so a slow or failing endpoint cannot block @@ -994,13 +998,13 @@ export class Patter { collect(agent.tts, 'tts'); collect(agent.llm, 'llm'); - const realtimeAdapter = this.buildRealtimeWarmupAdapter(agent); - if (realtimeAdapter !== null) { - targets.push({ - name: 'openai_realtime', - fn: () => realtimeAdapter.warmup(), - }); - } + // ``buildRealtimeWarmupAdapter`` only fires for ``openai_realtime`` + // agents, and for those we defer 100% of the Realtime-side warm + // work to ``parkProviderConnections`` (which runs under the same + // ``agent.prewarm`` gate on every outbound call). The warmup-only + // handshake is a strict subset of what park performs, so running + // both makes two WS handshakes against ``api.openai.com`` per call + // instead of one. if (targets.length === 0) return; diff --git a/libraries/typescript/tests/unit/prewarm.test.ts b/libraries/typescript/tests/unit/prewarm.test.ts index 6e9353d..7aee037 100644 --- a/libraries/typescript/tests/unit/prewarm.test.ts +++ b/libraries/typescript/tests/unit/prewarm.test.ts @@ -202,7 +202,14 @@ describe('[unit] prewarm — OpenAI Realtime warmup wiring', () => { warmupSpy.mockRestore(); }); - it('builds a Realtime adapter and invokes warmup when provider=openai_realtime', async () => { + it('does NOT double-handshake Realtime — park does all the Realtime warm work', async () => { + // Regression: previously ``spawnProviderWarmup`` built a transient + // OpenAIRealtimeAdapter and called ``warmup()`` on it, then + // ``parkProviderConnections`` built ANOTHER one and called + // ``openParkedConnection()``. Two WS handshakes per call against + // api.openai.com — wasted 150-400 ms and doubled rate-limit pressure. + // Now warmup skips the Realtime adapter entirely; park is the sole + // Realtime warm path. const agent: AgentOptions = { systemPrompt: 'You are a test assistant.', provider: 'openai_realtime', @@ -212,7 +219,7 @@ describe('[unit] prewarm — OpenAI Realtime warmup wiring', () => { // eslint-disable-next-line @typescript-eslint/no-explicit-any (phone as any).spawnProviderWarmup(agent); await drainPrewarmTasks(phone); - expect(warmupSpy).toHaveBeenCalledTimes(1); + expect(warmupSpy).not.toHaveBeenCalled(); }); it('does NOT build a Realtime adapter in pipeline mode', async () => { @@ -254,17 +261,39 @@ describe('[unit] prewarm — OpenAI Realtime warmup wiring', () => { expect(warmupSpy).not.toHaveBeenCalled(); }); - it('a failing Realtime warmup is best-effort and never propagates', async () => { - warmupSpy.mockRejectedValueOnce(new Error('network down')); + it('outbound Realtime call opens exactly one WS handshake during ringing (no double-handshake)', async () => { + // End-to-end guarantee: warmup + park together result in a single + // OpenAIRealtimeAdapter being constructed (the one park uses), not + // two as in the old buggy behaviour. + const realtimeModule = await import('../../src/providers/openai-realtime'); + let constructed = 0; + const ctorSpy = vi + .spyOn(realtimeModule, 'OpenAIRealtimeAdapter') + .mockImplementation(((..._args: unknown[]) => { + constructed += 1; + return { + warmup: async () => undefined, + openParkedConnection: async () => ({ readyState: 1, close: () => undefined }), + }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + }) as any); + const agent: AgentOptions = { systemPrompt: 'hi', provider: 'openai_realtime', + voice: 'alloy', + prewarm: true, }; // eslint-disable-next-line @typescript-eslint/no-explicit-any (phone as any).spawnProviderWarmup(agent); - // Must not throw out of the task drain. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (phone as any).parkProviderConnections(agent, 'CAtest-no-double'); await drainPrewarmTasks(phone); - expect(warmupSpy).toHaveBeenCalledTimes(1); + await new Promise((r) => setTimeout(r, 30)); + + expect(constructed).toBe(1); + + ctorSpy.mockRestore(); }); }); From f7d535b539ad65d17bdff4a9227ee68155a77fb3 Mon Sep 17 00:00:00 2001 From: nicolotognoni Date: Tue, 12 May 2026 15:00:03 +0200 Subject: [PATCH 5/6] fix(realtime): recreate adapter on adopt failure to avoid stale state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When ``adopt_websocket`` / ``adoptWebSocket`` raised mid-adoption, the partially-adopted ``OpenAIRealtimeAdapter`` was left in an inconsistent state: ``_running`` / ``messageListenerAttached`` was already true, the heartbeat task may have started, ``_current_response_item_id`` / ``currentResponseItemId`` may have carried leaked state from the parked session, and the ``_ws`` / ``ws`` reference pointed at a now-closed socket. Falling through to ``connect()`` on that carcass raced ``session.created`` against stale state, ran two heartbeat timers, and sometimes attached a second message listener to the new socket — silent corruption of every adopt-failed call. Fix: when adopt raises, re-instantiate the adapter (via the existing ``adapter_kwargs`` in Python, ``deps.buildAIAdapter`` in TS) before the cold ``connect()`` path runs, guaranteeing a clean slate. Tests: regression test in each SDK constructs an adapter whose ``adopt_websocket`` throws, then asserts (a) a second adapter instance was created, (b) ``connect()`` ran on the fresh adapter, (c) the handler's adapter reference points at the fresh instance. --- CHANGELOG.md | 2 + libraries/python/getpatter/stream_handler.py | 13 ++ .../python/tests/test_prewarm_handoff.py | 81 +++++++++++++ libraries/typescript/src/stream-handler.ts | 14 +++ .../tests/unit/prewarm-handoff.test.ts | 114 ++++++++++++++++++ 5 files changed, 224 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51f4035..a8a126a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ### Fixed +- **Adapter state leak after a failed parked-session adoption.** When `adopt_websocket` / `adoptWebSocket` raised mid-adoption, the partially-adopted `OpenAIRealtimeAdapter` was in an inconsistent state — `_running` / `messageListenerAttached` was already `true`, the heartbeat task may have started, `_current_response_item_id` / `currentResponseItemId` may have carried leaked state from the parked session, and the `_ws` / `ws` reference pointed at a now-closed socket. Falling through to `connect()` on that carcass raced `session.created` against stale state and corrupted the live call. Fix: handler now re-instantiates the adapter before the cold connect path, guaranteeing a clean slate. Files: `libraries/python/getpatter/stream_handler.py:998`, `libraries/typescript/src/stream-handler.ts:2229`. + - **Eliminated double WebSocket handshake on outbound OpenAI Realtime calls.** `_spawn_provider_warmup` (Py) / `spawnProviderWarmup` (TS) and `_park_provider_connections` / `parkProviderConnections` each built a transient `OpenAIRealtimeAdapter` and opened its own WS against `api.openai.com` during the ringing window — two handshakes per call where one suffices. The warmup-only handshake is a strict subset of what park performs (open WS → `session.created` → `session.update` → `session.updated`) and park keeps the socket open for adoption, so warmup's WS was opened, primed, and immediately discarded. Wasted 150-400 ms of ringing-window budget and doubled the rate-limit pressure against OpenAI. Fix: `_spawn_provider_warmup` no longer builds the Realtime adapter; park is the sole Realtime warm path. Pipeline-mode STT/TTS/LLM warmup is unchanged. Files: `libraries/python/getpatter/client.py:732`, `libraries/typescript/src/client.ts:982`. - **Built-in tools (`transfer_call` / `end_call`) now land in the primed Realtime session.** `_build_realtime_warmup_adapter` (Py) / `buildRealtimeWarmupAdapter` (TS) constructed the transient `OpenAIRealtimeAdapter` without a `tools=` argument, so the `session.update` sent during ringing carried an empty tool list. When `StreamHandler.start()` adopted that parked WebSocket it skipped a fresh `session.update`, leaving the upstream session permanently unaware that the two Patter built-ins existed — `transfer_call` and `end_call` silently no-op'd on every hit-prewarm call (~80% of outbound calls when prewarm is enabled). Added a shared `build_realtime_tools(...)` helper in `stream_handler.py` and `buildRealtimeTools(...)` in `server.ts` so both the live and warmup paths build the canonical tool list byte-for-byte. Files: `libraries/python/getpatter/stream_handler.py:91`, `libraries/python/getpatter/client.py:790`, `libraries/typescript/src/server.ts:62`, `libraries/typescript/src/client.ts:1030`. diff --git a/libraries/python/getpatter/stream_handler.py b/libraries/python/getpatter/stream_handler.py index f4d4224..9d88655 100644 --- a/libraries/python/getpatter/stream_handler.py +++ b/libraries/python/getpatter/stream_handler.py @@ -996,6 +996,7 @@ async def start(self) -> None: logger.debug("pop_prewarmed_connections raised for Realtime: %s", exc) adopted = False + adopt_failed = False if parked_realtime is not None: adopt = getattr(self._adapter, "adopt_websocket", None) ws_alive = parked_realtime is not None and not getattr( @@ -1015,6 +1016,7 @@ async def start(self) -> None: "Realtime adopt_websocket failed: %s; falling back to connect", exc, ) + adopt_failed = True try: await parked_realtime.close() except Exception: @@ -1025,6 +1027,17 @@ async def start(self) -> None: except Exception: pass + # When ``adopt_websocket`` raised mid-call the adapter is in an + # inconsistent state: ``_running`` may be ``True``, the heartbeat + # task may have been scheduled, ``_current_response_item_id`` may + # carry leaked state from the parked session, and the partially- + # adopted ``_ws`` reference may point at a now-closed socket. + # Calling ``connect()`` on this carcass would race ``session.created`` + # against stale state and corrupt the live call. Re-instantiate the + # adapter so the cold path starts from a clean slate. + if adopt_failed: + self._adapter = OpenAIRealtimeAdapter(**adapter_kwargs) + if not adopted: await self._adapter.connect() logger.debug("OpenAI Realtime connected (cold)") diff --git a/libraries/python/tests/test_prewarm_handoff.py b/libraries/python/tests/test_prewarm_handoff.py index 7edf6c8..ab410e8 100644 --- a/libraries/python/tests/test_prewarm_handoff.py +++ b/libraries/python/tests/test_prewarm_handoff.py @@ -592,3 +592,84 @@ def __init__(self, **kwargs: object) -> None: inst = adapter_instance["instance"] inst.connect.assert_awaited_once() # type: ignore[attr-defined] inst.adopt_websocket.assert_not_called() # type: ignore[attr-defined] + + +async def test_realtime_stream_handler_recreates_adapter_on_adopt_failure() -> None: + """When ``adopt_websocket`` raises, the partially-adopted adapter is + in an inconsistent state — ``_running`` may be True, the heartbeat + task may have been scheduled, and ``_current_response_item_id`` may + carry leaked state from the parked session. Calling ``connect()`` on + that carcass would race ``session.created`` against stale state. + The handler must re-instantiate the adapter before falling through + to the cold ``connect()`` path.""" + from unittest.mock import AsyncMock, MagicMock + + from getpatter.stream_handler import OpenAIRealtimeStreamHandler + + parked_ws = FakeWS() # alive, but adopt will throw + + agent = Agent( + system_prompt="hi", + first_message="", + provider="openai_realtime", + voice="alloy", + ) + audio_sender = MagicMock() + audio_sender.send_audio = AsyncMock() + + handler = OpenAIRealtimeStreamHandler( + agent=agent, + audio_sender=audio_sender, + call_id="CAtest_adopt_fail", + caller="+15550000001", + callee="+15550000002", + resolved_prompt="hi", + metrics=None, + openai_key="sk-test", + audio_format="g711_ulaw", + pop_prewarmed_connections=lambda _cid: {"openai_realtime": parked_ws}, + ) + + import getpatter.providers.openai_realtime as realtime_mod + + instances: list[object] = [] + + class _StubAdapter: + def __init__(self, **_kwargs: object) -> None: + self.connect = AsyncMock() + self.send_first_message = AsyncMock() + self.send_text = AsyncMock() + self.receive_events = AsyncMock() + # First-built adapter's adopt raises; subsequent adapters + # must NOT inherit this behaviour because the SDK rebuilds + # the adapter on failure. + if not instances: + self.adopt_websocket = MagicMock( + side_effect=RuntimeError("adopt blew up") + ) + else: + self.adopt_websocket = MagicMock() + instances.append(self) + + original_adapter = realtime_mod.OpenAIRealtimeAdapter + realtime_mod.OpenAIRealtimeAdapter = _StubAdapter # type: ignore[misc] + try: + await handler.start() + finally: + realtime_mod.OpenAIRealtimeAdapter = original_adapter # type: ignore[misc] + bg = getattr(handler, "_background_task", None) + if bg is not None: + bg.cancel() + + # Two adapter instances were constructed: the first failed adopt; + # the second was created fresh for the cold ``connect()`` path. + assert len(instances) == 2, ( + f"Expected the adapter to be recreated after adopt failure; " + f"saw {len(instances)} instance(s)" + ) + # The fresh adapter (instances[-1]) ran ``connect()``; the failed + # one did not. + instances[-1].connect.assert_awaited_once() # type: ignore[attr-defined] + instances[0].connect.assert_not_called() # type: ignore[attr-defined] + # ``handler._adapter`` now references the fresh instance. + assert handler._adapter is instances[-1] diff --git a/libraries/typescript/src/stream-handler.ts b/libraries/typescript/src/stream-handler.ts index 214668b..4ec1ecf 100644 --- a/libraries/typescript/src/stream-handler.ts +++ b/libraries/typescript/src/stream-handler.ts @@ -2248,6 +2248,7 @@ export class StreamHandler { } let adopted = false; + let adoptFailed = false; if (parkedRealtime && this.adapter instanceof OpenAIRealtimeAdapter) { const wsAlive = parkedRealtime.readyState === 1 /* OPEN */; const adoptFn = (this.adapter as { adoptWebSocket?: (ws: import('ws').WebSocket) => void }).adoptWebSocket; @@ -2262,6 +2263,7 @@ export class StreamHandler { getLogger().debug( `Realtime adoptWebSocket failed: ${String(err)}; falling back to connect`, ); + adoptFailed = true; try { parkedRealtime.close(); } catch { /* ignore */ } } } else { @@ -2269,6 +2271,18 @@ export class StreamHandler { } } + // When ``adoptWebSocket`` raised mid-call the adapter is in an + // inconsistent state: ``messageListenerAttached`` may be true, the + // heartbeat timer may have started, ``currentResponseItemId`` may + // carry leaked state from the parked session, and the partially- + // adopted ``ws`` reference may point at a now-closed socket. + // Calling ``connect()`` on this carcass would race ``session.created`` + // against stale state and corrupt the live call. Re-instantiate the + // adapter so the cold path starts from a clean slate. + if (adoptFailed) { + this.adapter = this.deps.buildAIAdapter(resolvedPrompt); + } + if (!adopted) { try { await this.adapter.connect(); diff --git a/libraries/typescript/tests/unit/prewarm-handoff.test.ts b/libraries/typescript/tests/unit/prewarm-handoff.test.ts index 567f723..f4e4339 100644 --- a/libraries/typescript/tests/unit/prewarm-handoff.test.ts +++ b/libraries/typescript/tests/unit/prewarm-handoff.test.ts @@ -368,3 +368,117 @@ describe('[unit] prewarm-handoff — built-in tools in primed session', () => { expect(names).toEqual(['transfer_call', 'end_call']); }); }); + +// --------------------------------------------------------------------------- +// Adopt-failure recovery — when ``adoptWebSocket`` raises, the partially- +// adopted adapter is in an inconsistent state (messageListenerAttached may +// be true, heartbeat may have started, currentResponseItemId may carry leaked +// state from the parked session). Calling ``connect()`` on that carcass races +// ``session.created`` against stale state. Handler must recreate the adapter +// before falling through to the cold ``connect()`` path. +// --------------------------------------------------------------------------- + +describe('[unit] prewarm-handoff — adapter recreation on adopt failure', () => { + it('recreates the adapter when adoptWebSocket throws, then connects on the fresh one', async () => { + const { StreamHandler } = await import('../../src/stream-handler'); + const { OpenAIRealtimeAdapter } = await import('../../src/providers/openai-realtime'); + const { MetricsStore } = await import('../../src/dashboard/store'); + const { RemoteMessageHandler } = await import('../../src/remote-message'); + const wsMod = await import('ws'); + + // Force ``adoptWebSocket`` on every adapter instance to throw — the + // SDK must respond by rebuilding the adapter before falling through. + const adoptSpy = vi + .spyOn(OpenAIRealtimeAdapter.prototype, 'adoptWebSocket') + .mockImplementation(() => { + throw new Error('adopt blew up'); + }); + const connectSpy = vi + .spyOn(OpenAIRealtimeAdapter.prototype, 'connect') + .mockResolvedValue(undefined); + const onEventSpy = vi + .spyOn(OpenAIRealtimeAdapter.prototype, 'onEvent') + .mockImplementation(() => undefined); + + // Parked WS — alive (readyState OPEN); ``adoptWebSocket`` will fail + // before it gets attached. + const parkedWs = { + readyState: 1, + close: () => undefined, + } as unknown as import('ws').WebSocket; + + const built: Array = []; + const deps = { + config: { openaiKey: 'sk-test' }, + agent: { + systemPrompt: 'Test agent', + provider: 'openai_realtime' as const, + }, + bridge: { + label: 'TestBridge', + telephonyProvider: 'twilio' as const, + sendAudio: vi.fn(), + sendMark: vi.fn(), + sendClear: vi.fn(), + transferCall: vi.fn().mockResolvedValue(undefined), + endCall: vi.fn().mockResolvedValue(undefined), + createStt: vi.fn().mockReturnValue(null), + queryTelephonyCost: vi.fn().mockResolvedValue(undefined), + }, + metricsStore: new MetricsStore(), + pricing: null, + remoteHandler: new RemoteMessageHandler(), + recording: false, + buildAIAdapter: vi.fn().mockImplementation((_prompt: string) => { + const instance = new OpenAIRealtimeAdapter('sk-test', 'gpt-4o-mini-realtime-preview'); + built.push(instance); + return instance; + }), + sanitizeVariables: vi.fn((raw: Record) => { + const safe: Record = {}; + for (const [k, v] of Object.entries(raw)) safe[k] = String(v); + return safe; + }), + resolveVariables: vi.fn((tpl: string) => tpl), + popPrewarmedConnections: vi.fn().mockReturnValue({ openaiRealtime: parkedWs }), + }; + + const mockWs = { + send: vi.fn(), + close: vi.fn(), + on: vi.fn(), + once: vi.fn(), + readyState: 1, + removeListener: vi.fn(), + addEventListener: vi.fn(), + removeEventListener: vi.fn(), + } as unknown as import('ws').WebSocket; + + const handler = new StreamHandler( + deps, + mockWs, + '+15551111111', + '+15552222222', + ); + + await handler.handleCallStart('CAtest-recreate'); + + // buildAIAdapter was called twice: first to build the original + // adapter (whose adopt failed), then again to recreate it for the + // cold connect path. + expect(deps.buildAIAdapter).toHaveBeenCalledTimes(2); + // Both adapters were OpenAIRealtimeAdapter instances. + expect(built).toHaveLength(2); + expect(built[0]).toBeInstanceOf(OpenAIRealtimeAdapter); + expect(built[1]).toBeInstanceOf(OpenAIRealtimeAdapter); + expect(built[0]).not.toBe(built[1]); + // adopt was called once (on the first adapter, threw). connect was + // called once (on the fresh adapter). + expect(adoptSpy).toHaveBeenCalledTimes(1); + expect(connectSpy).toHaveBeenCalledTimes(1); + + adoptSpy.mockRestore(); + connectSpy.mockRestore(); + onEventSpy.mockRestore(); + }); +}); From 7814bc84ac9fb557b4af44286aeb6317c09232ce Mon Sep 17 00:00:00 2001 From: nicolotognoni Date: Tue, 12 May 2026 15:01:32 +0200 Subject: [PATCH 6/6] refactor(stream-handler): duck-type adoptWebSocket capability (drop instanceof) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The TS realtime adopt branch in ``stream-handler.ts:initRealtimeAdapter`` previously gated the prewarm-handoff path with two ``this.adapter instanceof OpenAIRealtimeAdapter`` checks. Switched both to a single duck-type check (``typeof adoptWebSocket === 'function'``) so: 1. The generic ``stream-handler`` module stays provider-agnostic on this hot path. Pipeline-only users still get the symbol resolved at module load (the import is used elsewhere in this file for legitimate provider-specific behaviour), but the adopt-handoff gate no longer demands a concrete class identity. 2. The check mirrors the Python handler's ``getattr(self._adapter, "adopt_websocket", None)`` shape — both SDKs now use capability-based detection rather than identity. 3. Future Realtime-like adapters (e.g. a different vendor's all-in-one provider that also exposes ``adoptWebSocket``) can opt into the adopt flow simply by implementing the method, no SDK change needed. No behaviour change: the same WS-adopt path runs for the same adapter class. Existing adopt-handoff tests cover the behaviour and continue to pass. --- CHANGELOG.md | 4 ++++ libraries/typescript/src/stream-handler.ts | 22 +++++++++++++++++----- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8a126a..9a377d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## Unreleased +### Changed + +- **`StreamHandler` adopt-capability check now uses duck typing.** The TS realtime adopt branch in `stream-handler.ts:2229` previously relied on `this.adapter instanceof OpenAIRealtimeAdapter` to gate the prewarm-handoff path. Switched to a duck-type check (`typeof adapter.adoptWebSocket === 'function'`) so the generic stream-handler module stays provider-agnostic on this hot path and matches the Python handler's `getattr(self._adapter, "adopt_websocket", None)` shape. Files: `libraries/typescript/src/stream-handler.ts:2229`. + ### Fixed - **Adapter state leak after a failed parked-session adoption.** When `adopt_websocket` / `adoptWebSocket` raised mid-adoption, the partially-adopted `OpenAIRealtimeAdapter` was in an inconsistent state — `_running` / `messageListenerAttached` was already `true`, the heartbeat task may have started, `_current_response_item_id` / `currentResponseItemId` may have carried leaked state from the parked session, and the `_ws` / `ws` reference pointed at a now-closed socket. Falling through to `connect()` on that carcass raced `session.created` against stale state and corrupted the live call. Fix: handler now re-instantiates the adapter before the cold connect path, guaranteeing a clean slate. Files: `libraries/python/getpatter/stream_handler.py:998`, `libraries/typescript/src/stream-handler.ts:2229`. diff --git a/libraries/typescript/src/stream-handler.ts b/libraries/typescript/src/stream-handler.ts index 4ec1ecf..42b7e82 100644 --- a/libraries/typescript/src/stream-handler.ts +++ b/libraries/typescript/src/stream-handler.ts @@ -2235,8 +2235,21 @@ export class StreamHandler { // window by ``Patter.parkProviderConnections``. Saves the cold // ``new WebSocket`` + ``session.created`` + ``session.update`` // round-trip (~250-450 ms on first turn). + // + // Adopt capability is detected by duck typing + // (``typeof adoptWebSocket === 'function'``) rather than + // ``instanceof OpenAIRealtimeAdapter``: pipeline-only users should + // not pay the cost of a hard provider import in the generic + // stream-handler hot path, and the duck check mirrors the Python + // handler's ``getattr(self._adapter, "adopt_websocket", None)`` + // shape (parity with the provider-agnostic rule). + const adoptFn = (this.adapter as { + adoptWebSocket?: (ws: import('ws').WebSocket) => void; + }).adoptWebSocket; + const canAdopt = typeof adoptFn === 'function'; + let parkedRealtime: import('ws').WebSocket | undefined; - if (this.adapter instanceof OpenAIRealtimeAdapter && this.deps.popPrewarmedConnections) { + if (canAdopt && this.deps.popPrewarmedConnections) { try { const slot = this.deps.popPrewarmedConnections(this.callId); parkedRealtime = slot?.openaiRealtime; @@ -2249,12 +2262,11 @@ export class StreamHandler { let adopted = false; let adoptFailed = false; - if (parkedRealtime && this.adapter instanceof OpenAIRealtimeAdapter) { + if (parkedRealtime && canAdopt) { const wsAlive = parkedRealtime.readyState === 1 /* OPEN */; - const adoptFn = (this.adapter as { adoptWebSocket?: (ws: import('ws').WebSocket) => void }).adoptWebSocket; - if (typeof adoptFn === 'function' && wsAlive) { + if (wsAlive) { try { - adoptFn.call(this.adapter, parkedRealtime); + adoptFn!.call(this.adapter, parkedRealtime); adopted = true; getLogger().info( `[CONNECT] callId=${this.callId} provider=openai_realtime source=adopted ms=0`,