From c60aa466a2b77dd71ce1493f5aaab89085d22729 Mon Sep 17 00:00:00 2001 From: rexdotsh <65942753+rexdotsh@users.noreply.github.com> Date: Fri, 3 Apr 2026 12:37:08 +0000 Subject: [PATCH 1/6] fix: stabilize claude streaming event parsing --- src/providers/proxies/claude-proxy.ts | 149 +++++++++++++++---------- tests/providers/proxy-contract.test.ts | 29 +++++ 2 files changed, 119 insertions(+), 59 deletions(-) diff --git a/src/providers/proxies/claude-proxy.ts b/src/providers/proxies/claude-proxy.ts index 43f1323..576c55e 100644 --- a/src/providers/proxies/claude-proxy.ts +++ b/src/providers/proxies/claude-proxy.ts @@ -179,6 +179,52 @@ const buildUpstreamUrl = (search: string): string => { return upstream.toString(); }; +const runInBackground = (promise: Promise): void => { + promise.catch(() => undefined); +}; + +const parseSseEventData = (chunk: string): string | null => { + const dataLines = chunk + .split("\n") + .filter((line) => line.startsWith("data:")) + .map((line) => line.slice(5).trimStart()); + if (!dataLines.length) { + return null; + } + + const data = dataLines.join("\n").trim(); + if (!data || data === "[DONE]") { + return null; + } + + return data; +}; + +const transformSseEventChunk = ( + chunk: string, + toolPrefix: string, + readStreamUsage: (payload: unknown) => void +): string => { + const payload = parseSseEventData(chunk); + if (!payload) { + return chunk; + } + + try { + const jsonBody = JSON.parse(payload) as unknown; + readStreamUsage(jsonBody); + const transformed = transformClaudeResponsePayload(jsonBody, toolPrefix); + + return chunk.replace( + /(^|\n)data:\s*.*(?=\n|$)/gu, + (_match, prefix: string) => + `${prefix}data: ${JSON.stringify(transformed)}` + ); + } catch { + return chunk; + } +}; + const maybeTransformClaudeStreamResponse = ( response: Response, toolPrefix: string, @@ -196,7 +242,7 @@ const maybeTransformClaudeStreamResponse = ( const reader = response.body.getReader(); const encoder = new TextEncoder(); const decoder = new TextDecoder(); - let pendingText = ""; + let buffer = ""; const streamUsage: TokenUsage = { inputTokens: 0, @@ -278,66 +324,51 @@ const maybeTransformClaudeStreamResponse = ( } }; - const transformSseLine = (line: string): string => { - if (!line.startsWith("data:")) { - return line; - } - - const payload = line.slice(5).trimStart(); - if (!payload || payload === "[DONE]") { - return line; - } - - try { - const jsonBody = JSON.parse(payload) as unknown; - readStreamUsage(jsonBody); - const transformed = transformClaudeResponsePayload(jsonBody, toolPrefix); - return `data: ${JSON.stringify(transformed)}`; - } catch { - return line; - } - }; - - const enqueueChunk = ( - controller: ReadableStreamDefaultController, - chunk: string - ): void => { - if (!chunk) { - return; - } - const transformedChunk = chunk - .split("\n") - .map((line) => transformSseLine(line)) - .join("\n"); - - controller.enqueue(encoder.encode(transformedChunk)); - }; - const stream = new ReadableStream({ - async pull(controller): Promise { - const { done, value } = await reader.read(); - if (done) { - pendingText += decoder.decode(); - enqueueChunk(controller, pendingText); - pendingText = ""; - onTokenUsage?.(streamUsage); - controller.close(); - return; - } - - if (!value) { - return; - } - - pendingText += decoder.decode(value, { stream: true }); - const lastLineBreak = pendingText.lastIndexOf("\n"); - if (lastLineBreak === -1) { - return; - } + start(controller): void { + const pump = async (): Promise => { + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + buffer += decoder.decode(); + if (buffer) { + controller.enqueue( + encoder.encode( + transformSseEventChunk(buffer, toolPrefix, readStreamUsage) + ) + ); + buffer = ""; + } + onTokenUsage?.(streamUsage); + controller.close(); + return; + } + + if (!value) { + continue; + } + + buffer += decoder.decode(value, { stream: true }); + + let chunkSeparatorIndex = buffer.indexOf("\n\n"); + while (chunkSeparatorIndex !== -1) { + const chunk = buffer.slice(0, chunkSeparatorIndex + 2); + buffer = buffer.slice(chunkSeparatorIndex + 2); + controller.enqueue( + encoder.encode( + transformSseEventChunk(chunk, toolPrefix, readStreamUsage) + ) + ); + chunkSeparatorIndex = buffer.indexOf("\n\n"); + } + } + } catch (error) { + controller.error(error); + } + }; - const completeChunk = pendingText.slice(0, lastLineBreak + 1); - pendingText = pendingText.slice(lastLineBreak + 1); - enqueueChunk(controller, completeChunk); + runInBackground(pump()); }, cancel(reason): Promise { return reader.cancel(reason); diff --git a/tests/providers/proxy-contract.test.ts b/tests/providers/proxy-contract.test.ts index e005639..cbb9ea0 100644 --- a/tests/providers/proxy-contract.test.ts +++ b/tests/providers/proxy-contract.test.ts @@ -731,6 +731,35 @@ describe("proxy contract: claude", () => { expect(transformedText).toContain('"name":"shell"'); }); + test("rewrites fragmented multiline SSE events at event boundaries", async () => { + const result = prepareClaudeUsageRequest(); + const encoder = new TextEncoder(); + + const sourceResponse = new Response( + new ReadableStream({ + start(controller): void { + controller.enqueue(encoder.encode("event: message\n")); + controller.enqueue(encoder.encode('data: {"type":"tool_use",\n')); + controller.enqueue(encoder.encode('data: "name":"mcp_shell"}\n\n')); + controller.close(); + }, + }), + { + headers: { + "content-type": "text/event-stream", + }, + } + ); + + const transformedResponse = await result.transformResponse(sourceResponse); + const transformedText = await transformedResponse.text(); + + expect(transformedText).toContain("event: message"); + expect(transformedText).toContain( + 'data: {"type":"tool_use","name":"shell"}' + ); + }); + const claudeStreamUsageCases = [ { name: "extracts usage from claude streaming message events", From d4f3730af076f9a3cb1e572e38f65620981e468a Mon Sep 17 00:00:00 2001 From: rexdotsh <65942753+rexdotsh@users.noreply.github.com> Date: Fri, 3 Apr 2026 12:46:11 +0000 Subject: [PATCH 2/6] fix: handle claude SSE event boundaries correctly --- src/providers/proxies/claude-proxy.ts | 64 ++++++++++++++++++++++---- tests/providers/proxy-contract.test.ts | 35 ++++++++++++-- 2 files changed, 87 insertions(+), 12 deletions(-) diff --git a/src/providers/proxies/claude-proxy.ts b/src/providers/proxies/claude-proxy.ts index 576c55e..5324a5d 100644 --- a/src/providers/proxies/claude-proxy.ts +++ b/src/providers/proxies/claude-proxy.ts @@ -183,8 +183,24 @@ const runInBackground = (promise: Promise): void => { promise.catch(() => undefined); }; +const findSseEventBoundary = (buffer: string): number => { + const lfBoundary = buffer.indexOf("\n\n"); + const crlfBoundary = buffer.indexOf("\r\n\r\n"); + + if (lfBoundary === -1) { + return crlfBoundary; + } + + if (crlfBoundary === -1) { + return lfBoundary; + } + + return Math.min(lfBoundary, crlfBoundary); +}; + const parseSseEventData = (chunk: string): string | null => { const dataLines = chunk + .replace(/\r\n/gu, "\n") .split("\n") .filter((line) => line.startsWith("data:")) .map((line) => line.slice(5).trimStart()); @@ -200,6 +216,28 @@ const parseSseEventData = (chunk: string): string | null => { return data; }; +const getSseEventLineEnding = (chunk: string): string => + chunk.includes("\r\n") ? "\r\n" : "\n"; + +const splitSseEventChunk = ( + chunk: string +): { + lines: string[]; + separator: string; + trailer: string; +} => { + const separator = getSseEventLineEnding(chunk); + const trailer = separator + separator; + const hasTrailer = chunk.endsWith(trailer); + const chunkBody = hasTrailer ? chunk.slice(0, -trailer.length) : chunk; + + return { + lines: chunkBody.split(separator), + separator, + trailer: hasTrailer ? trailer : "", + }; +}; + const transformSseEventChunk = ( chunk: string, toolPrefix: string, @@ -214,12 +252,11 @@ const transformSseEventChunk = ( const jsonBody = JSON.parse(payload) as unknown; readStreamUsage(jsonBody); const transformed = transformClaudeResponsePayload(jsonBody, toolPrefix); + const { lines, separator, trailer } = splitSseEventChunk(chunk); + const rebuiltLines = lines.filter((line) => !line.startsWith("data:")); + rebuiltLines.push(`data: ${JSON.stringify(transformed)}`); - return chunk.replace( - /(^|\n)data:\s*.*(?=\n|$)/gu, - (_match, prefix: string) => - `${prefix}data: ${JSON.stringify(transformed)}` - ); + return rebuiltLines.join(separator) + trailer; } catch { return chunk; } @@ -351,16 +388,25 @@ const maybeTransformClaudeStreamResponse = ( buffer += decoder.decode(value, { stream: true }); - let chunkSeparatorIndex = buffer.indexOf("\n\n"); + let chunkSeparatorIndex = findSseEventBoundary(buffer); while (chunkSeparatorIndex !== -1) { - const chunk = buffer.slice(0, chunkSeparatorIndex + 2); - buffer = buffer.slice(chunkSeparatorIndex + 2); + const separatorLength = buffer.startsWith( + "\r\n\r\n", + chunkSeparatorIndex + ) + ? 4 + : 2; + const chunk = buffer.slice( + 0, + chunkSeparatorIndex + separatorLength + ); + buffer = buffer.slice(chunkSeparatorIndex + separatorLength); controller.enqueue( encoder.encode( transformSseEventChunk(chunk, toolPrefix, readStreamUsage) ) ); - chunkSeparatorIndex = buffer.indexOf("\n\n"); + chunkSeparatorIndex = findSseEventBoundary(buffer); } } } catch (error) { diff --git a/tests/providers/proxy-contract.test.ts b/tests/providers/proxy-contract.test.ts index cbb9ea0..cbb92bc 100644 --- a/tests/providers/proxy-contract.test.ts +++ b/tests/providers/proxy-contract.test.ts @@ -754,9 +754,38 @@ describe("proxy contract: claude", () => { const transformedResponse = await result.transformResponse(sourceResponse); const transformedText = await transformedResponse.text(); - expect(transformedText).toContain("event: message"); - expect(transformedText).toContain( - 'data: {"type":"tool_use","name":"shell"}' + expect(transformedText).toBe( + 'event: message\ndata: {"type":"tool_use","name":"shell"}\n\n' + ); + }); + + test("rewrites CRLF-delimited SSE events without buffering until EOF", async () => { + const result = prepareClaudeUsageRequest(); + const encoder = new TextEncoder(); + + const sourceResponse = new Response( + new ReadableStream({ + start(controller): void { + controller.enqueue( + encoder.encode( + 'event: message\r\ndata: {"type":"tool_use","name":"mcp_shell"}\r\n\r\n' + ) + ); + controller.close(); + }, + }), + { + headers: { + "content-type": "text/event-stream", + }, + } + ); + + const transformedResponse = await result.transformResponse(sourceResponse); + const transformedText = await transformedResponse.text(); + + expect(transformedText).toBe( + 'event: message\r\ndata: {"type":"tool_use","name":"shell"}\r\n\r\n' ); }); From 18b2661813364e113e743c6bb11be8fb8a448287 Mon Sep 17 00:00:00 2001 From: rexdotsh <65942753+rexdotsh@users.noreply.github.com> Date: Fri, 3 Apr 2026 12:48:59 +0000 Subject: [PATCH 3/6] fix: harden claude SSE stream rewriting --- src/providers/proxies/claude-proxy.ts | 8 +-- tests/providers/proxy-contract.test.ts | 77 ++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 5 deletions(-) diff --git a/src/providers/proxies/claude-proxy.ts b/src/providers/proxies/claude-proxy.ts index 5324a5d..37aa255 100644 --- a/src/providers/proxies/claude-proxy.ts +++ b/src/providers/proxies/claude-proxy.ts @@ -179,10 +179,6 @@ const buildUpstreamUrl = (search: string): string => { return upstream.toString(); }; -const runInBackground = (promise: Promise): void => { - promise.catch(() => undefined); -}; - const findSseEventBoundary = (buffer: string): number => { const lfBoundary = buffer.indexOf("\n\n"); const crlfBoundary = buffer.indexOf("\r\n\r\n"); @@ -414,7 +410,9 @@ const maybeTransformClaudeStreamResponse = ( } }; - runInBackground(pump()); + pump().catch((error: unknown) => { + controller.error(error); + }); }, cancel(reason): Promise { return reader.cancel(reason); diff --git a/tests/providers/proxy-contract.test.ts b/tests/providers/proxy-contract.test.ts index cbb92bc..145c234 100644 --- a/tests/providers/proxy-contract.test.ts +++ b/tests/providers/proxy-contract.test.ts @@ -789,6 +789,38 @@ describe("proxy contract: claude", () => { ); }); + test("rewrites multiple SSE events delivered in one chunk", async () => { + const result = prepareClaudeUsageRequest(); + const encoder = new TextEncoder(); + + const sourceResponse = new Response( + new ReadableStream({ + start(controller): void { + controller.enqueue( + encoder.encode( + 'event: message\ndata: {"type":"tool_use","name":"mcp_shell"}\n\n' + + 'event: message\ndata: {"type":"tool_use","name":"mcp_browser"}\n\n' + ) + ); + controller.close(); + }, + }), + { + headers: { + "content-type": "text/event-stream", + }, + } + ); + + const transformedResponse = await result.transformResponse(sourceResponse); + const transformedText = await transformedResponse.text(); + + expect(transformedText).toBe( + 'event: message\ndata: {"type":"tool_use","name":"shell"}\n\n' + + 'event: message\ndata: {"type":"tool_use","name":"browser"}\n\n' + ); + }); + const claudeStreamUsageCases = [ { name: "extracts usage from claude streaming message events", @@ -863,6 +895,51 @@ describe("proxy contract: claude", () => { }); } + test("extracts usage from fragmented streaming events", async () => { + const capture = createUsageCapture(); + const result = prepareClaudeUsageRequest(capture.onTokenUsage); + const encoder = new TextEncoder(); + + const sourceResponse = new Response( + new ReadableStream({ + start(controller): void { + controller.enqueue( + encoder.encode('data: {"type":"message_start","message":{"usage":{') + ); + controller.enqueue( + encoder.encode('"input_tokens":55,"cache_read_input_tokens":11,') + ); + controller.enqueue( + encoder.encode('"cache_creation_input_tokens":5}}}\n\n') + ); + controller.enqueue( + encoder.encode('data: {"type":"message_delta","usage":{') + ); + controller.enqueue( + encoder.encode('"output_tokens":13,"cache_creation_input_tokens":7') + ); + controller.enqueue(encoder.encode("}}\n\n")); + controller.close(); + }, + }), + { + headers: { + "content-type": "text/event-stream", + }, + } + ); + + const transformedResponse = await result.transformResponse(sourceResponse); + await transformedResponse.text(); + + expect(capture.read()).toEqual({ + inputTokens: 55, + outputTokens: 13, + cacheReadTokens: 11, + cacheWriteTokens: 7, + }); + }); + test("does not rewrite non-tool SSE name fields", async () => { const result = prepareClaudeUsageRequest(); From 0489c9633e1ce3d679b8d46b57205630b4933429 Mon Sep 17 00:00:00 2001 From: rexdotsh <65942753+rexdotsh@users.noreply.github.com> Date: Fri, 3 Apr 2026 12:53:47 +0000 Subject: [PATCH 4/6] fix: handle mixed newline SSE boundaries --- src/providers/proxies/claude-proxy.ts | 96 +++++++++++++------------- tests/providers/proxy-contract.test.ts | 36 ++++++++++ 2 files changed, 85 insertions(+), 47 deletions(-) diff --git a/src/providers/proxies/claude-proxy.ts b/src/providers/proxies/claude-proxy.ts index 37aa255..70e7478 100644 --- a/src/providers/proxies/claude-proxy.ts +++ b/src/providers/proxies/claude-proxy.ts @@ -179,19 +179,32 @@ const buildUpstreamUrl = (search: string): string => { return upstream.toString(); }; -const findSseEventBoundary = (buffer: string): number => { - const lfBoundary = buffer.indexOf("\n\n"); - const crlfBoundary = buffer.indexOf("\r\n\r\n"); +const findSseEventBoundary = ( + buffer: string +): { index: number; length: number } | null => { + for (let index = 0; index < buffer.length; index += 1) { + const firstLineEndingLength = buffer.startsWith("\r\n", index) ? 2 : 1; + const isLineEnding = + buffer[index] === "\n" || buffer.startsWith("\r\n", index); + + if (!isLineEnding) { + continue; + } - if (lfBoundary === -1) { - return crlfBoundary; - } + const nextIndex = index + firstLineEndingLength; + const secondLineEndingLength = buffer.startsWith("\r\n", nextIndex) ? 2 : 1; + const hasBlankLine = + buffer[nextIndex] === "\n" || buffer.startsWith("\r\n", nextIndex); - if (crlfBoundary === -1) { - return lfBoundary; + if (hasBlankLine) { + return { + index, + length: firstLineEndingLength + secondLineEndingLength, + }; + } } - return Math.min(lfBoundary, crlfBoundary); + return null; }; const parseSseEventData = (chunk: string): string | null => { @@ -212,26 +225,28 @@ const parseSseEventData = (chunk: string): string | null => { return data; }; -const getSseEventLineEnding = (chunk: string): string => - chunk.includes("\r\n") ? "\r\n" : "\n"; - -const splitSseEventChunk = ( - chunk: string -): { - lines: string[]; - separator: string; - trailer: string; -} => { - const separator = getSseEventLineEnding(chunk); - const trailer = separator + separator; - const hasTrailer = chunk.endsWith(trailer); - const chunkBody = hasTrailer ? chunk.slice(0, -trailer.length) : chunk; +const rewriteSseDataLines = (chunk: string, payload: string): string => { + const eventTrailerMatch = chunk.match(/(?:\r\n|\n){2}$/u); + const eventTrailer = eventTrailerMatch?.[0] ?? ""; + const chunkBody = eventTrailer ? chunk.slice(0, -eventTrailer.length) : chunk; + const dataBlockMatch = chunkBody.match( + /(?:^|\r\n|\n)(data:.*(?:\r\n|\n)?)+$/u + ); + if (!dataBlockMatch || dataBlockMatch.index === undefined) { + return chunk; + } - return { - lines: chunkBody.split(separator), - separator, - trailer: hasTrailer ? trailer : "", - }; + const prefix = chunkBody.slice(0, dataBlockMatch.index); + const dataBlock = dataBlockMatch[0]; + const separator = dataBlock.startsWith("\r\n") + ? "\r\n" + : dataBlock.startsWith("\n") + ? "\n" + : ""; + const lineEndingMatch = dataBlock.match(/(\r\n|\n)$/u); + const lineEnding = lineEndingMatch?.[0] ?? ""; + + return `${prefix}${separator}data: ${payload}${lineEnding}${eventTrailer}`; }; const transformSseEventChunk = ( @@ -248,11 +263,7 @@ const transformSseEventChunk = ( const jsonBody = JSON.parse(payload) as unknown; readStreamUsage(jsonBody); const transformed = transformClaudeResponsePayload(jsonBody, toolPrefix); - const { lines, separator, trailer } = splitSseEventChunk(chunk); - const rebuiltLines = lines.filter((line) => !line.startsWith("data:")); - rebuiltLines.push(`data: ${JSON.stringify(transformed)}`); - - return rebuiltLines.join(separator) + trailer; + return rewriteSseDataLines(chunk, JSON.stringify(transformed)); } catch { return chunk; } @@ -384,25 +395,16 @@ const maybeTransformClaudeStreamResponse = ( buffer += decoder.decode(value, { stream: true }); - let chunkSeparatorIndex = findSseEventBoundary(buffer); - while (chunkSeparatorIndex !== -1) { - const separatorLength = buffer.startsWith( - "\r\n\r\n", - chunkSeparatorIndex - ) - ? 4 - : 2; - const chunk = buffer.slice( - 0, - chunkSeparatorIndex + separatorLength - ); - buffer = buffer.slice(chunkSeparatorIndex + separatorLength); + let boundary = findSseEventBoundary(buffer); + while (boundary) { + const chunk = buffer.slice(0, boundary.index + boundary.length); + buffer = buffer.slice(boundary.index + boundary.length); controller.enqueue( encoder.encode( transformSseEventChunk(chunk, toolPrefix, readStreamUsage) ) ); - chunkSeparatorIndex = findSseEventBoundary(buffer); + boundary = findSseEventBoundary(buffer); } } } catch (error) { diff --git a/tests/providers/proxy-contract.test.ts b/tests/providers/proxy-contract.test.ts index 145c234..a67d59b 100644 --- a/tests/providers/proxy-contract.test.ts +++ b/tests/providers/proxy-contract.test.ts @@ -789,6 +789,42 @@ describe("proxy contract: claude", () => { ); }); + test("rewrites SSE events with mixed newline boundary separators", async () => { + const result = prepareClaudeUsageRequest(); + const encoder = new TextEncoder(); + + const sourceResponse = new Response( + new ReadableStream({ + start(controller): void { + controller.enqueue( + encoder.encode( + 'event: message\ndata: {"type":"tool_use","name":"mcp_shell"}\n\r\n' + ) + ); + controller.enqueue( + encoder.encode( + 'event: message\r\ndata: {"type":"tool_use","name":"mcp_browser"}\r\n\n' + ) + ); + controller.close(); + }, + }), + { + headers: { + "content-type": "text/event-stream", + }, + } + ); + + const transformedResponse = await result.transformResponse(sourceResponse); + const transformedText = await transformedResponse.text(); + + expect(transformedText).toBe( + 'event: message\ndata: {"type":"tool_use","name":"shell"}\n\r\n' + + 'event: message\r\ndata: {"type":"tool_use","name":"browser"}\r\n\n' + ); + }); + test("rewrites multiple SSE events delivered in one chunk", async () => { const result = prepareClaudeUsageRequest(); const encoder = new TextEncoder(); From 4fe516d0fcf898222469088f78f015b05b6be200 Mon Sep 17 00:00:00 2001 From: rexdotsh <65942753+rexdotsh@users.noreply.github.com> Date: Fri, 3 Apr 2026 12:55:01 +0000 Subject: [PATCH 5/6] refactor: simplify claude SSE rewriting --- src/providers/proxies/claude-proxy.ts | 48 ++++++++------------------- 1 file changed, 13 insertions(+), 35 deletions(-) diff --git a/src/providers/proxies/claude-proxy.ts b/src/providers/proxies/claude-proxy.ts index 70e7478..426413c 100644 --- a/src/providers/proxies/claude-proxy.ts +++ b/src/providers/proxies/claude-proxy.ts @@ -182,29 +182,15 @@ const buildUpstreamUrl = (search: string): string => { const findSseEventBoundary = ( buffer: string ): { index: number; length: number } | null => { - for (let index = 0; index < buffer.length; index += 1) { - const firstLineEndingLength = buffer.startsWith("\r\n", index) ? 2 : 1; - const isLineEnding = - buffer[index] === "\n" || buffer.startsWith("\r\n", index); - - if (!isLineEnding) { - continue; - } - - const nextIndex = index + firstLineEndingLength; - const secondLineEndingLength = buffer.startsWith("\r\n", nextIndex) ? 2 : 1; - const hasBlankLine = - buffer[nextIndex] === "\n" || buffer.startsWith("\r\n", nextIndex); - - if (hasBlankLine) { - return { - index, - length: firstLineEndingLength + secondLineEndingLength, - }; - } + const match = /\r\n\r\n|\n\n|\n\r\n|\r\n\n/u.exec(buffer); + if (!match || match.index === undefined) { + return null; } - return null; + return { + index: match.index, + length: match[0].length, + }; }; const parseSseEventData = (chunk: string): string | null => { @@ -229,24 +215,16 @@ const rewriteSseDataLines = (chunk: string, payload: string): string => { const eventTrailerMatch = chunk.match(/(?:\r\n|\n){2}$/u); const eventTrailer = eventTrailerMatch?.[0] ?? ""; const chunkBody = eventTrailer ? chunk.slice(0, -eventTrailer.length) : chunk; - const dataBlockMatch = chunkBody.match( - /(?:^|\r\n|\n)(data:.*(?:\r\n|\n)?)+$/u + const rewrittenBody = chunkBody.replace( + /((?:^|\r\n|\n))(?:data:.*(?:\r\n|\n)?)+$/u, + (_, separator: string) => `${separator}data: ${payload}` ); - if (!dataBlockMatch || dataBlockMatch.index === undefined) { + + if (rewrittenBody === chunkBody) { return chunk; } - const prefix = chunkBody.slice(0, dataBlockMatch.index); - const dataBlock = dataBlockMatch[0]; - const separator = dataBlock.startsWith("\r\n") - ? "\r\n" - : dataBlock.startsWith("\n") - ? "\n" - : ""; - const lineEndingMatch = dataBlock.match(/(\r\n|\n)$/u); - const lineEnding = lineEndingMatch?.[0] ?? ""; - - return `${prefix}${separator}data: ${payload}${lineEnding}${eventTrailer}`; + return `${rewrittenBody}${eventTrailer}`; }; const transformSseEventChunk = ( From 631718f1c930ed3d61b03b83ef33af79794bb381 Mon Sep 17 00:00:00 2001 From: rexdotsh <65942753+rexdotsh@users.noreply.github.com> Date: Fri, 3 Apr 2026 12:58:04 +0000 Subject: [PATCH 6/6] fix: preserve mixed Claude SSE event trailers --- src/providers/proxies/claude-proxy.ts | 7 +++++-- tests/providers/proxy-contract.test.ts | 28 ++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/providers/proxies/claude-proxy.ts b/src/providers/proxies/claude-proxy.ts index 426413c..699c9f2 100644 --- a/src/providers/proxies/claude-proxy.ts +++ b/src/providers/proxies/claude-proxy.ts @@ -212,8 +212,11 @@ const parseSseEventData = (chunk: string): string | null => { }; const rewriteSseDataLines = (chunk: string, payload: string): string => { - const eventTrailerMatch = chunk.match(/(?:\r\n|\n){2}$/u); - const eventTrailer = eventTrailerMatch?.[0] ?? ""; + const boundary = findSseEventBoundary(chunk); + const eventTrailer = + boundary && boundary.index + boundary.length === chunk.length + ? chunk.slice(boundary.index) + : ""; const chunkBody = eventTrailer ? chunk.slice(0, -eventTrailer.length) : chunk; const rewrittenBody = chunkBody.replace( /((?:^|\r\n|\n))(?:data:.*(?:\r\n|\n)?)+$/u, diff --git a/tests/providers/proxy-contract.test.ts b/tests/providers/proxy-contract.test.ts index a67d59b..e177701 100644 --- a/tests/providers/proxy-contract.test.ts +++ b/tests/providers/proxy-contract.test.ts @@ -825,6 +825,34 @@ describe("proxy contract: claude", () => { ); }); + test("rewrites fragmented SSE events with mixed internal newlines", async () => { + const result = prepareClaudeUsageRequest(); + const encoder = new TextEncoder(); + + const sourceResponse = new Response( + new ReadableStream({ + start(controller): void { + controller.enqueue(encoder.encode("event: message\r\n")); + controller.enqueue(encoder.encode('data: {"type":"tool_use",\n')); + controller.enqueue(encoder.encode('data: "name":"mcp_shell"}\n\r\n')); + controller.close(); + }, + }), + { + headers: { + "content-type": "text/event-stream", + }, + } + ); + + const transformedResponse = await result.transformResponse(sourceResponse); + const transformedText = await transformedResponse.text(); + + expect(transformedText).toBe( + 'event: message\r\ndata: {"type":"tool_use","name":"shell"}\n\r\n' + ); + }); + test("rewrites multiple SSE events delivered in one chunk", async () => { const result = prepareClaudeUsageRequest(); const encoder = new TextEncoder();