From 4776517470b96e92ad53c38c96549bbfe4eaa6c0 Mon Sep 17 00:00:00 2001 From: key4ng Date: Thu, 21 May 2026 11:53:56 -0700 Subject: [PATCH 1/3] fix(realtime): drop OpenAI-Beta header rejected by GA Realtime API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OpenAI's GA Realtime API now rejects `OpenAI-Beta: realtime=v1` with `beta_api_shape_disabled` ("The Realtime Beta API is no longer supported. Please use /v1/realtime for the GA API."). The upstream accepts the WebSocket upgrade (HTTP 101), then immediately sends an error frame and closes — which the test sees as a 1005 close / TCP RST. This is the actual cause of the openai-realtime E2E failures that #1504 only partially addressed: bumping the model to the GA alias was necessary but not sufficient, because the hardcoded beta header on the upstream connection still triggers GA-shape rejection. Remove the beta header from the upstream request in proxy.rs and from the test's ws_headers fixture (the test header was never proxied upstream anyway, but is no longer meaningful). 
Signed-off-by: key4ng --- e2e_test/realtime/test_realtime_ws.py | 1 - model_gateway/src/routers/openai/realtime/proxy.rs | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/e2e_test/realtime/test_realtime_ws.py b/e2e_test/realtime/test_realtime_ws.py index 763f19daa..44885f4b4 100644 --- a/e2e_test/realtime/test_realtime_ws.py +++ b/e2e_test/realtime/test_realtime_ws.py @@ -145,7 +145,6 @@ def ws_headers(): """Build the WebSocket connection headers.""" return { "Authorization": f"Bearer {OPENAI_API_KEY}", - "OpenAI-Beta": "realtime=v1", } diff --git a/model_gateway/src/routers/openai/realtime/proxy.rs b/model_gateway/src/routers/openai/realtime/proxy.rs index b7b74bc8a..107b2cd2b 100644 --- a/model_gateway/src/routers/openai/realtime/proxy.rs +++ b/model_gateway/src/routers/openai/realtime/proxy.rs @@ -44,14 +44,15 @@ pub async fn run_ws_proxy( // Connect to upstream WebSocket with auth. // Let tungstenite auto-add WebSocket handshake headers (Connection, Upgrade, // Sec-WebSocket-Version, Sec-WebSocket-Key); we only add app-specific headers. + // + // Do not send `OpenAI-Beta: realtime=v1` — OpenAI's GA Realtime API rejects + // it with `beta_api_shape_disabled` ("The Realtime Beta API is no longer + // supported. Please use /v1/realtime for the GA API."). use tokio_tungstenite::tungstenite::client::IntoClientRequest; let mut request = upstream_url.into_client_request()?; request .headers_mut() .insert("Authorization", auth_header.parse()?); - request - .headers_mut() - .insert("OpenAI-Beta", "realtime=v1".parse()?); // Build an explicit rustls TLS connector so we don't depend on the // process-level CryptoProvider being installed. 
From d1a7b9f44b7296dea8606f6c480383e2420d4ea3 Mon Sep 17 00:00:00 2001 From: key4ng Date: Thu, 21 May 2026 12:23:52 -0700 Subject: [PATCH 2/3] test(realtime): migrate E2E events from beta to GA shape After dropping the `OpenAI-Beta: realtime=v1` header (so the upstream WS now negotiates the GA Realtime API), the post-connect tests started timing out / asserting wrong fields because they were still using beta event shapes that GA renamed or relocated. Confirmed from the GA `session.created` payload OpenAI now returns (captured in the failing CI log): { "type": "realtime", "object": "realtime.session", "model": "gpt-realtime", "output_modalities": ["audio"], "audio": { "input": {"turn_detection": {...}, ...}, "output": {"voice": "alloy", ...} }, ... } Migrations applied: - `session.update` payload: send `{"type": "realtime", "output_modalities": ["text"]}` instead of `{"modalities": ["text"]}` - `response.create` params: `output_modalities` instead of `modalities` - Streaming delta event: `response.output_text.delta` instead of the beta `response.text.delta` - `response.done` output content type: `output_text` instead of `text` - `session.created` schema check: `output_modalities` at the top, plus `audio.input.turn_detection` and `audio.output.voice` (formerly `session.turn_detection` and `session.voice`) The 4 tests that did not use the realtime shape (basic connect, invalid-event, missing-model, missing-auth) were already passing and are untouched. The `OpenAI-Beta` header removal in proxy.rs is what unblocked the connection; this commit makes the rest of the suite match the GA wire format. 
Signed-off-by: key4ng --- e2e_test/realtime/test_realtime_ws.py | 69 ++++++++++++++++++--------- 1 file changed, 47 insertions(+), 22 deletions(-) diff --git a/e2e_test/realtime/test_realtime_ws.py b/e2e_test/realtime/test_realtime_ws.py index 44885f4b4..667d4a0a2 100644 --- a/e2e_test/realtime/test_realtime_ws.py +++ b/e2e_test/realtime/test_realtime_ws.py @@ -4,7 +4,7 @@ - Session lifecycle (connect, session.created, session.update) - Text generation (single-turn and multi-turn conversations) - Response cancellation mid-stream -- Response format validation (session.created, response.done, response.text.delta) +- Response format validation (session.created, response.done, response.output_text.delta) - Error handling (invalid events, missing model, missing auth) Prerequisites: @@ -97,7 +97,7 @@ async def _collect_response_text(ws, *, timeout: float = RECV_TIMEOUT) -> str: if event is None: continue etype = event.get("type", "") - if etype == "response.text.delta" and event.get("delta"): + if etype == "response.output_text.delta" and event.get("delta"): parts.append(event["delta"]) elif etype == "response.done": break @@ -111,7 +111,12 @@ async def _realtime_session(ws_url: str, ws_headers: dict): """Connect, wait for session.created, configure text modality, yield ws.""" async with websockets.connect(ws_url, additional_headers=ws_headers) as ws: await _recv_event(ws, event_type="session.created") - await ws.send(_make_event("session.update", session={"modalities": ["text"]})) + await ws.send( + _make_event( + "session.update", + session={"type": "realtime", "output_modalities": ["text"]}, + ) + ) await _recv_event(ws, event_type="session.updated") yield ws @@ -178,11 +183,16 @@ def test_session_update(self, ws_url, ws_headers): async def _run(): async with websockets.connect(ws_url, additional_headers=ws_headers) as ws: await _recv_event(ws, event_type="session.created") - await ws.send(_make_event("session.update", session={"modalities": ["text"]})) + await 
ws.send( + _make_event( + "session.update", + session={"type": "realtime", "output_modalities": ["text"]}, + ) + ) event = await _recv_event(ws, event_type="session.updated") assert event["type"] == "session.updated" assert "session" in event - assert event["session"].get("modalities") == ["text"] + assert event["session"].get("output_modalities") == ["text"] logger.info("Session updated successfully") asyncio.run(_run()) @@ -196,7 +206,7 @@ async def _run(): await ws.send( _make_event( "response.create", - response={"modalities": ["text"]}, + response={"output_modalities": ["text"]}, ) ) @@ -213,14 +223,18 @@ async def _run(): async with _realtime_session(ws_url, ws_headers) as ws: # Turn 1 await ws.send(_make_user_message("My name is Alice.")) - await ws.send(_make_event("response.create", response={"modalities": ["text"]})) + await ws.send( + _make_event("response.create", response={"output_modalities": ["text"]}) + ) text1 = await _collect_response_text(ws) assert len(text1) > 0 logger.info("Turn 1: %s", text1[:100]) # Turn 2 — model should remember the name await ws.send(_make_user_message("What is my name?")) - await ws.send(_make_event("response.create", response={"modalities": ["text"]})) + await ws.send( + _make_event("response.create", response={"output_modalities": ["text"]}) + ) text2 = await _collect_response_text(ws) assert "alice" in text2.lower(), f"Expected 'Alice' in response, got: {text2}" logger.info("Turn 2: %s", text2[:100]) @@ -248,10 +262,12 @@ async def _run(): await ws.send( _make_user_message("Write a very long essay about the history of computing.") ) - await ws.send(_make_event("response.create", response={"modalities": ["text"]})) + await ws.send( + _make_event("response.create", response={"output_modalities": ["text"]}) + ) # Wait for first delta to confirm streaming started - await _recv_event(ws, event_type="response.text.delta") + await _recv_event(ws, event_type="response.output_text.delta") # Cancel mid-stream await 
ws.send(_make_event("response.cancel")) @@ -272,15 +288,20 @@ async def _run(): # Top-level fields assert "event_id" in event, "Missing event_id" assert event["type"] == "session.created" - # Session object + # Session object (GA shape) session = event["session"] assert isinstance(session, dict) assert isinstance(session.get("id"), str) assert len(session["id"]) > 0 assert isinstance(session.get("model"), str) - assert isinstance(session.get("modalities"), list) - assert isinstance(session.get("voice"), str) - assert isinstance(session.get("turn_detection"), (dict, type(None))) + assert isinstance(session.get("output_modalities"), list) + # In GA, voice/turn_detection moved under audio.{output,input}. + audio = session.get("audio") + assert isinstance(audio, dict), f"Expected session.audio dict, got: {audio!r}" + output = audio.get("output") or {} + assert isinstance(output.get("voice"), str) + input_cfg = audio.get("input") or {} + assert isinstance(input_cfg.get("turn_detection"), (dict, type(None))) logger.info( "session.created schema OK: id=%s model=%s", session["id"], @@ -295,7 +316,9 @@ def test_response_done_format(self, ws_url, ws_headers): async def _run(): async with _realtime_session(ws_url, ws_headers) as ws: await ws.send(_make_user_message("Say hi.")) - await ws.send(_make_event("response.create", response={"modalities": ["text"]})) + await ws.send( + _make_event("response.create", response={"output_modalities": ["text"]}) + ) event = await _recv_event(ws, event_type="response.done") # Top-level @@ -308,14 +331,14 @@ async def _run(): assert resp.get("status") == "completed" assert isinstance(resp.get("output"), list) assert len(resp["output"]) > 0 - # Output item + # Output item — GA shape uses content type "output_text". 
item = resp["output"][0] assert item.get("type") == "message" assert item.get("role") == "assistant" assert isinstance(item.get("content"), list) assert len(item["content"]) > 0 content = item["content"][0] - assert content.get("type") == "text" + assert content.get("type") == "output_text" assert isinstance(content.get("text"), str) assert len(content["text"]) > 0 # Usage @@ -332,12 +355,14 @@ async def _run(): asyncio.run(_run()) def test_response_text_delta_format(self, ws_url, ws_headers): - """Validate response.text.delta events have the expected schema.""" + """Validate response.output_text.delta events have the expected schema.""" async def _run(): async with _realtime_session(ws_url, ws_headers) as ws: await ws.send(_make_user_message("Say hello.")) - await ws.send(_make_event("response.create", response={"modalities": ["text"]})) + await ws.send( + _make_event("response.create", response={"output_modalities": ["text"]}) + ) # Collect a few deltas and validate schema delta_count = 0 @@ -350,7 +375,7 @@ async def _run(): event = _parse_event(raw) if event is None: continue - if event.get("type") == "response.text.delta": + if event.get("type") == "response.output_text.delta": assert "event_id" in event assert isinstance(event.get("delta"), str) assert len(event["delta"]) > 0 @@ -362,8 +387,8 @@ async def _run(): elif event.get("type") == "response.done": break - assert delta_count > 0, "Expected at least one response.text.delta" - logger.info("response.text.delta schema OK: %d deltas received", delta_count) + assert delta_count > 0, "Expected at least one response.output_text.delta" + logger.info("response.output_text.delta schema OK: %d deltas received", delta_count) asyncio.run(_run()) From cab68e74156bffc1d7201755cc167a0f4e634d38 Mon Sep 17 00:00:00 2001 From: key4ng Date: Thu, 21 May 2026 12:47:22 -0700 Subject: [PATCH 3/3] test(realtime): rename test to conversation.item.added (GA event name) GA renamed `conversation.item.created` to 
`conversation.item.added` (emitted when an item is added to the default conversation; the old name is now a legacy event the server no longer sends for plain `conversation.item.create` requests). The previous run timed out 30s waiting for the old event name. Switch to the new name to match the GA wire format. Signed-off-by: key4ng --- e2e_test/realtime/test_realtime_ws.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/e2e_test/realtime/test_realtime_ws.py b/e2e_test/realtime/test_realtime_ws.py index 667d4a0a2..781ece7fd 100644 --- a/e2e_test/realtime/test_realtime_ws.py +++ b/e2e_test/realtime/test_realtime_ws.py @@ -241,16 +241,21 @@ async def _run(): asyncio.run(_run()) - def test_conversation_item_created_event(self, ws_url, ws_headers): - """Sending conversation.item.create should echo conversation.item.created.""" + def test_conversation_item_added_event(self, ws_url, ws_headers): + """Sending conversation.item.create should echo conversation.item.added. + + GA renamed the legacy `conversation.item.created` event to + `conversation.item.added` (emitted when an item is added to the default + conversation). + """ async def _run(): async with _realtime_session(ws_url, ws_headers) as ws: await ws.send(_make_user_message("Hi")) - event = await _recv_event(ws, event_type="conversation.item.created") - assert event["type"] == "conversation.item.created" + event = await _recv_event(ws, event_type="conversation.item.added") + assert event["type"] == "conversation.item.added" assert event["item"]["role"] == "user" - logger.info("conversation.item.created received: id=%s", event["item"].get("id")) + logger.info("conversation.item.added received: id=%s", event["item"].get("id")) asyncio.run(_run())