Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions src/auth/session-affinity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ interface AffinityEntry {
entryId: string;
conversationId: string;
turnState?: string;
instructions?: string;
inputTokens?: number;
functionCallIds?: string[];
createdAt: number;
}

Expand All @@ -28,8 +31,24 @@ export class SessionAffinityMap {
}

/** Record that a response was created by a specific account in a conversation. */
record(responseId: string, entryId: string, conversationId: string, turnState?: string): void {
this.map.set(responseId, { entryId, conversationId, turnState, createdAt: Date.now() });
/**
 * Store the affinity entry for a response, keyed by its response ID.
 *
 * @param responseId - Key under which the entry is stored in the map.
 * @param entryId - Saved as the entry's `entryId`.
 * @param conversationId - Saved as the entry's `conversationId`.
 * @param turnState - Optional value saved as the entry's `turnState`.
 * @param instructions - Optional value saved as the entry's `instructions`.
 * @param inputTokens - Optional value saved as the entry's `inputTokens`.
 * @param functionCallIds - Optional list; a copy is stored rather than the caller's array.
 */
record(
  responseId: string,
  entryId: string,
  conversationId: string,
  turnState?: string,
  instructions?: string,
  inputTokens?: number,
  functionCallIds?: string[],
): void {
  // Snapshot the caller's array so later mutations do not leak into the stored entry.
  const callIdSnapshot = functionCallIds ? Array.from(functionCallIds) : undefined;
  const entry = {
    entryId,
    conversationId,
    turnState,
    instructions,
    inputTokens,
    functionCallIds: callIdSnapshot,
    // Stamp the entry with the wall-clock time at which it was recorded.
    createdAt: Date.now(),
  };
  this.map.set(responseId, entry);
}

/** Look up which account created a given response. */
Expand All @@ -44,12 +63,49 @@ export class SessionAffinityMap {
return entry?.conversationId ?? null;
}

/**
 * Find the response ID with the greatest `createdAt` among the entries
 * recorded for a conversation.
 *
 * Every matching candidate is re-fetched through `getEntry` and skipped when
 * that lookup returns null. When two candidates share the same timestamp, the
 * one visited later in map iteration order wins.
 *
 * @param conversationId - Conversation whose entries are scanned.
 * @returns The winning response ID, or null when no candidate qualifies.
 */
lookupLatestResponseIdByConversationId(conversationId: string): string | null {
  let winnerId: string | null = null;
  let winnerTimestamp = -1;
  for (const [candidateId, rawEntry] of this.map) {
    if (rawEntry.conversationId === conversationId) {
      // Go through getEntry so the candidate is subject to the same checks as direct lookups.
      const verified = this.getEntry(candidateId);
      if (verified && verified.createdAt >= winnerTimestamp) {
        winnerId = candidateId;
        winnerTimestamp = verified.createdAt;
      }
    }
  }
  return winnerId;
}

/**
 * Return the upstream turn-state token stored for a response.
 *
 * @param responseId - Response whose entry is consulted.
 * @returns The stored token, or null when the entry or its token is missing.
 */
lookupTurnState(responseId: string): string | null {
  const storedTurnState = this.getEntry(responseId)?.turnState;
  return storedTurnState ?? null;
}

/**
 * Return the instructions stored for a response.
 *
 * @param responseId - Response whose entry is consulted.
 * @returns The stored instructions, or null when the entry or the value is missing.
 */
lookupInstructions(responseId: string): string | null {
  const storedInstructions = this.getEntry(responseId)?.instructions;
  return storedInstructions ?? null;
}

/**
 * Return the instructions stored on the latest response of a conversation.
 *
 * @param conversationId - Conversation whose latest response is resolved first.
 * @returns The instructions of that response, or null when no response ID is
 *   found or the response has no instructions.
 */
lookupLatestInstructionsByConversationId(conversationId: string): string | null {
  const latestId = this.lookupLatestResponseIdByConversationId(conversationId);
  return latestId ? this.lookupInstructions(latestId) : null;
}

/**
 * Return the input-token count stored for a response.
 *
 * @param responseId - Response whose entry is consulted.
 * @returns The stored count (0 is preserved), or null when the entry or the value is missing.
 */
lookupInputTokens(responseId: string): number | null {
  const storedTokens = this.getEntry(responseId)?.inputTokens;
  return storedTokens ?? null;
}

/**
 * Return the function-call IDs stored for a response.
 *
 * @param responseId - Response whose entry is consulted.
 * @returns A fresh copy of the stored list, or an empty array when the entry
 *   or the list is missing.
 */
lookupFunctionCallIds(responseId: string): string[] {
  const storedIds = this.getEntry(responseId)?.functionCallIds;
  if (!storedIds) return [];
  // Hand back a copy so callers cannot mutate the stored list.
  return [...storedIds];
}

private getEntry(responseId: string): AffinityEntry | null {
const entry = this.map.get(responseId);
if (!entry) return null;
Expand Down
11 changes: 9 additions & 2 deletions src/proxy/codex-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export { parseSSEBlock, parseSSEStream } from "./codex-sse.js";

import {
CodexApiError,
PreviousResponseWebSocketError,
type CodexResponsesRequest,
type CodexCompactRequest,
type CodexCompactResponse,
Expand Down Expand Up @@ -167,7 +168,7 @@ export class CodexApi {
/**
* Create a response (streaming).
* Routes to WebSocket when previous_response_id is present (HTTP SSE doesn't support it).
* Falls back to HTTP SSE if WebSocket fails.
* Falls back to HTTP SSE after a WebSocket failure only when the request does not depend on previous_response_id.
*/
async createResponse(
request: CodexResponsesRequest,
Expand All @@ -179,6 +180,12 @@ export class CodexApi {
return await this.createResponseViaWebSocket(request, signal, onRateLimits);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
if (request.previous_response_id) {
console.warn(
`[CodexApi] WebSocket 失败(${msg}),previous_response_id 不能安全降级到 HTTP SSE`,
);
throw new PreviousResponseWebSocketError(msg);
}
console.warn(`[CodexApi] WebSocket failed (${msg}), falling back to HTTP SSE`);
const { previous_response_id: _, useWebSocket: _ws, ...httpRequest } = request;
return this.createResponseViaHttp(httpRequest as CodexResponsesRequest, signal);
Expand Down Expand Up @@ -356,4 +363,4 @@ export class CodexApi {
}

// Re-export CodexApiError for backward compatibility
export { CodexApiError } from "./codex-types.js";
export { CodexApiError, PreviousResponseWebSocketError } from "./codex-types.js";
19 changes: 15 additions & 4 deletions src/proxy/codex-sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,23 @@ import type { CodexSSEEvent } from "./codex-types.js";
export function parseSSEBlock(block: string): CodexSSEEvent | null {
let event = "";
const dataLines: string[] = [];
let dataStarted = false;

for (const line of block.split("\n")) {
if (line.startsWith("event:")) {
event = line.slice(6).trim();
} else if (line.startsWith("data:")) {
dataLines.push(line.slice(5).trimStart());
const normalizedLine = line.endsWith("\r") ? line.slice(0, -1) : line;
if (normalizedLine.startsWith("event:")) {
event = normalizedLine.slice(6).trim();
} else if (normalizedLine.startsWith("data:")) {
dataStarted = true;
dataLines.push(normalizedLine.slice(5).trimStart());
} else if (
dataStarted &&
!normalizedLine.startsWith("id:") &&
!normalizedLine.startsWith("retry:") &&
!normalizedLine.startsWith(":")
) {
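// Tolerate non-standard upstream error streams: the JSON is pretty-printed across multiple lines, but the continuation lines do not repeat the data: prefix.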
dataLines.push(normalizedLine);
}
}

Expand Down
19 changes: 18 additions & 1 deletion src/proxy/codex-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface CodexResponsesRequest {
/** Optional: tools available to the model */
tools?: unknown[];
/** Optional: tool choice strategy */
tool_choice?: string | { type: string; name: string };
tool_choice?: string | { type: string; name?: string };
/** Optional: text output format (JSON mode / structured outputs) */
text?: {
format: {
Expand Down Expand Up @@ -126,3 +126,20 @@ export class CodexApiError extends Error {
super(`Codex API error (${status}): ${detail}`);
}
}

/**
 * Raised when a WebSocket request that uses previous_response_id fails.
 *
 * A previous_response_id chain can only be continued safely over WebSocket;
 * after a failure the request must not be downgraded to a delta-only HTTP call.
 * The error is built with status 0 and a JSON-encoded `{ error: { message } }`
 * detail that embeds the original failure message.
 */
export class PreviousResponseWebSocketError extends CodexApiError {
  /**
   * @param causeMessage - Message of the underlying WebSocket failure; also
   *   exposed as a readonly property.
   */
  constructor(public readonly causeMessage: string) {
    super(0, PreviousResponseWebSocketError.serializeDetail(causeMessage));
    this.name = "PreviousResponseWebSocketError";
  }

  /** Build the JSON detail string passed to the base error. */
  private static serializeDetail(causeMessage: string): string {
    const message = `WebSocket failed while using previous_response_id; HTTP SSE fallback would drop server-side history: ${causeMessage}`;
    return JSON.stringify({ error: { message } });
  }
}
2 changes: 1 addition & 1 deletion src/proxy/ws-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export interface WsCreateRequest {
previous_response_id?: string;
reasoning?: { effort?: string; summary?: string };
tools?: unknown[];
tool_choice?: string | { type: string; name: string };
tool_choice?: string | { type: string; name?: string };
text?: {
format: {
type: "text" | "json_object" | "json_schema";
Expand Down
41 changes: 36 additions & 5 deletions src/routes/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import {
handleProxyRequest,
handleDirectRequest,
type FormatAdapter,
type ResponseMetadata,
type UsageHint,
} from "./shared/proxy-handler.js";
import { extractAnthropicClientConversationId } from "./shared/anthropic-session-id.js";
import type { UpstreamRouter } from "../proxy/upstream-router.js";

function makeError(
Expand All @@ -42,10 +45,26 @@ function makeAnthropicFormat(wantThinking: boolean): FormatAdapter {
),
format429: (msg) => makeError("rate_limit_error", msg),
formatError: (_status, msg) => makeError("api_error", msg),
streamTranslator: (api, response, model, onUsage, onResponseId, _tupleSchema) =>
streamCodexToAnthropic(api, response, model, onUsage, onResponseId, wantThinking),
collectTranslator: (api, response, model, _tupleSchema) =>
collectCodexToAnthropicResponse(api, response, model, wantThinking),
streamTranslator: (
api,
response,
model,
onUsage,
onResponseId,
_tupleSchema,
usageHint?: UsageHint,
onResponseMetadata?: (metadata: ResponseMetadata) => void,
) =>
streamCodexToAnthropic(api, response, model, onUsage, onResponseId, wantThinking, usageHint, onResponseMetadata),
collectTranslator: (
api,
response,
model,
_tupleSchema,
usageHint?: UsageHint,
onResponseMetadata?: (metadata: ResponseMetadata) => void,
) =>
collectCodexToAnthropicResponse(api, response, model, wantThinking, usageHint, onResponseMetadata),
};
}

Expand Down Expand Up @@ -102,12 +121,24 @@ export function createMessagesRoutes(
}
}

const codexRequest = translateAnthropicToCodexRequest(req);
const clientConversationId = extractAnthropicClientConversationId(
req,
c.req.header("x-claude-code-session-id"),
);

const codexRequest = translateAnthropicToCodexRequest(req, undefined, {
injectHostedWebSearch: !allowUnauthenticated,
mapClaudeCodeWebSearch: !allowUnauthenticated && clientConversationId !== null,
});
if (!allowUnauthenticated) {
codexRequest.useWebSocket = true;
}
const wantThinking = req.thinking?.type === "enabled" || req.thinking?.type === "adaptive";
const proxyReq = {
codexRequest,
model: buildDisplayModelName(parseModelName(req.model)),
isStreaming: req.stream,
clientConversationId: clientConversationId ?? undefined,
};
const fmt = makeAnthropicFormat(wantThinking);

Expand Down
68 changes: 64 additions & 4 deletions src/routes/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,37 @@ import type { UpstreamRouter } from "../proxy/upstream-router.js";
import { acquireAccount, releaseAccount } from "./shared/account-acquisition.js";
import { handleCodexApiError } from "./shared/proxy-error-handler.js";
import { withRetry } from "../utils/retry.js";
import { extractCodexError } from "../types/codex-events.js";

// ── Helpers ────────────────────────────────────────────────────────

/**
 * Type guard for plain-object-like values.
 *
 * @param v - Value to inspect.
 * @returns True when `v` is a non-null object that is not an array.
 */
function isRecord(v: unknown): v is Record<string, unknown> {
  // typeof null is "object", so null has to be ruled out explicitly.
  if (v === null || typeof v !== "object") return false;
  return !Array.isArray(v);
}

/**
 * Concatenate the text of every textual content part of an output item.
 *
 * A part contributes when it is a record whose `type` is "output_text" or
 * "text" and whose `text` is a string. Parts are joined without a separator.
 *
 * @param item - Candidate output item of unknown shape.
 * @returns The joined text, or "" when `item` is not a record, has no
 *   `content` array, or holds no textual parts.
 */
function extractOutputTextFromItem(item: unknown): string {
  if (!isRecord(item)) return "";
  const { content } = item;
  if (!Array.isArray(content)) return "";

  let joined = "";
  for (const part of content) {
    if (!isRecord(part)) continue;
    const isTextPart = part.type === "output_text" || part.type === "text";
    if (isTextPart && typeof part.text === "string") {
      joined += part.text;
    }
  }
  return joined;
}

/**
 * Set `response.output_text` to the text gathered from `response.output`.
 *
 * The response is mutated in place. Nothing happens when `output` is not an
 * array, and an existing `output_text` is left untouched when the gathered
 * text is empty.
 *
 * @param response - Response object to update.
 */
function syncOutputTextFromOutput(response: Record<string, unknown>): void {
  const { output } = response;
  if (!Array.isArray(output)) return;

  let combined = "";
  for (const item of output as unknown[]) {
    combined += extractOutputTextFromItem(item);
  }
  // Only overwrite when there is text, so a non-empty output_text is never blanked.
  if (combined) {
    response.output_text = combined;
  }
}

// ── Passthrough stream translator ──────────────────────────────────

async function* streamPassthrough(
Expand Down Expand Up @@ -82,7 +106,11 @@ async function* streamPassthrough(
for (const item of resp.output as unknown[]) {
if (isRecord(item) && Array.isArray(item.content)) {
for (const part of item.content as unknown[]) {
if (isRecord(part) && part.type === "output_text" && typeof part.text === "string") {
if (
isRecord(part) &&
(part.type === "output_text" || part.type === "text") &&
typeof part.text === "string"
) {
try {
const parsed = JSON.parse(part.text) as unknown;
part.text = JSON.stringify(reconvertTupleValues(parsed, tupleSchema));
Expand Down Expand Up @@ -134,6 +162,8 @@ export async function collectPassthrough(
let finalResponse: unknown = null;
let usage = { input_tokens: 0, output_tokens: 0 };
let responseId: string | null = null;
const outputItems: unknown[] = [];
let textDeltas = "";

try {
for await (const raw of api.parseStream(response)) {
Expand All @@ -145,7 +175,32 @@ export async function collectPassthrough(
if (resp && typeof resp.id === "string") responseId = resp.id;
}

if (raw.event === "response.output_text.delta" && typeof data.delta === "string") {
textDeltas += data.delta;
}

if (raw.event === "response.output_item.done" && isRecord(data.item)) {
outputItems.push(data.item);
}

if (raw.event === "response.completed" && resp) {
// Codex hosted search often streams complete output_item.done / text delta events
// while completed.response.output is empty. Backfill the final JSON from the streamed events here.
if (Array.isArray(resp.output) && resp.output.length === 0) {
if (outputItems.length > 0) {
resp.output = outputItems;
} else if (textDeltas) {
resp.output = [{
type: "message",
role: "assistant",
status: "completed",
content: [{ type: "output_text", text: textDeltas }],
}];
}
}
if (typeof resp.output_text !== "string" || !resp.output_text) {
syncOutputTextFromOutput(resp);
}
finalResponse = resp;
if (typeof resp.id === "string") responseId = resp.id;
if (isRecord(resp.usage)) {
Expand All @@ -157,9 +212,9 @@ export async function collectPassthrough(
}

if (raw.event === "error" || raw.event === "response.failed") {
const err = isRecord(data.error) ? data.error : data;
const err = extractCodexError(data);
throw new Error(
`Codex API error: ${typeof err.code === "string" ? err.code : "unknown"}: ${typeof err.message === "string" ? err.message : JSON.stringify(data)}`,
`Codex API error: ${err.code}: ${err.message}`,
);
}
}
Expand All @@ -181,7 +236,11 @@ export async function collectPassthrough(
for (const item of resp.output as unknown[]) {
if (isRecord(item) && Array.isArray(item.content)) {
for (const part of item.content as unknown[]) {
if (isRecord(part) && part.type === "output_text" && typeof part.text === "string") {
if (
isRecord(part) &&
(part.type === "output_text" || part.type === "text") &&
typeof part.text === "string"
) {
try {
const parsed = JSON.parse(part.text) as unknown;
part.text = JSON.stringify(reconvertTupleValues(parsed, tupleSchema));
Expand All @@ -192,6 +251,7 @@ export async function collectPassthrough(
}
}
}
syncOutputTextFromOutput(resp);
}
}

Expand Down
Loading