Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions crates/codex-plus-core/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,7 @@ async fn handle_protocol_proxy_connection(
stream.shutdown().await?;
return Ok(());
}
let mut parser = crate::protocol_proxy::SseBlockParser::new();
let mut converter = request_json
.as_ref()
.map(crate::protocol_proxy::ChatSseToResponsesConverter::with_request)
Expand All @@ -1258,13 +1259,42 @@ async fn handle_protocol_proxy_connection(
while let Some(chunk) = bytes_stream.next().await {
match chunk {
Ok(bytes) => {
let converted = converter.push_bytes(&bytes);
if !converted.is_empty() {
stream.write_all(&converted).await?;
for block in parser.push_bytes(&bytes) {
if let Some(data) = block.data.as_deref() {
if data.trim() == "[DONE]" {
let tail = converter.feed_done();
if !tail.is_empty() {
stream.write_all(&tail).await?;
}
continue;
}
if let Ok(value) =
serde_json::from_str::<serde_json::Value>(data)
{
if block.event.as_deref() == Some("error")
|| value.get("error").is_some()
{
let (message, error_type) =
crate::protocol_proxy::extract_chat_sse_error(
&value,
);
let failed = converter.feed_error(message, error_type);
if !failed.is_empty() {
stream.write_all(&failed).await?;
}
stream_failed = true;
break;
}
let converted = converter.feed_chunk(&value);
if !converted.is_empty() {
stream.write_all(&converted).await?;
}
}
}
}
}
Err(error) => {
let failed = converter.fail(
let failed = converter.feed_error(
format!("Stream error: {error}"),
Some("stream_error".to_string()),
);
Expand All @@ -1277,7 +1307,7 @@ async fn handle_protocol_proxy_connection(
}
}
if !stream_failed {
let tail = converter.finish();
let tail = converter.feed_done();
if !tail.is_empty() {
stream.write_all(&tail).await?;
}
Expand Down
181 changes: 173 additions & 8 deletions crates/codex-plus-core/src/protocol_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,51 @@ impl ChatSseToResponsesConverter {
}
}

// ── New API (cc-switch pattern) ──────────────────────────────
//
// Receive already-parsed JSON chunks. The caller handles SSE
// parsing and JSON deserialisation. This decouples the transport
// layer from the protocol layer.

/// Process a parsed Chat Completions SSE chunk, returning zero or
/// more bytes of Responses SSE output.
pub fn feed_chunk(&mut self, chunk: &Value) -> Vec<u8> {
let mut output = String::new();

// Fast path: simple content delta (90%+ of chunks)
if let Some(content) = extract_simple_content_delta(chunk) {
if self.state.is_text_started() {
self.state.push_content_delta_direct(&content, &mut output);
self.state.update_metadata_fields(chunk);
if !output.is_empty() {
return output.into_bytes();
}
}
}

self.state.handle_chat_chunk_into(chunk, &mut output);
output.into_bytes()
}

/// Signal end-of-stream. Emits final done/completed events.
pub fn feed_done(&mut self) -> Vec<u8> {
let mut output = String::new();
if !self.failed {
self.state.finalize_into(&mut output);
}
output.into_bytes()
}

/// Signal an upstream error. Emits error SSE and marks failed.
pub fn feed_error(&mut self, message: String, error_type: Option<String>) -> Vec<u8> {
self.fail(message, error_type)
}

// ── Legacy API (backward-compatible) ─────────────────────────

pub fn push_bytes(&mut self, bytes: &[u8]) -> Vec<u8> {
append_utf8_safe(&mut self.buffer, &mut self.utf8_remainder, bytes);
let mut output = String::new();
let mut output = Vec::new();
while let Some(block) = take_sse_block(&mut self.buffer) {
if block.trim().is_empty() {
continue;
Expand All @@ -384,7 +426,7 @@ impl ChatSseToResponsesConverter {
break;
}
}
output.into_bytes()
output
}

pub fn finish(&mut self) -> Vec<u8> {
Expand All @@ -408,7 +450,7 @@ impl ChatSseToResponsesConverter {
output.into_bytes()
}

fn handle_block(&mut self, block: &str, output: &mut String) {
fn handle_block(&mut self, block: &str, output: &mut Vec<u8>) {
let mut event_name: Option<String> = None;
let mut data_parts = Vec::new();
for line in block.lines() {
Expand All @@ -425,7 +467,7 @@ impl ChatSseToResponsesConverter {
}
let data = data_parts.join("\n");
if data.trim() == "[DONE]" {
self.state.finalize_into(output);
output.extend_from_slice(&self.feed_done());
return;
}

Expand All @@ -434,11 +476,96 @@ impl ChatSseToResponsesConverter {
};
if event_name.as_deref() == Some("error") || chunk.get("error").is_some() {
let (message, error_type) = extract_chat_sse_error(&chunk);
self.state.failed_into(output, message, error_type);
self.failed = true;
output.extend_from_slice(&self.feed_error(message, error_type));
return;
}
self.state.handle_chat_chunk_into(&chunk, output);
output.extend_from_slice(&self.feed_chunk(&chunk));
}
}

/// Fast-path: if `chunk` is a simple content delta (choices[0].delta
/// contains *only* a `content` field), returns the content string.
fn extract_simple_content_delta(chunk: &Value) -> Option<String> {
let choices = chunk.get("choices")?.as_array()?;
let delta = choices.first()?.get("delta")?;
let obj = delta.as_object()?;
if obj.len() != 1 {
return None;
}
delta.get("content")?.as_str().map(|s| s.to_string())
}

// ── SSE Block Parser ──────────────────────────────────────────────
//
// Extracts structured SSE events from raw byte chunks without any
// protocol conversion. This is the layer that cc-switch's HTTP
// client handles for free — we do it manually because reqwest's
// bytes_stream yields raw bytes.

/// A single parsed SSE event carrying an optional event name and an
/// optional data payload.
pub struct ParsedSseBlock {
pub event: Option<String>,
pub data: Option<String>,
}

/// Incrementally parses raw bytes into structured SSE blocks,
/// handling UTF-8 boundaries that may split a multi-byte codepoint
/// across chunks.
#[derive(Default)]
pub struct SseBlockParser {
buffer: String,
utf8_remainder: Vec<u8>,
}

impl SseBlockParser {
pub fn new() -> Self {
Self::default()
}

/// Push raw bytes into the parser. Returns zero or more complete
/// SSE blocks that were fully received in this chunk.
pub fn push_bytes(&mut self, bytes: &[u8]) -> Vec<ParsedSseBlock> {
append_utf8_safe(&mut self.buffer, &mut self.utf8_remainder, bytes);
let mut blocks = Vec::new();
while let Some(block_text) = take_sse_block(&mut self.buffer) {
if block_text.trim().is_empty() {
continue;
}
let mut event: Option<String> = None;
let mut data_parts = Vec::new();
for line in block_text.lines() {
if let Some(ev) = strip_sse_field(line, "event") {
event = Some(ev.trim().to_string());
}
if let Some(d) = strip_sse_field(line, "data") {
data_parts.push(d.to_string());
}
}
let data = if data_parts.is_empty() {
None
} else {
Some(data_parts.join("\n"))
};
blocks.push(ParsedSseBlock { event, data });
}
blocks
}

/// Drain any incomplete data left in the buffer (e.g. trailing
/// bytes after the last `\n\n`). Returns the remainder as a
/// string if non-empty.
pub fn drain_remainder(&mut self) -> Option<String> {
if !self.utf8_remainder.is_empty() {
self.buffer
.push_str(&String::from_utf8_lossy(&self.utf8_remainder));
self.utf8_remainder.clear();
}
if self.buffer.is_empty() {
None
} else {
Some(std::mem::take(&mut self.buffer))
}
}
}

Expand Down Expand Up @@ -1100,6 +1227,44 @@ impl ChatSseState {
}
}

/// Whether the message text item has been initialised (output_item.added sent).
pub fn is_text_started(&self) -> bool {
self.text.added
}

/// Emit only the `response.output_text.delta` event for a content delta
/// when the text item is already started. Skips item.added and part.added.
pub fn push_content_delta_direct(&mut self, content: &str, output: &mut String) {
self.text.text.push_str(content);
let oi = self.text.output_index.unwrap_or(0);
push_sse(output, "response.output_text.delta", json!({
"type": "response.output_text.delta",
"item_id": self.text.item_id,
"output_index": oi,
"content_index": 0,
"delta": content
}));
}

/// Update metadata fields from a chunk (id, model, created_at, usage).
/// Called by the fast path to keep metadata current without the full pipeline.
pub fn update_metadata_fields(&mut self, chunk: &Value) {
if let Some(id) = chunk.get("id").and_then(Value::as_str) {
self.response_id = response_id_from_chat_id(Some(id));
}
if let Some(model) = chunk.get("model").and_then(Value::as_str) {
if !model.is_empty() {
self.model = model.to_string();
}
}
if let Some(created) = chunk.get("created").and_then(Value::as_u64) {
self.created_at = created;
}
if let Some(usage) = chunk.get("usage").filter(|v| !v.is_null()) {
self.latest_usage = Some(chat_usage_to_responses_usage(Some(usage)));
}
}

fn push_content_delta_into(&mut self, delta: &str, output: &mut String) {
match self.inline_think.mode {
InlineThinkMode::Text => {
Expand Down Expand Up @@ -1681,7 +1846,7 @@ fn leading_think_prefix_decision(buffer: &str) -> ThinkPrefixDecision {
ThinkPrefixDecision::Text
}

fn extract_chat_sse_error(value: &Value) -> (String, Option<String>) {
pub fn extract_chat_sse_error(value: &Value) -> (String, Option<String>) {
let error = value.get("error").unwrap_or(value);
let message = error
.as_str()
Expand Down
Loading
Loading