diff --git a/CHANGELOG.md b/CHANGELOG.md index a1a9088..dff861f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ ## 0.6.1 (2026-05-12) +### Fixed — OpenAI Realtime firstMessage silently cancelled on prewarm-adopted sessions + +With `agent.prewarm=true` (default) the OpenAI Realtime WebSocket is opened, primed (`session.created` → `session.update` → `session.updated`), and parked during the carrier ringing window; `StreamHandler` then adopts it at call pickup with `source=adopted ms=0`. The audio bridge between the Twilio/Telnyx stream and the upstream OpenAI session is therefore live the instant the callee answers. OpenAI's server-VAD treats any caller audio that arrives before the assistant's first audio frame as a barge-in and cancels the in-flight `response.create`. In practice the caller's "Hi" / "Hello?" reliably reaches OpenAI in the ~250-450 ms before the firstMessage audio starts streaming back — so the configured `first_message` was *silently cancelled* and the caller heard the agent respond to their hello instead of delivering the scripted opening. The cold-path `connect()` masked this because its WS handshake naturally buffered ~300 ms of caller silence. + +Fix: `send_first_message` (Py) / `sendFirstMessage` (TS) now arm a one-shot server-VAD lockout immediately before issuing `response.create` for the firstMessage turn. A `session.update` with `turn_detection: null` (an OpenAI-documented value that disables server-VAD entirely — no audio-driven response cancellation) is sent first, then the `response.create`. The receive loop / message listener watches for the firstMessage `response.done` and re-issues a `session.update` restoring the original `turn_detection` block (snapshotted from `vad_type` / `silence_duration_ms` / `threshold` / `prefix_padding_ms`) so barge-in works normally for every subsequent turn. The lockout is strictly one-shot: subsequent `response.done` events (e.g. from later turns) do not re-send the restore. 
Best-effort: a failed lockout send falls back to the pre-fix behaviour, while a failed restore send leaves server-VAD disabled (degraded barge-in for the rest of the call) — neither breaks the call. Complements the client-side `_first_audio_sent_at` / `firstAudioSentAt` guard added in PR #92 — that one prevents the local audio bridge from clearing the playout buffer on caller speech, this one prevents the *server* from cancelling the response. + +Files: `libraries/python/getpatter/providers/openai_realtime.py`, `libraries/typescript/src/providers/openai-realtime.ts`. Coverage: `libraries/python/tests/unit/test_providers_io_unit.py` (3 new tests covering lockout sequence, custom `silence_duration_ms` restore, and one-shot semantics) + `libraries/typescript/tests/unit/openai-realtime.test.ts` (4 new tests covering the same behaviours plus the no-ws no-op). + ### Changed — `StreamHandler` adopt-capability check now uses duck typing The TS realtime adopt branch in `stream-handler.ts` previously relied on `this.adapter instanceof OpenAIRealtimeAdapter` to gate the prewarm-handoff path. Switched to a duck-type check (`typeof adapter.adoptWebSocket === 'function'`) so the generic stream-handler module stays provider-agnostic on this hot path and matches the Python handler's `getattr(self._adapter, "adopt_websocket", None)` shape. Files: `libraries/typescript/src/stream-handler.ts`. diff --git a/libraries/python/getpatter/providers/openai_realtime.py b/libraries/python/getpatter/providers/openai_realtime.py index d9462b3..1bd61c3 100644 --- a/libraries/python/getpatter/providers/openai_realtime.py +++ b/libraries/python/getpatter/providers/openai_realtime.py @@ -164,6 +164,16 @@ def __init__( import time as _time self._session_start_monotonic: float = _time.monotonic() + # ``send_first_message`` arms a one-shot server-VAD lockout so the + # firstMessage turn cannot be interrupted by the caller's first audio + # frames (which happens reliably on prewarm-adopted sessions where the + # adopt+response.create races the caller's "hello?" by a few hundred + # ms). 
The flag is consumed inside ``receive_events`` when the + # firstMessage ``response.done`` arrives, at which point we re-issue + # ``session.update`` to restore the original ``turn_detection`` block + # captured here. See ``send_first_message`` for the full rationale. + self._first_message_protection_pending: bool = False + self._saved_turn_detection: dict | None = None def record_session_end(self) -> None: """Emit ``patter.cost.realtime_minutes`` for the elapsed session duration.""" @@ -603,6 +613,38 @@ async def _iter_raw(): self._current_response_item_id = None self._current_response_audio_ms = 0 self._current_response_first_audio_at = None + # If ``send_first_message`` armed the server-VAD lockout + # for the firstMessage turn, this ``response.done`` + # signals the firstMessage finished streaming and it is + # safe to restore the original ``turn_detection`` so + # barge-in works for the rest of the call. Best-effort: + # a failed send leaves the session without VAD, which + # degrades barge-in but does not break the call — the + # next ``session.update`` (e.g. on a tool turn) would + # also rearm. See ``send_first_message`` for the + # full rationale. + if ( + self._first_message_protection_pending + and self._saved_turn_detection is not None + ): + try: + await self._ws.send( + json.dumps( + { + "type": "session.update", + "session": { + "turn_detection": self._saved_turn_detection, + }, + } + ) + ) + except Exception as exc: # noqa: BLE001 + logger.debug( + "first_message: turn_detection restore failed: %s", + exc, + ) + self._first_message_protection_pending = False + self._saved_turn_detection = None yield ("response_done", data.get("response", {})) elif event_type == "error": @@ -704,9 +746,63 @@ async def send_first_message(self, text: str) -> None: producing role-confused openings (e.g. a receptionist agent responding "I'd like to schedule a haircut" because it took its own first_message as a customer cue). 
+ + Server-VAD lockout during firstMessage + ------------------------------------- + + OpenAI Realtime server-VAD treats any caller audio that arrives + before the assistant's first audio frame as a barge-in and cancels + the in-flight ``response.create``. On the prewarm-adopted path + (``source=adopted ms=0``) the WS→audio bridge opens immediately at + call pickup; the caller's "Hi" / "Hello?" reliably reaches OpenAI in + the ~250-450 ms before the firstMessage audio starts streaming back, + so the configured ``first_message`` is *silently cancelled* and the + caller hears the agent respond to their hello instead of delivering + the scripted opening. + + Fix: send a ``session.update`` that sets ``turn_detection`` to + ``None`` (OpenAI-documented: disables server-VAD entirely, no + audio-driven response cancellation), then ``response.create`` the + firstMessage. ``receive_events`` re-arms ``turn_detection`` from the + saved snapshot the moment ``response.done`` arrives for the + firstMessage turn, restoring normal barge-in for every subsequent + turn. The complementary client-side guard (``_first_audio_sent_at`` + in ``stream_handler``) already prevents the caller's outbound clear + from firing — this lockout closes the gap on the *server* side. + + Best-effort: if ``session.update`` raises we still proceed with + ``response.create``. The fallback behaviour matches the pre-fix + state — a higher likelihood of first-message preemption — but never + worse, so the call still completes. """ if self._ws is None: return + # Snapshot the original turn_detection block so ``receive_events`` + # can restore it after the firstMessage ``response.done``. We build + # it from the same configured fields ``_build_session_config`` uses + # so the restore is byte-identical to the cold connect path. 
+ self._saved_turn_detection = { + "type": self.vad_type, + "threshold": 0.5, + "prefix_padding_ms": 300, + "silence_duration_ms": self.silence_duration_ms, + } + self._first_message_protection_pending = True + try: + await self._ws.send( + json.dumps( + { + "type": "session.update", + "session": {"turn_detection": None}, + } + ) + ) + except Exception as exc: # noqa: BLE001 - best-effort lockout + logger.debug("send_first_message: turn_detection lockout failed: %s", exc) + # Clear protection state so receive_events doesn't restore a + # turn_detection we never actually disabled. + self._first_message_protection_pending = False + self._saved_turn_detection = None await self._ws.send( json.dumps( { diff --git a/libraries/python/tests/unit/test_providers_io_unit.py b/libraries/python/tests/unit/test_providers_io_unit.py index 71ada0a..30e8ef6 100644 --- a/libraries/python/tests/unit/test_providers_io_unit.py +++ b/libraries/python/tests/unit/test_providers_io_unit.py @@ -80,6 +80,7 @@ async def _fake_ws_connect(mock_ws): def _ws_connect_side_effect(mock_ws): async def _connect(*a, **kw): return mock_ws + return _connect @@ -100,7 +101,10 @@ async def test_connect_sends_session_update(self) -> None: mock_ws = AsyncMock() mock_ws.recv.return_value = json.dumps({"type": "session.created"}) - with patch("getpatter.providers.openai_realtime.websockets.connect", side_effect=_ws_connect_side_effect(mock_ws)): + with patch( + "getpatter.providers.openai_realtime.websockets.connect", + side_effect=_ws_connect_side_effect(mock_ws), + ): await adapter.connect() assert adapter._running is True @@ -136,12 +140,21 @@ async def test_connect_honours_custom_silence_duration_ms(self) -> None: async def test_connect_with_tools(self) -> None: from getpatter.providers.openai_realtime import OpenAIRealtimeAdapter - tools = [{"name": "search", "description": "Search", "parameters": {"type": "object"}}] + tools = [ + { + "name": "search", + "description": "Search", + "parameters": {"type": 
"object"}, + } + ] adapter = OpenAIRealtimeAdapter(api_key="sk-test", tools=tools) mock_ws = AsyncMock() mock_ws.recv.return_value = json.dumps({"type": "session.created"}) - with patch("getpatter.providers.openai_realtime.websockets.connect", side_effect=_ws_connect_side_effect(mock_ws)): + with patch( + "getpatter.providers.openai_realtime.websockets.connect", + side_effect=_ws_connect_side_effect(mock_ws), + ): await adapter.connect() sent = json.loads(mock_ws.send.call_args[0][0]) @@ -152,11 +165,16 @@ async def test_connect_with_tools(self) -> None: async def test_connect_default_instructions(self) -> None: from getpatter.providers.openai_realtime import OpenAIRealtimeAdapter - adapter = OpenAIRealtimeAdapter(api_key="sk-test", instructions="", language="fr") + adapter = OpenAIRealtimeAdapter( + api_key="sk-test", instructions="", language="fr" + ) mock_ws = AsyncMock() mock_ws.recv.return_value = json.dumps({"type": "session.created"}) - with patch("getpatter.providers.openai_realtime.websockets.connect", side_effect=_ws_connect_side_effect(mock_ws)): + with patch( + "getpatter.providers.openai_realtime.websockets.connect", + side_effect=_ws_connect_side_effect(mock_ws), + ): await adapter.connect() sent = json.loads(mock_ws.send.call_args[0][0]) @@ -170,7 +188,10 @@ async def test_connect_raises_on_unexpected_first_message(self) -> None: mock_ws = AsyncMock() mock_ws.recv.return_value = json.dumps({"type": "error"}) - with patch("getpatter.providers.openai_realtime.websockets.connect", side_effect=_ws_connect_side_effect(mock_ws)): + with patch( + "getpatter.providers.openai_realtime.websockets.connect", + side_effect=_ws_connect_side_effect(mock_ws), + ): with pytest.raises(RuntimeError, match="Expected session.created"): await adapter.connect() @@ -228,6 +249,124 @@ async def test_send_function_result(self) -> None: assert first["item"]["type"] == "function_call_output" assert first["item"]["call_id"] == "call_123" + @pytest.mark.asyncio + async def 
test_send_first_message_disables_turn_detection_then_restores( + self, + ) -> None: + """``send_first_message`` must arm a one-shot server-VAD lockout + (``session.update`` with ``turn_detection: None``) BEFORE the + ``response.create`` so the caller's "hello" cannot pre-empt the + firstMessage. ``receive_events`` must restore ``turn_detection`` + from the saved snapshot the moment the firstMessage + ``response.done`` arrives. + """ + from getpatter.providers.openai_realtime import OpenAIRealtimeAdapter + + adapter = OpenAIRealtimeAdapter(api_key="sk-test") + adapter._ws = AsyncMock() + + await adapter.send_first_message("Hello! Can you hear me?") + + # Two sends expected pre-response.done: lockout + response.create. + assert adapter._ws.send.call_count == 2 + lockout = json.loads(adapter._ws.send.call_args_list[0][0][0]) + assert lockout["type"] == "session.update" + assert lockout["session"]["turn_detection"] is None + + resp = json.loads(adapter._ws.send.call_args_list[1][0][0]) + assert resp["type"] == "response.create" + assert resp["response"]["modalities"] == ["audio", "text"] + assert "Hello! Can you hear me?" in resp["response"]["instructions"] + + # Protection flag must be armed for receive_events to restore. + assert adapter._first_message_protection_pending is True + assert adapter._saved_turn_detection is not None + + # Simulate the server completing the firstMessage. The adapter + # must re-issue a session.update restoring the original + # turn_detection so subsequent turns barge in normally. + adapter._ws.send.reset_mock() + messages = [json.dumps({"type": "response.done", "response": {"id": "r1"}})] + # Re-bind the ws to the iterable mock; preserve the same send mock + # so the assertion below sees the restore call. 
+ send_mock = adapter._ws.send + iter_ws = _AsyncIterableWS(messages) + iter_ws.send = send_mock + adapter._ws = iter_ws + + events = [] + async for event in adapter.receive_events(): + events.append(event) + + assert len(events) == 1 and events[0][0] == "response_done" + send_mock.assert_called_once() + restore = json.loads(send_mock.call_args_list[0][0][0]) + assert restore["type"] == "session.update" + assert restore["session"]["turn_detection"]["type"] == "server_vad" + assert restore["session"]["turn_detection"]["silence_duration_ms"] == 300 + assert restore["session"]["turn_detection"]["threshold"] == 0.5 + assert restore["session"]["turn_detection"]["prefix_padding_ms"] == 300 + + # The lockout is one-shot — protection state must be cleared. + assert adapter._first_message_protection_pending is False + assert adapter._saved_turn_detection is None + + @pytest.mark.asyncio + async def test_send_first_message_restore_honours_custom_silence_duration( + self, + ) -> None: + """Constructor overrides for VAD type and silence-duration must be + preserved when ``receive_events`` restores ``turn_detection``.""" + from getpatter.providers.openai_realtime import OpenAIRealtimeAdapter + + adapter = OpenAIRealtimeAdapter( + api_key="sk-test", + silence_duration_ms=750, + vad_type="semantic_vad", + ) + adapter._ws = AsyncMock() + await adapter.send_first_message("Hi.") + + send_mock = adapter._ws.send + messages = [json.dumps({"type": "response.done"})] + iter_ws = _AsyncIterableWS(messages) + iter_ws.send = send_mock + adapter._ws = iter_ws + + async for _event in adapter.receive_events(): + pass + + # 2 pre-response.done sends + 1 restore. 
+ assert send_mock.call_count == 3 + restore = json.loads(send_mock.call_args_list[2][0][0]) + assert restore["session"]["turn_detection"]["type"] == "semantic_vad" + assert restore["session"]["turn_detection"]["silence_duration_ms"] == 750 + + @pytest.mark.asyncio + async def test_send_first_message_restore_is_one_shot(self) -> None: + """A second ``response.done`` (e.g. from a later turn) must NOT + re-issue the restore — the firstMessage lockout is one-shot.""" + from getpatter.providers.openai_realtime import OpenAIRealtimeAdapter + + adapter = OpenAIRealtimeAdapter(api_key="sk-test") + adapter._ws = AsyncMock() + await adapter.send_first_message("Hi.") + + send_mock = adapter._ws.send + messages = [ + json.dumps({"type": "response.done"}), + json.dumps({"type": "response.done"}), + ] + iter_ws = _AsyncIterableWS(messages) + iter_ws.send = send_mock + adapter._ws = iter_ws + + async for _event in adapter.receive_events(): + pass + + # 2 pre-response.done sends + exactly 1 restore (not 2). 
+ assert send_mock.call_count == 3 + @pytest.mark.asyncio async def test_receive_events_yields_audio(self) -> None: from getpatter.providers.openai_realtime import OpenAIRealtimeAdapter @@ -248,7 +387,9 @@ async def test_receive_events_yields_transcript_output(self) -> None: from getpatter.providers.openai_realtime import OpenAIRealtimeAdapter adapter = OpenAIRealtimeAdapter(api_key="sk-test") - messages = [json.dumps({"type": "response.audio_transcript.delta", "delta": "Hello"})] + messages = [ + json.dumps({"type": "response.audio_transcript.delta", "delta": "Hello"}) + ] adapter._ws = _AsyncIterableWS(messages) events = [] @@ -261,7 +402,14 @@ async def test_receive_events_yields_transcript_input(self) -> None: from getpatter.providers.openai_realtime import OpenAIRealtimeAdapter adapter = OpenAIRealtimeAdapter(api_key="sk-test") - messages = [json.dumps({"type": "conversation.item.input_audio_transcription.completed", "transcript": "Hi"})] + messages = [ + json.dumps( + { + "type": "conversation.item.input_audio_transcription.completed", + "transcript": "Hi", + } + ) + ] adapter._ws = _AsyncIterableWS(messages) events = [] @@ -291,10 +439,16 @@ async def test_receive_events_yields_function_call(self) -> None: from getpatter.providers.openai_realtime import OpenAIRealtimeAdapter adapter = OpenAIRealtimeAdapter(api_key="sk-test") - messages = [json.dumps({ - "type": "response.function_call_arguments.done", - "call_id": "fc1", "name": "search", "arguments": '{"q":"test"}', - })] + messages = [ + json.dumps( + { + "type": "response.function_call_arguments.done", + "call_id": "fc1", + "name": "search", + "arguments": '{"q":"test"}', + } + ) + ] adapter._ws = _AsyncIterableWS(messages) events = [] @@ -380,7 +534,10 @@ async def test_connect_with_agent_id(self) -> None: adapter = ElevenLabsConvAIAdapter(api_key="el-test", agent_id="agent_xyz") mock_ws = AsyncMock() - with patch("getpatter.providers.elevenlabs_convai.websockets.connect", 
side_effect=_ws_connect_side_effect(mock_ws)) as mc: + with patch( + "getpatter.providers.elevenlabs_convai.websockets.connect", + side_effect=_ws_connect_side_effect(mock_ws), + ) as mc: await adapter.connect() call_url = mc.call_args[0][0] @@ -390,23 +547,36 @@ async def test_connect_with_agent_id(self) -> None: async def test_connect_with_first_message(self) -> None: from getpatter.providers.elevenlabs_convai import ElevenLabsConvAIAdapter - adapter = ElevenLabsConvAIAdapter(api_key="el-test", agent_id="agent-test", first_message="Hi there!") + adapter = ElevenLabsConvAIAdapter( + api_key="el-test", agent_id="agent-test", first_message="Hi there!" + ) mock_ws = AsyncMock() - with patch("getpatter.providers.elevenlabs_convai.websockets.connect", side_effect=_ws_connect_side_effect(mock_ws)): + with patch( + "getpatter.providers.elevenlabs_convai.websockets.connect", + side_effect=_ws_connect_side_effect(mock_ws), + ): await adapter.connect() sent = json.loads(mock_ws.send.call_args[0][0]) - assert sent["conversation_config_override"]["agent"]["first_message"] == "Hi there!" + assert ( + sent["conversation_config_override"]["agent"]["first_message"] + == "Hi there!" 
+ ) @pytest.mark.asyncio async def test_connect_without_first_message(self) -> None: from getpatter.providers.elevenlabs_convai import ElevenLabsConvAIAdapter - adapter = ElevenLabsConvAIAdapter(api_key="el-test", agent_id="agent-test", first_message="") + adapter = ElevenLabsConvAIAdapter( + api_key="el-test", agent_id="agent-test", first_message="" + ) mock_ws = AsyncMock() - with patch("getpatter.providers.elevenlabs_convai.websockets.connect", side_effect=_ws_connect_side_effect(mock_ws)): + with patch( + "getpatter.providers.elevenlabs_convai.websockets.connect", + side_effect=_ws_connect_side_effect(mock_ws), + ): await adapter.connect() sent = json.loads(mock_ws.send.call_args[0][0]) @@ -470,11 +640,13 @@ async def test_receive_events_yields_transcripts(self) -> None: adapter = ElevenLabsConvAIAdapter(api_key="el-test", agent_id="agent-test") await self._prime_adapter_with_ws( adapter, - _AsyncIterableWS([ - json.dumps({"type": "user_transcript", "text": "Hi"}), - json.dumps({"type": "agent_response", "text": "Hello"}), - json.dumps({"type": "interruption"}), - ]), + _AsyncIterableWS( + [ + json.dumps({"type": "user_transcript", "text": "Hi"}), + json.dumps({"type": "agent_response", "text": "Hello"}), + json.dumps({"type": "interruption"}), + ] + ), ) events = [] @@ -595,9 +767,13 @@ async def test_ping_triggers_pong(self) -> None: adapter = ElevenLabsConvAIAdapter(api_key="el-test", agent_id="agent-test") mock_ws = AsyncMock() # Iterable messages include a ping. 
- mock_ws.__aiter__ = lambda self: _AsyncIterHelper([ - json.dumps({"type": "ping", "ping_event": {"event_id": "xyz", "ping_ms": 0}}), - ]) + mock_ws.__aiter__ = lambda self: _AsyncIterHelper( + [ + json.dumps( + {"type": "ping", "ping_event": {"event_id": "xyz", "ping_ms": 0}} + ), + ] + ) adapter._ws = mock_ws adapter._events = asyncio.Queue() adapter._reader_task = asyncio.create_task(adapter._read_loop()) @@ -830,7 +1006,10 @@ async def test_connect(self) -> None: stt = DeepgramSTT(api_key="dg-test") mock_ws = AsyncMock() - with patch("getpatter.providers.deepgram_stt.websockets.connect", side_effect=_ws_connect_side_effect(mock_ws)) as mc: + with patch( + "getpatter.providers.deepgram_stt.websockets.connect", + side_effect=_ws_connect_side_effect(mock_ws), + ) as mc: await stt.connect() assert stt._ws is mock_ws @@ -862,12 +1041,18 @@ async def test_receive_transcripts_yields_results(self) -> None: from getpatter.providers.deepgram_stt import DeepgramSTT stt = DeepgramSTT(api_key="dg-test") - messages = [json.dumps({ - "type": "Results", - "is_final": True, - "speech_final": True, - "channel": {"alternatives": [{"transcript": "Hello", "confidence": 0.9}]}, - })] + messages = [ + json.dumps( + { + "type": "Results", + "is_final": True, + "speech_final": True, + "channel": { + "alternatives": [{"transcript": "Hello", "confidence": 0.9}] + }, + } + ) + ] stt._ws = _AsyncIterableWS(messages) transcripts = [] @@ -979,7 +1164,7 @@ def test_resample_24k_to_16k_basic(self) -> None: samples = [100, 200, 300, 400, 500, 600] audio = struct.pack(f"<{len(samples)}h", *samples) result = OpenAITTS._resample_24k_to_16k(audio) - out_samples = struct.unpack(f"<{len(result)//2}h", result) + out_samples = struct.unpack(f"<{len(result) // 2}h", result) assert len(out_samples) == 4 def test_resample_24k_to_16k_empty(self) -> None: @@ -1092,7 +1277,9 @@ async def test_initiate_call(self) -> None: adapter._client = AsyncMock() adapter._client.post.return_value = mock_resp - call_id = 
await adapter.initiate_call("+15551111111", "+15552222222", "wss://stream.example.com") + call_id = await adapter.initiate_call( + "+15551111111", "+15552222222", "wss://stream.example.com" + ) assert call_id == "v3:new-id" @pytest.mark.asyncio @@ -1386,11 +1573,15 @@ async def test_provision_number(self) -> None: mock_number = MagicMock() mock_number.phone_number = "+15559999999" adapter._twilio_client = MagicMock() - adapter._twilio_client.available_phone_numbers.return_value.local.list.return_value = [mock_number] + adapter._twilio_client.available_phone_numbers.return_value.local.list.return_value = [ + mock_number + ] mock_purchased = MagicMock() mock_purchased.phone_number = "+15559999999" - adapter._twilio_client.incoming_phone_numbers.create.return_value = mock_purchased + adapter._twilio_client.incoming_phone_numbers.create.return_value = ( + mock_purchased + ) number = await adapter.provision_number("US") assert number == "+15559999999" @@ -1416,7 +1607,9 @@ async def test_configure_number(self) -> None: adapter._twilio_client.incoming_phone_numbers.list.return_value = [mock_num] await adapter.configure_number("+15551111111", "https://example.com/webhook") - mock_num.update.assert_called_once_with(voice_url="https://example.com/webhook", voice_method="POST") + mock_num.update.assert_called_once_with( + voice_url="https://example.com/webhook", voice_method="POST" + ) @pytest.mark.asyncio async def test_configure_number_not_found(self) -> None: @@ -1427,7 +1620,9 @@ async def test_configure_number_not_found(self) -> None: adapter._twilio_client.incoming_phone_numbers.list.return_value = [] with pytest.raises(ValueError, match="not found"): - await adapter.configure_number("+15551111111", "https://example.com/webhook") + await adapter.configure_number( + "+15551111111", "https://example.com/webhook" + ) @pytest.mark.asyncio async def test_initiate_call(self) -> None: @@ -1439,7 +1634,9 @@ async def test_initiate_call(self) -> None: adapter._twilio_client = 
MagicMock() adapter._twilio_client.calls.create.return_value = mock_call - sid = await adapter.initiate_call("+15551111111", "+15552222222", "wss://stream.example.com") + sid = await adapter.initiate_call( + "+15551111111", "+15552222222", "wss://stream.example.com" + ) assert sid == "CA_test_call_sid" @pytest.mark.asyncio @@ -1453,7 +1650,9 @@ async def test_initiate_call_with_extra_params(self) -> None: adapter._twilio_client.calls.create.return_value = mock_call sid = await adapter.initiate_call( - "+15551111111", "+15552222222", "wss://stream.example.com", + "+15551111111", + "+15552222222", + "wss://stream.example.com", extra_params={"machine_detection": "Enable"}, ) assert sid == "CA_test_call_sid" @@ -1468,7 +1667,9 @@ async def test_end_call(self) -> None: adapter._twilio_client = MagicMock() await adapter.end_call("CA_test_call_sid") - adapter._twilio_client.calls.return_value.update.assert_called_once_with(status="completed") + adapter._twilio_client.calls.return_value.update.assert_called_once_with( + status="completed" + ) def test_generate_stream_twiml(self) -> None: from getpatter.providers.twilio_adapter import TwilioAdapter diff --git a/libraries/typescript/src/providers/openai-realtime.ts b/libraries/typescript/src/providers/openai-realtime.ts index c0dfe5a..6a0af0e 100644 --- a/libraries/typescript/src/providers/openai-realtime.ts +++ b/libraries/typescript/src/providers/openai-realtime.ts @@ -140,6 +140,16 @@ export class OpenAIRealtimeAdapter { // wall-clock cap corresponds to the maximum playback that real-time TTS // could have produced, which is what the user actually heard. private currentResponseFirstAudioAt: number | null = null; + // ``sendFirstMessage`` arms a one-shot server-VAD lockout so the + // firstMessage turn cannot be interrupted by the caller's first audio + // frames (which happens reliably on prewarm-adopted sessions where the + // adopt+response.create races the caller's "hello?" by a few hundred ms). 
+ // The flag is consumed inside the message listener when the firstMessage + // ``response.done`` arrives, at which point we re-issue ``session.update`` + // to restore the original ``turn_detection`` block captured in + // ``savedTurnDetection``. See ``sendFirstMessage`` for the full rationale. + private firstMessageProtectionPending = false; + private savedTurnDetection: Record | null = null; private readonly options: OpenAIRealtimeOptions; constructor( @@ -568,6 +578,34 @@ export class OpenAIRealtimeAdapter { this.currentResponseItemId = null; this.currentResponseAudioMs = 0; this.currentResponseFirstAudioAt = null; + // If ``sendFirstMessage`` armed the server-VAD lockout for the + // firstMessage turn, this ``response.done`` signals the firstMessage + // finished streaming and it is safe to restore the original + // ``turn_detection`` so barge-in works for the rest of the call. + // Best-effort: a failed send leaves the session without VAD which + // degrades barge-in but does not break the call — the next + // ``session.update`` (e.g. on a tool turn) would also rearm. See + // ``sendFirstMessage`` for the full rationale. + if ( + this.firstMessageProtectionPending && + this.savedTurnDetection !== null && + this.ws !== null + ) { + try { + this.ws.send( + JSON.stringify({ + type: 'session.update', + session: { turn_detection: this.savedTurnDetection }, + }), + ); + } catch (err) { + getLogger().debug?.( + `first_message: turn_detection restore failed: ${String(err)}`, + ); + } + this.firstMessageProtectionPending = false; + this.savedTurnDetection = null; + } dispatch('response_done', data.response ?? null); } else if (t === 'error') { dispatch('error', data.error); @@ -653,9 +691,65 @@ export class OpenAIRealtimeAdapter { * role-confused openings (e.g. a receptionist agent responding "I'd like * to schedule a haircut" because it took its own first_message as a * customer cue). 
+ * + * Server-VAD lockout during firstMessage + * -------------------------------------- + * + * OpenAI Realtime server-VAD treats any caller audio that arrives before + * the assistant's first audio frame as a barge-in and cancels the + * in-flight ``response.create``. On the prewarm-adopted path + * (``source=adopted ms=0``) the WS→audio bridge opens immediately at call + * pickup; the caller's "Hi" / "Hello?" reliably reaches OpenAI in the + * ~250-450 ms before the firstMessage audio starts streaming back, so the + * configured ``firstMessage`` is *silently cancelled* and the caller + * hears the agent respond to their hello instead of delivering the + * scripted opening. + * + * Fix: send a ``session.update`` that sets ``turn_detection`` to ``null`` + * (OpenAI-documented: disables server-VAD entirely, no audio-driven + * response cancellation), then ``response.create`` the firstMessage. The + * message listener re-arms ``turn_detection`` from the saved snapshot the + * moment ``response.done`` arrives for the firstMessage turn, restoring + * normal barge-in for every subsequent turn. The complementary + * client-side guard (``firstAudioSentAt`` in stream-handler) already + * prevents the caller's outbound clear from firing — this lockout closes + * the gap on the *server* side. + * + * Best-effort: if ``session.update`` cannot be sent we still proceed with + * ``response.create``. The fallback behaviour matches the pre-fix state — + * a higher likelihood of first-message preemption — but never worse, so + * the call still completes. */ async sendFirstMessage(text: string): Promise { - this.ws?.send(JSON.stringify({ + if (!this.ws) return; + // Snapshot the original turn_detection block so the message listener + // can restore it after the firstMessage ``response.done``. We build it + // from the same configured fields ``buildSessionConfig`` uses so the + // restore is byte-identical to the cold connect path. 
+ this.savedTurnDetection = { + type: this.options.vadType ?? OpenAIRealtimeVADType.SERVER_VAD, + threshold: 0.5, + prefix_padding_ms: 300, + silence_duration_ms: this.options.silenceDurationMs ?? 300, + }; + this.firstMessageProtectionPending = true; + try { + this.ws.send( + JSON.stringify({ + type: 'session.update', + session: { turn_detection: null }, + }), + ); + } catch (err) { + getLogger().debug?.( + `sendFirstMessage: turn_detection lockout failed: ${String(err)}`, + ); + // Clear protection state so the message listener doesn't restore a + // turn_detection we never actually disabled. + this.firstMessageProtectionPending = false; + this.savedTurnDetection = null; + } + this.ws.send(JSON.stringify({ type: 'response.create', response: { modalities: ['audio', 'text'], diff --git a/libraries/typescript/tests/unit/openai-realtime.test.ts b/libraries/typescript/tests/unit/openai-realtime.test.ts index 40c3d92..782c7f5 100644 --- a/libraries/typescript/tests/unit/openai-realtime.test.ts +++ b/libraries/typescript/tests/unit/openai-realtime.test.ts @@ -386,6 +386,95 @@ describe('OpenAIRealtimeAdapter (deep)', () => { }); }); + // --- sendFirstMessage (server-VAD lockout) --- + + describe('sendFirstMessage()', () => { + it('disables turn_detection before response.create then restores on response.done', async () => { + const adapter = new OpenAIRealtimeAdapter('sk-test'); + const ws = await connectAdapter(adapter); + ws.send.mockClear(); + + await adapter.sendFirstMessage('Hello! 
Can you hear me?'); + + // Two sends expected before response.done: + // 1) session.update {turn_detection: null} + // 2) response.create with firstMessage instructions + expect(ws.send).toHaveBeenCalledTimes(2); + + const lockoutMsg = JSON.parse(ws.send.mock.calls[0][0] as string); + expect(lockoutMsg.type).toBe('session.update'); + expect(lockoutMsg.session.turn_detection).toBeNull(); + + const respMsg = JSON.parse(ws.send.mock.calls[1][0] as string); + expect(respMsg.type).toBe('response.create'); + expect(respMsg.response.modalities).toEqual(['audio', 'text']); + expect(respMsg.response.instructions).toContain('Hello! Can you hear me?'); + + // Now simulate the server completing the firstMessage. The adapter + // must re-issue a session.update restoring the original turn_detection + // so subsequent turns barge in normally. + ws.send.mockClear(); + ws.emit('message', Buffer.from(JSON.stringify({ type: 'response.done' }))); + + expect(ws.send).toHaveBeenCalledOnce(); + const restoreMsg = JSON.parse(ws.send.mock.calls[0][0] as string); + expect(restoreMsg.type).toBe('session.update'); + expect(restoreMsg.session.turn_detection).toMatchObject({ + type: 'server_vad', + threshold: 0.5, + prefix_padding_ms: 300, + silence_duration_ms: 300, + }); + }); + + it('restores turn_detection with the configured custom silenceDurationMs', async () => { + const adapter = new OpenAIRealtimeAdapter( + 'sk-test', + undefined, + undefined, + undefined, + undefined, + undefined, + { silenceDurationMs: 750, vadType: 'semantic_vad' }, + ); + const ws = await connectAdapter(adapter); + ws.send.mockClear(); + + await adapter.sendFirstMessage('Hi.'); + ws.send.mockClear(); + + ws.emit('message', Buffer.from(JSON.stringify({ type: 'response.done' }))); + + expect(ws.send).toHaveBeenCalledOnce(); + const restoreMsg = JSON.parse(ws.send.mock.calls[0][0] as string); + expect(restoreMsg.session.turn_detection).toMatchObject({ + type: 'semantic_vad', + silence_duration_ms: 750, + }); + }); + + 
it('only restores turn_detection once even if response.done arrives twice', async () => { + const adapter = new OpenAIRealtimeAdapter('sk-test'); + const ws = await connectAdapter(adapter); + ws.send.mockClear(); + + await adapter.sendFirstMessage('Hi.'); + ws.send.mockClear(); + + ws.emit('message', Buffer.from(JSON.stringify({ type: 'response.done' }))); + // Spurious second response.done from a subsequent turn must NOT re-send + // the restore — the firstMessage lockout is one-shot. + ws.emit('message', Buffer.from(JSON.stringify({ type: 'response.done' }))); + + expect(ws.send).toHaveBeenCalledOnce(); + }); + + it('does nothing when ws is null', async () => { + const adapter = new OpenAIRealtimeAdapter('sk-test'); + await expect(adapter.sendFirstMessage('Hi.')).resolves.toBeUndefined(); + }); + }); + // --- cancelResponse --- describe('cancelResponse()', () => {