diff --git a/crates/codex-plus-core/src/launcher.rs b/crates/codex-plus-core/src/launcher.rs index 75a851c2..e5311936 100644 --- a/crates/codex-plus-core/src/launcher.rs +++ b/crates/codex-plus-core/src/launcher.rs @@ -1249,6 +1249,7 @@ async fn handle_protocol_proxy_connection( stream.shutdown().await?; return Ok(()); } + let mut parser = crate::protocol_proxy::SseBlockParser::new(); let mut converter = request_json .as_ref() .map(crate::protocol_proxy::ChatSseToResponsesConverter::with_request) @@ -1258,13 +1259,42 @@ async fn handle_protocol_proxy_connection( while let Some(chunk) = bytes_stream.next().await { match chunk { Ok(bytes) => { - let converted = converter.push_bytes(&bytes); - if !converted.is_empty() { - stream.write_all(&converted).await?; + for block in parser.push_bytes(&bytes) { + if let Some(data) = block.data.as_deref() { + if data.trim() == "[DONE]" { + let tail = converter.feed_done(); + if !tail.is_empty() { + stream.write_all(&tail).await?; + } + continue; + } + if let Ok(value) = + serde_json::from_str::(data) + { + if block.event.as_deref() == Some("error") + || value.get("error").is_some() + { + let (message, error_type) = + crate::protocol_proxy::extract_chat_sse_error( + &value, + ); + let failed = converter.feed_error(message, error_type); + if !failed.is_empty() { + stream.write_all(&failed).await?; + } + stream_failed = true; + break; + } + let converted = converter.feed_chunk(&value); + if !converted.is_empty() { + stream.write_all(&converted).await?; + } + } + } } } Err(error) => { - let failed = converter.fail( + let failed = converter.feed_error( format!("Stream error: {error}"), Some("stream_error".to_string()), ); @@ -1277,7 +1307,7 @@ async fn handle_protocol_proxy_connection( } } if !stream_failed { - let tail = converter.finish(); + let tail = converter.feed_done(); if !tail.is_empty() { stream.write_all(&tail).await?; } diff --git a/crates/codex-plus-core/src/protocol_proxy.rs b/crates/codex-plus-core/src/protocol_proxy.rs index 94178394..41cb1f2b 100644 --- a/crates/codex-plus-core/src/protocol_proxy.rs +++ b/crates/codex-plus-core/src/protocol_proxy.rs @@ -372,9 +372,51 @@ impl ChatSseToResponsesConverter { } } + // ── New API (cc-switch pattern) ────────────────────────────── + // + // Receive already-parsed JSON chunks. The caller handles SSE + // parsing and JSON deserialisation. This decouples the transport + // layer from the protocol layer. + + /// Process a parsed Chat Completions SSE chunk, returning zero or + /// more bytes of Responses SSE output. + pub fn feed_chunk(&mut self, chunk: &Value) -> Vec { + let mut output = String::new(); + + // Fast path: simple content delta (90%+ of chunks) + if let Some(content) = extract_simple_content_delta(chunk) { + if self.state.is_text_started() { + self.state.push_content_delta_direct(&content, &mut output); + self.state.update_metadata_fields(chunk); + if !output.is_empty() { + return output.into_bytes(); + } + } + } + + self.state.handle_chat_chunk_into(chunk, &mut output); + output.into_bytes() + } + + /// Signal end-of-stream. Emits final done/completed events. + pub fn feed_done(&mut self) -> Vec { + let mut output = String::new(); + if !self.failed { + self.state.finalize_into(&mut output); + } + output.into_bytes() + } + + /// Signal an upstream error. Emits error SSE and marks failed. + pub fn feed_error(&mut self, message: String, error_type: Option) -> Vec { + self.fail(message, error_type) + } + + // ── Legacy API (backward-compatible) ───────────────────────── + pub fn push_bytes(&mut self, bytes: &[u8]) -> Vec { append_utf8_safe(&mut self.buffer, &mut self.utf8_remainder, bytes); - let mut output = String::new(); + let mut output = Vec::new(); while let Some(block) = take_sse_block(&mut self.buffer) { if block.trim().is_empty() { continue; @@ -384,7 +426,7 @@ impl ChatSseToResponsesConverter { break; } } - output.into_bytes() + output } pub fn finish(&mut self) -> Vec { @@ -408,7 +450,7 @@ impl ChatSseToResponsesConverter { output.into_bytes() } - fn handle_block(&mut self, block: &str, output: &mut String) { + fn handle_block(&mut self, block: &str, output: &mut Vec) { let mut event_name: Option = None; let mut data_parts = Vec::new(); for line in block.lines() { @@ -425,7 +467,7 @@ impl ChatSseToResponsesConverter { } let data = data_parts.join("\n"); if data.trim() == "[DONE]" { - self.state.finalize_into(output); + output.extend_from_slice(&self.feed_done()); return; } @@ -434,11 +476,96 @@ impl ChatSseToResponsesConverter { }; if event_name.as_deref() == Some("error") || chunk.get("error").is_some() { let (message, error_type) = extract_chat_sse_error(&chunk); - self.state.failed_into(output, message, error_type); - self.failed = true; + output.extend_from_slice(&self.feed_error(message, error_type)); return; } - self.state.handle_chat_chunk_into(&chunk, output); + output.extend_from_slice(&self.feed_chunk(&chunk)); + } +} + +/// Fast-path: if `chunk` is a simple content delta (choices[0].delta +/// contains *only* a `content` field), returns the content string. +fn extract_simple_content_delta(chunk: &Value) -> Option { + let choices = chunk.get("choices")?.as_array()?; + let delta = choices.first()?.get("delta")?; + let obj = delta.as_object()?; + if obj.len() != 1 { + return None; + } + delta.get("content")?.as_str().map(|s| s.to_string()) +} + +// ── SSE Block Parser ────────────────────────────────────────────── +// +// Extracts structured SSE events from raw byte chunks without any +// protocol conversion. This is the layer that cc-switch's HTTP +// client handles for free — we do it manually because reqwest's +// bytes_stream yields raw bytes. + +/// A single parsed SSE event carrying an optional event name and an +/// optional data payload. +pub struct ParsedSseBlock { + pub event: Option, + pub data: Option, +} + +/// Incrementally parses raw bytes into structured SSE blocks, +/// handling UTF-8 boundaries that may split a multi-byte codepoint +/// across chunks. +#[derive(Default)] +pub struct SseBlockParser { + buffer: String, + utf8_remainder: Vec, +} + +impl SseBlockParser { + pub fn new() -> Self { + Self::default() + } + + /// Push raw bytes into the parser. Returns zero or more complete + /// SSE blocks that were fully received in this chunk. + pub fn push_bytes(&mut self, bytes: &[u8]) -> Vec { + append_utf8_safe(&mut self.buffer, &mut self.utf8_remainder, bytes); + let mut blocks = Vec::new(); + while let Some(block_text) = take_sse_block(&mut self.buffer) { + if block_text.trim().is_empty() { + continue; + } + let mut event: Option = None; + let mut data_parts = Vec::new(); + for line in block_text.lines() { + if let Some(ev) = strip_sse_field(line, "event") { + event = Some(ev.trim().to_string()); + } + if let Some(d) = strip_sse_field(line, "data") { + data_parts.push(d.to_string()); + } + } + let data = if data_parts.is_empty() { + None + } else { + Some(data_parts.join("\n")) + }; + blocks.push(ParsedSseBlock { event, data }); + } + blocks + } + + /// Drain any incomplete data left in the buffer (e.g. trailing + /// bytes after the last `\n\n`). Returns the remainder as a + /// string if non-empty. + pub fn drain_remainder(&mut self) -> Option { + if !self.utf8_remainder.is_empty() { + self.buffer + .push_str(&String::from_utf8_lossy(&self.utf8_remainder)); + self.utf8_remainder.clear(); + } + if self.buffer.is_empty() { + None + } else { + Some(std::mem::take(&mut self.buffer)) + } } } @@ -1100,6 +1227,44 @@ impl ChatSseState { } } + /// Whether the message text item has been initialised (output_item.added sent). + pub fn is_text_started(&self) -> bool { + self.text.added + } + + /// Emit only the `response.output_text.delta` event for a content delta + /// when the text item is already started. Skips item.added and part.added. + pub fn push_content_delta_direct(&mut self, content: &str, output: &mut String) { + self.text.text.push_str(content); + let oi = self.text.output_index.unwrap_or(0); + push_sse(output, "response.output_text.delta", json!({ + "type": "response.output_text.delta", + "item_id": self.text.item_id, + "output_index": oi, + "content_index": 0, + "delta": content + })); + } + + /// Update metadata fields from a chunk (id, model, created_at, usage). + /// Called by the fast path to keep metadata current without the full pipeline. + pub fn update_metadata_fields(&mut self, chunk: &Value) { + if let Some(id) = chunk.get("id").and_then(Value::as_str) { + self.response_id = response_id_from_chat_id(Some(id)); + } + if let Some(model) = chunk.get("model").and_then(Value::as_str) { + if !model.is_empty() { + self.model = model.to_string(); + } + } + if let Some(created) = chunk.get("created").and_then(Value::as_u64) { + self.created_at = created; + } + if let Some(usage) = chunk.get("usage").filter(|v| !v.is_null()) { + self.latest_usage = Some(chat_usage_to_responses_usage(Some(usage))); + } + } + fn push_content_delta_into(&mut self, delta: &str, output: &mut String) { match self.inline_think.mode { InlineThinkMode::Text => { @@ -1681,7 +1846,7 @@ fn leading_think_prefix_decision(buffer: &str) -> ThinkPrefixDecision { ThinkPrefixDecision::Text } -fn extract_chat_sse_error(value: &Value) -> (String, Option) { +pub fn extract_chat_sse_error(value: &Value) -> (String, Option) { let error = value.get("error").unwrap_or(value); let message = error .as_str() diff --git a/crates/codex-plus-core/tests/protocol_proxy.rs b/crates/codex-plus-core/tests/protocol_proxy.rs index 795796ca..492aeb79 100644 --- a/crates/codex-plus-core/tests/protocol_proxy.rs +++ b/crates/codex-plus-core/tests/protocol_proxy.rs @@ -1,5 +1,5 @@ use codex_plus_core::protocol_proxy::{ - ChatSseToResponsesConverter, chat_completion_to_response, + ChatSseToResponsesConverter, ParsedSseBlock, SseBlockParser, chat_completion_to_response, chat_completion_to_response_with_request, chat_completions_url, chat_sse_to_responses_sse, chat_sse_to_responses_sse_with_request, is_chat_completions_proxy_path, is_models_proxy_path, is_responses_proxy_path, models_url, open_chat_completions_proxy_request, @@ -1677,3 +1677,126 @@ fn spawn_chat_server() -> ChatServer { }); ChatServer { base_url, handle } } + + +// ── SseBlockParser unit tests ───────────────────────────────────── + +#[test] +fn test_sse_parser_single_block() { + let mut parser = SseBlockParser::new(); + let blocks = parser.push_bytes(b"data: hello\n\n"); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].data.as_deref(), Some("hello")); + assert!(blocks[0].event.is_none()); +} + +#[test] +fn test_sse_parser_multi_line_data() { + let mut parser = SseBlockParser::new(); + let blocks = parser.push_bytes(b"data: line1\ndata: line2\n\n"); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].data.as_deref(), Some("line1\nline2")); + assert!(blocks[0].event.is_none()); +} + +#[test] +fn test_sse_parser_done_signal() { + let mut parser = SseBlockParser::new(); + let blocks = parser.push_bytes(b"data: [DONE]\n\n"); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].data.as_deref(), Some("[DONE]")); + assert!(blocks[0].event.is_none()); +} + +#[test] +fn test_sse_parser_empty_blocks_skipped() { + let mut parser = SseBlockParser::new(); + // Leading empty block (just \n\n) should be skipped; only "data: hi" remains. + let blocks = parser.push_bytes(b"\n\ndata: hi\n\n"); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].data.as_deref(), Some("hi")); +} + +#[test] +fn test_sse_parser_event_field() { + let mut parser = SseBlockParser::new(); + let blocks = parser.push_bytes(b"event: error\ndata: msg\n\n"); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].event.as_deref(), Some("error")); + assert_eq!(blocks[0].data.as_deref(), Some("msg")); +} + +// ── feed_chunk unit tests ───────────────────────────────────────── + +#[test] +fn test_feed_content_delta_fast_path() { + let mut converter = ChatSseToResponsesConverter::default(); + + // First chunk: initial content delta — goes through the full pipeline, + // emits response.created + response.in_progress + response.output_item.added + // + response.content_part.added + response.output_text.delta. + let chunk1 = serde_json::json!({"choices":[{"delta":{"content":"hello"}}]}); + let output1 = converter.feed_chunk(&chunk1); + let out1 = String::from_utf8(output1).unwrap(); + assert!(out1.contains("event: response.created")); + assert!(out1.contains("event: response.in_progress")); + assert!(out1.contains("event: response.output_item.added")); + assert!(out1.contains("event: response.content_part.added")); + assert!(out1.contains(r#""delta":"hello""#)); + + // Second chunk: simple content delta — takes the fast path, emits only + // response.output_text.delta, no .added events. + let chunk2 = serde_json::json!({"choices":[{"delta":{"content":" world"}}]}); + let output2 = converter.feed_chunk(&chunk2); + let out2 = String::from_utf8(output2).unwrap(); + assert!(out2.contains("event: response.output_text.delta")); + assert!(out2.contains(r#""delta":" world""#)); + assert!(!out2.contains("response.output_item.added")); +} + +// ── Equivalence test ────────────────────────────────────────────── + +#[test] +fn test_push_bytes_equivalent_to_feed() { + // A complete streaming conversation: content + tool call + done. + let sse_input = concat!( + r#"data: {"id":"chatcmpl_eq","created":123,"model":"gpt-5.4","choices":[{"delta":{"content":"Hello"}}]}"#, "\n", + "\n", + r#"data: {"id":"chatcmpl_eq","created":123,"model":"gpt-5.4","choices":[{"delta":{"content":" world"}}]}"#, "\n", + "\n", + r#"data: {"id":"chatcmpl_eq","created":123,"model":"gpt-5.4","choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_1","type":"function","function":{"name":"get_weather"}}]}}]}"#, "\n", + "\n", + r#"data: {"id":"chatcmpl_eq","created":123,"model":"gpt-5.4","choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"city\":\"Tokyo\"}"}}]},"finish_reason":"tool_calls"}]}"#, "\n", + "\n", + "data: [DONE]", "\n", + "\n", + ); + + // Old API: chat_sse_to_responses_sse (push_bytes internally) + let output_old = chat_sse_to_responses_sse(sse_input); + + // New API: SseBlockParser + feed_chunk / feed_done + let mut parser = SseBlockParser::new(); + let mut converter = ChatSseToResponsesConverter::default(); + let blocks = parser.push_bytes(sse_input.as_bytes()); + + let mut new_output = String::new(); + for block in &blocks { + if let Some(data) = &block.data { + if data.trim() == "[DONE]" { + new_output.push_str(&String::from_utf8(converter.feed_done()).unwrap()); + continue; + } + // Event-based errors are not expected in this test input. + if let Ok(chunk) = serde_json::from_str::(data) { + new_output.push_str(&String::from_utf8(converter.feed_chunk(&chunk)).unwrap()); + } + } + } + new_output.push_str(&String::from_utf8(converter.finish()).unwrap()); + + assert_eq!( + output_old, new_output, + "push_bytes-based and feed_chunk-based outputs must be identical" + ); +}