From 94671cd87e2fa08ea570650fbda4d5695beadd74 Mon Sep 17 00:00:00 2001 From: Hango Liang Date: Thu, 16 Apr 2026 22:03:01 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E5=92=8C=20WebSearch=20=E8=BD=AC=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复 Claude / Anthropic 请求经过 Codex proxy 时的缓存统计问题,并补齐 hosted WebSearch 工具到 Codex web_search 的转换。 - 补齐 Anthropic 响应中的缓存创建用量字段 - 使用请求内容派生稳定的 prompt_cache_key - 加强 session affinity 和 previous_response 续链稳定性 - 支持 OpenAI、Anthropic、Gemini 与 Claude Code WebSearch 的 hosted web_search 转换 - 增加缓存统计、稳定会话 key、隐式续链和工具转换相关单测 --- src/auth/session-affinity.ts | 60 ++- src/proxy/codex-api.ts | 11 +- src/proxy/codex-sse.ts | 19 +- src/proxy/codex-types.ts | 19 +- src/proxy/ws-transport.ts | 2 +- src/routes/messages.ts | 38 +- src/routes/responses.ts | 68 +++- src/routes/shared/anthropic-session-id.ts | 25 ++ src/routes/shared/proxy-handler.ts | 261 ++++++++++++- src/routes/shared/response-processor.ts | 7 +- src/routes/shared/stable-conversation-key.ts | 53 +++ src/translation/anthropic-to-codex.ts | 30 +- src/translation/codex-event-extractor.ts | 4 + src/translation/codex-to-anthropic.ts | 65 +++- src/translation/tool-format.ts | 119 +++++- src/types/anthropic.ts | 28 +- src/types/codex-events.ts | 123 +++++- src/types/gemini.ts | 5 +- src/types/openai.ts | 25 +- tests/unit/auth/session-affinity.test.ts | 41 ++ tests/unit/cache-reporting.test.ts | 354 ++++++++++++++++++ tests/unit/proxy/codex-api-headers.test.ts | 28 ++ tests/unit/proxy/codex-sse.test.ts | 22 ++ .../routes/responses-premature-close.test.ts | 129 +++++++ .../shared/anthropic-session-id.test.ts | 54 +++ .../proxy-handler-implicit-resume.test.ts | 78 ++++ .../unit/routes/upstream-auth-bypass.test.ts | 7 + tests/unit/stable-conversation-key.test.ts | 274 ++++++++++++++ .../translation/anthropic-to-codex.test.ts | 93 +++-- tests/unit/translation/tool-format.test.ts | 204 +++++++++- tests/unit/types/codex-events.test.ts | 32 ++ 31 files changed, 2157 insertions(+), 121 deletions(-) create mode 100644 src/routes/shared/anthropic-session-id.ts create mode 100644 src/routes/shared/stable-conversation-key.ts create mode 100644 tests/unit/cache-reporting.test.ts create mode 100644 tests/unit/routes/shared/anthropic-session-id.test.ts create mode 100644 tests/unit/routes/shared/proxy-handler-implicit-resume.test.ts create mode 100644 tests/unit/stable-conversation-key.test.ts diff --git a/src/auth/session-affinity.ts b/src/auth/session-affinity.ts index cc9c8067..e06bc391 100644 --- a/src/auth/session-affinity.ts +++ b/src/auth/session-affinity.ts @@ -11,6 +11,9 @@ interface AffinityEntry { entryId: string; conversationId: string; turnState?: string; + instructions?: string; + inputTokens?: number; + functionCallIds?: string[]; createdAt: number; } @@ -28,8 +31,24 @@ export class SessionAffinityMap { } /** Record that a response was created by a specific account in a conversation. */ - record(responseId: string, entryId: string, conversationId: string, turnState?: string): void { - this.map.set(responseId, { entryId, conversationId, turnState, createdAt: Date.now() }); + record( + responseId: string, + entryId: string, + conversationId: string, + turnState?: string, + instructions?: string, + inputTokens?: number, + functionCallIds?: string[], + ): void { + this.map.set(responseId, { + entryId, + conversationId, + turnState, + instructions, + inputTokens, + functionCallIds: functionCallIds ? [...functionCallIds] : undefined, + createdAt: Date.now(), + }); } /** Look up which account created a given response. */ @@ -44,12 +63,49 @@ export class SessionAffinityMap { return entry?.conversationId ?? null; } + /** Look up the latest response ID recorded for a conversation. */ + lookupLatestResponseIdByConversationId(conversationId: string): string | null { + let latestResponseId: string | null = null; + let latestCreatedAt = -1; + for (const [responseId, entry] of this.map) { + if (entry.conversationId !== conversationId) continue; + const liveEntry = this.getEntry(responseId); + if (!liveEntry) continue; + if (liveEntry.createdAt >= latestCreatedAt) { + latestCreatedAt = liveEntry.createdAt; + latestResponseId = responseId; + } + } + return latestResponseId; + } + /** Look up the upstream turn-state token for a given response. */ lookupTurnState(responseId: string): string | null { const entry = this.getEntry(responseId); return entry?.turnState ?? null; } + lookupInstructions(responseId: string): string | null { + const entry = this.getEntry(responseId); + return entry?.instructions ?? null; + } + + lookupLatestInstructionsByConversationId(conversationId: string): string | null { + const responseId = this.lookupLatestResponseIdByConversationId(conversationId); + if (!responseId) return null; + return this.lookupInstructions(responseId); + } + + lookupInputTokens(responseId: string): number | null { + const entry = this.getEntry(responseId); + return entry?.inputTokens ?? null; + } + + lookupFunctionCallIds(responseId: string): string[] { + const entry = this.getEntry(responseId); + return entry?.functionCallIds ? [...entry.functionCallIds] : []; + } + private getEntry(responseId: string): AffinityEntry | null { const entry = this.map.get(responseId); if (!entry) return null; diff --git a/src/proxy/codex-api.ts b/src/proxy/codex-api.ts index 5cb1c932..3c53a080 100644 --- a/src/proxy/codex-api.ts +++ b/src/proxy/codex-api.ts @@ -41,6 +41,7 @@ export { parseSSEBlock, parseSSEStream } from "./codex-sse.js"; import { CodexApiError, + PreviousResponseWebSocketError, type CodexResponsesRequest, type CodexCompactRequest, type CodexCompactResponse, @@ -167,7 +168,7 @@ export class CodexApi { /** * Create a response (streaming). * Routes to WebSocket when previous_response_id is present (HTTP SSE doesn't support it). - * Falls back to HTTP SSE if WebSocket fails. + * 仅当不依赖 previous_response_id 时,WebSocket 失败后才降级到 HTTP SSE。 */ async createResponse( request: CodexResponsesRequest, @@ -179,6 +180,12 @@ export class CodexApi { return await this.createResponseViaWebSocket(request, signal, onRateLimits); } catch (err) { const msg = err instanceof Error ? err.message : String(err); + if (request.previous_response_id) { + console.warn( + `[CodexApi] WebSocket 失败(${msg}),previous_response_id 不能安全降级到 HTTP SSE`, + ); + throw new PreviousResponseWebSocketError(msg); + } console.warn(`[CodexApi] WebSocket failed (${msg}), falling back to HTTP SSE`); const { previous_response_id: _, useWebSocket: _ws, ...httpRequest } = request; return this.createResponseViaHttp(httpRequest as CodexResponsesRequest, signal); @@ -356,4 +363,4 @@ export class CodexApi { } // Re-export CodexApiError for backward compatibility -export { CodexApiError } from "./codex-types.js"; +export { CodexApiError, PreviousResponseWebSocketError } from "./codex-types.js"; diff --git a/src/proxy/codex-sse.ts b/src/proxy/codex-sse.ts index c309b91f..1a27caa6 100644 --- a/src/proxy/codex-sse.ts +++ b/src/proxy/codex-sse.ts @@ -8,12 +8,23 @@ import type { CodexSSEEvent } from "./codex-types.js"; export function parseSSEBlock(block: string): CodexSSEEvent | null { let event = ""; const dataLines: string[] = []; + let dataStarted = false; for (const line of block.split("\n")) { - if (line.startsWith("event:")) { - event = line.slice(6).trim(); - } else if (line.startsWith("data:")) { - dataLines.push(line.slice(5).trimStart()); + const normalizedLine = line.endsWith("\r") ? line.slice(0, -1) : line; + if (normalizedLine.startsWith("event:")) { + event = normalizedLine.slice(6).trim(); + } else if (normalizedLine.startsWith("data:")) { + dataStarted = true; + dataLines.push(normalizedLine.slice(5).trimStart()); + } else if ( + dataStarted && + !normalizedLine.startsWith("id:") && + !normalizedLine.startsWith("retry:") && + !normalizedLine.startsWith(":") + ) { + // 兼容非标准上游错误流:JSON 被漂亮打印成多行,但续行没有重复 data: 前缀。 + dataLines.push(normalizedLine); } } diff --git a/src/proxy/codex-types.ts b/src/proxy/codex-types.ts index 5152de36..9a9f60bf 100644 --- a/src/proxy/codex-types.ts +++ b/src/proxy/codex-types.ts @@ -16,7 +16,7 @@ export interface CodexResponsesRequest { /** Optional: tools available to the model */ tools?: unknown[]; /** Optional: tool choice strategy */ - tool_choice?: string | { type: string; name: string }; + tool_choice?: string | { type: string; name?: string }; /** Optional: text output format (JSON mode / structured outputs) */ text?: { format: { @@ -126,3 +126,20 @@ export class CodexApiError extends Error { super(`Codex API error (${status}): ${detail}`); } } + +/** previous_response_id 只能通过 WebSocket 安全续链,失败后不能降级为 HTTP delta-only。 */ +export class PreviousResponseWebSocketError extends CodexApiError { + constructor(public readonly causeMessage: string) { + super( + 0, + JSON.stringify({ + error: { + message: + "WebSocket failed while using previous_response_id; HTTP SSE fallback would drop server-side history: " + + causeMessage, + }, + }), + ); + this.name = "PreviousResponseWebSocketError"; + } +} diff --git a/src/proxy/ws-transport.ts b/src/proxy/ws-transport.ts index 1618648a..a875354f 100644 --- a/src/proxy/ws-transport.ts +++ b/src/proxy/ws-transport.ts @@ -42,7 +42,7 @@ export interface WsCreateRequest { previous_response_id?: string; reasoning?: { effort?: string; summary?: string }; tools?: unknown[]; - tool_choice?: string | { type: string; name: string }; + tool_choice?: string | { type: string; name?: string }; text?: { format: { type: "text" | "json_object" | "json_schema"; diff --git a/src/routes/messages.ts b/src/routes/messages.ts index 20d4afff..d4e1f870 100644 --- a/src/routes/messages.ts +++ b/src/routes/messages.ts @@ -21,7 +21,10 @@ import { handleProxyRequest, handleDirectRequest, type FormatAdapter, + type ResponseMetadata, + type UsageHint, } from "./shared/proxy-handler.js"; +import { extractAnthropicClientConversationId } from "./shared/anthropic-session-id.js"; import type { UpstreamRouter } from "../proxy/upstream-router.js"; function makeError( @@ -42,10 +45,26 @@ function makeAnthropicFormat(wantThinking: boolean): FormatAdapter { ), format429: (msg) => makeError("rate_limit_error", msg), formatError: (_status, msg) => makeError("api_error", msg), - streamTranslator: (api, response, model, onUsage, onResponseId, _tupleSchema) => - streamCodexToAnthropic(api, response, model, onUsage, onResponseId, wantThinking), - collectTranslator: (api, response, model, _tupleSchema) => - collectCodexToAnthropicResponse(api, response, model, wantThinking), + streamTranslator: ( + api, + response, + model, + onUsage, + onResponseId, + _tupleSchema, + usageHint?: UsageHint, + onResponseMetadata?: (metadata: ResponseMetadata) => void, + ) => + streamCodexToAnthropic(api, response, model, onUsage, onResponseId, wantThinking, usageHint, onResponseMetadata), + collectTranslator: ( + api, + response, + model, + _tupleSchema, + usageHint?: UsageHint, + onResponseMetadata?: (metadata: ResponseMetadata) => void, + ) => + collectCodexToAnthropicResponse(api, response, model, wantThinking, usageHint, onResponseMetadata), }; } @@ -102,12 +121,21 @@ export function createMessagesRoutes( } } - const codexRequest = translateAnthropicToCodexRequest(req); + const clientConversationId = extractAnthropicClientConversationId( + req, + c.req.header("x-claude-code-session-id"), + ); + + const codexRequest = translateAnthropicToCodexRequest(req, undefined, { + injectHostedWebSearch: !allowUnauthenticated, + }); + codexRequest.useWebSocket = true; const wantThinking = req.thinking?.type === "enabled" || req.thinking?.type === "adaptive"; const proxyReq = { codexRequest, model: buildDisplayModelName(parseModelName(req.model)), isStreaming: req.stream, + clientConversationId: clientConversationId ?? undefined, }; const fmt = makeAnthropicFormat(wantThinking); diff --git a/src/routes/responses.ts b/src/routes/responses.ts index d5d1f5f4..a36478b4 100644 --- a/src/routes/responses.ts +++ b/src/routes/responses.ts @@ -29,6 +29,7 @@ import type { UpstreamRouter } from "../proxy/upstream-router.js"; import { acquireAccount, releaseAccount } from "./shared/account-acquisition.js"; import { handleCodexApiError } from "./shared/proxy-error-handler.js"; import { withRetry } from "../utils/retry.js"; +import { extractCodexError } from "../types/codex-events.js"; // ── Helpers ──────────────────────────────────────────────────────── @@ -36,6 +37,29 @@ function isRecord(v: unknown): v is Record { return typeof v === "object" && v !== null && !Array.isArray(v); } +function extractOutputTextFromItem(item: unknown): string { + if (!isRecord(item) || !Array.isArray(item.content)) return ""; + const chunks: string[] = []; + for (const part of item.content) { + if ( + isRecord(part) && + (part.type === "output_text" || part.type === "text") && + typeof part.text === "string" + ) { + chunks.push(part.text); + } + } + return chunks.join(""); +} + +function syncOutputTextFromOutput(response: Record): void { + if (!Array.isArray(response.output)) return; + const outputText = (response.output as unknown[]) + .map(extractOutputTextFromItem) + .join(""); + if (outputText) response.output_text = outputText; +} + // ── Passthrough stream translator ────────────────────────────────── async function* streamPassthrough( @@ -82,7 +106,11 @@ async function* streamPassthrough( for (const item of resp.output as unknown[]) { if (isRecord(item) && Array.isArray(item.content)) { for (const part of item.content as unknown[]) { - if (isRecord(part) && part.type === "output_text" && typeof part.text === "string") { + if ( + isRecord(part) && + (part.type === "output_text" || part.type === "text") && + typeof part.text === "string" + ) { try { const parsed = JSON.parse(part.text) as unknown; part.text = JSON.stringify(reconvertTupleValues(parsed, tupleSchema)); @@ -134,6 +162,8 @@ export async function collectPassthrough( let finalResponse: unknown = null; let usage = { input_tokens: 0, output_tokens: 0 }; let responseId: string | null = null; + const outputItems: unknown[] = []; + let textDeltas = ""; try { for await (const raw of api.parseStream(response)) { @@ -145,7 +175,32 @@ export async function collectPassthrough( if (resp && typeof resp.id === "string") responseId = resp.id; } + if (raw.event === "response.output_text.delta" && typeof data.delta === "string") { + textDeltas += data.delta; + } + + if (raw.event === "response.output_item.done" && isRecord(data.item)) { + outputItems.push(data.item); + } + if (raw.event === "response.completed" && resp) { + // Codex hosted search 经常完整流出 output_item.done/text delta, + // 但 completed.response.output 为空。这里用流式事件回填最终 JSON。 + if (Array.isArray(resp.output) && resp.output.length === 0) { + if (outputItems.length > 0) { + resp.output = outputItems; + } else if (textDeltas) { + resp.output = [{ + type: "message", + role: "assistant", + status: "completed", + content: [{ type: "output_text", text: textDeltas }], + }]; + } + } + if (typeof resp.output_text !== "string" || !resp.output_text) { + syncOutputTextFromOutput(resp); + } finalResponse = resp; if (typeof resp.id === "string") responseId = resp.id; if (isRecord(resp.usage)) { @@ -157,9 +212,9 @@ export async function collectPassthrough( } if (raw.event === "error" || raw.event === "response.failed") { - const err = isRecord(data.error) ? data.error : data; + const err = extractCodexError(data); throw new Error( - `Codex API error: ${typeof err.code === "string" ? err.code : "unknown"}: ${typeof err.message === "string" ? err.message : JSON.stringify(data)}`, + `Codex API error: ${err.code}: ${err.message}`, ); } } @@ -181,7 +236,11 @@ export async function collectPassthrough( for (const item of resp.output as unknown[]) { if (isRecord(item) && Array.isArray(item.content)) { for (const part of item.content as unknown[]) { - if (isRecord(part) && part.type === "output_text" && typeof part.text === "string") { + if ( + isRecord(part) && + (part.type === "output_text" || part.type === "text") && + typeof part.text === "string" + ) { try { const parsed = JSON.parse(part.text) as unknown; part.text = JSON.stringify(reconvertTupleValues(parsed, tupleSchema)); @@ -192,6 +251,7 @@ export async function collectPassthrough( } } } + syncOutputTextFromOutput(resp); } } diff --git a/src/routes/shared/anthropic-session-id.ts b/src/routes/shared/anthropic-session-id.ts new file mode 100644 index 00000000..24144cbf --- /dev/null +++ b/src/routes/shared/anthropic-session-id.ts @@ -0,0 +1,25 @@ +import type { AnthropicMessagesRequest } from "../../types/anthropic.js"; + +function parseMetadataSessionId(userId: string | undefined): string | null { + if (!userId) return null; + try { + const parsed = JSON.parse(userId) as { session_id?: unknown; device_id?: unknown }; + return typeof parsed.session_id === "string" && + parsed.session_id && + typeof parsed.device_id === "string" && + parsed.device_id + ? parsed.session_id + : null; + } catch { + return null; + } +} + +export function extractAnthropicClientConversationId( + req: AnthropicMessagesRequest, + headerSessionId: string | undefined, +): string | null { + const normalizedHeaderSessionId = headerSessionId?.trim(); + if (normalizedHeaderSessionId) return normalizedHeaderSessionId; + return parseMetadataSessionId(req.metadata?.user_id); +} diff --git a/src/routes/shared/proxy-handler.ts b/src/routes/shared/proxy-handler.ts index 3d36a37b..76aff7a6 100644 --- a/src/routes/shared/proxy-handler.ts +++ b/src/routes/shared/proxy-handler.ts @@ -12,7 +12,11 @@ import crypto from "crypto"; import type { Context } from "hono"; import type { StatusCode } from "hono/utils/http-status"; import { stream } from "hono/streaming"; -import { CodexApi, CodexApiError } from "../../proxy/codex-api.js"; +import { + CodexApi, + CodexApiError, + PreviousResponseWebSocketError, +} from "../../proxy/codex-api.js"; import type { CodexResponsesRequest } from "../../proxy/codex-api.js"; import type { UpstreamAdapter } from "../../proxy/upstream-adapter.js"; import { EmptyResponseError } from "../../translation/codex-event-extractor.js"; @@ -28,14 +32,27 @@ import { parseRateLimitHeaders, rateLimitToQuota, type ParsedRateLimit } from ". import { getConfig } from "../../config.js"; import { jitterInt } from "../../utils/jitter.js"; import { getSessionAffinityMap, type SessionAffinityMap } from "../../auth/session-affinity.js"; +import { deriveStableConversationKey } from "./stable-conversation-key.js"; /** Data prepared by each route after parsing and translating the request. */ export interface ProxyRequest { codexRequest: CodexResponsesRequest; model: string; isStreaming: boolean; + /** Stable client-side conversation/session identifier when the upstream client provides one. */ + clientConversationId?: string; /** Original schema before tuple→object conversion (for response reconversion). */ tupleSchema?: Record | null; + /** Whether this is a new conversation (no previous_response_id) — used for cache reporting. */ + isNewConversation?: boolean; +} + +export interface UsageHint { + reusedInputTokensUpperBound?: number; +} + +export interface ResponseMetadata { + functionCallIds?: string[]; } /** Format-specific adapter provided by each route. */ @@ -52,12 +69,16 @@ export interface FormatAdapter { onUsage: (u: { input_tokens: number; output_tokens: number; cached_tokens?: number; reasoning_tokens?: number }) => void, onResponseId: (id: string) => void, tupleSchema?: Record | null, + usageHint?: UsageHint, + onResponseMetadata?: (metadata: ResponseMetadata) => void, ) => AsyncGenerator; collectTranslator: ( api: UpstreamAdapter, response: Response, model: string, tupleSchema?: Record | null, + usageHint?: UsageHint, + onResponseMetadata?: (metadata: ResponseMetadata) => void, ) => Promise<{ response: unknown; usage: { input_tokens: number; output_tokens: number; cached_tokens?: number; reasoning_tokens?: number }; @@ -67,6 +88,66 @@ export interface FormatAdapter { const MAX_EMPTY_RETRIES = 2; +function normalizeInstructions(instructions: string | null | undefined): string { + return instructions ?? ""; +} + +export function shouldActivateImplicitResume(opts: { + implicitPrevRespId: string | null; + continuationInputStart: number; + inputLength: number; + preferredEntryId: string | null; + acquiredEntryId: string; + currentInstructions: string | null | undefined; + storedInstructions: string | null; + requiredFunctionCallOutputIds?: string[]; + storedFunctionCallIds?: string[]; +}): boolean { + const storedFunctionCallIds = new Set(opts.storedFunctionCallIds ?? []); + const requiredFunctionCallOutputIds = opts.requiredFunctionCallOutputIds ?? []; + const hasAllRequiredToolCalls = requiredFunctionCallOutputIds.every((callId) => + storedFunctionCallIds.has(callId), + ); + + return Boolean( + opts.implicitPrevRespId && + opts.continuationInputStart < opts.inputLength && + opts.preferredEntryId && + opts.acquiredEntryId === opts.preferredEntryId && + normalizeInstructions(opts.currentInstructions) === normalizeInstructions(opts.storedInstructions) && + hasAllRequiredToolCalls, + ); +} + +export function shouldReplayFullInputAfterImplicitResumeError( + err: unknown, + implicitResumeActive: boolean, +): err is PreviousResponseWebSocketError { + return implicitResumeActive && err instanceof PreviousResponseWebSocketError; +} + +function getContinuationInputStartIndex(input: CodexResponsesRequest["input"]): number { + let lastModelOutputIndex = -1; + for (let i = 0; i < input.length; i++) { + const item = input[i]; + if ("role" in item) { + if (item.role === "assistant") lastModelOutputIndex = i; + continue; + } + if (item.type === "function_call") { + lastModelOutputIndex = i; + } + } + return lastModelOutputIndex >= 0 ? lastModelOutputIndex + 1 : 0; +} + +function getFunctionCallOutputIds(input: CodexResponsesRequest["input"]): string[] { + return input + .filter((item): item is { type: "function_call_output"; call_id: string; output: string } => + !("role" in item) && item.type === "function_call_output") + .map((item) => item.call_id); +} + /** Sleep if this account had a recent request, to stagger upstream traffic. */ export async function staggerIfNeeded(prevSlotMs: number | null): Promise { const intervalMs = getConfig().auth.request_interval_ms; @@ -96,19 +177,58 @@ export async function handleProxyRequest( fmt: FormatAdapter, proxyPool?: ProxyPool, ): Promise { - // Session affinity: prefer the account that created the previous response const affinityMap = getSessionAffinityMap(); - const prevRespId = req.codexRequest.previous_response_id; - const preferredEntryId = prevRespId ? affinityMap.lookup(prevRespId) : null; + if (!Array.isArray(req.codexRequest.input)) { + req.codexRequest.input = []; + } + const originalInput = req.codexRequest.input; + const originalPreviousResponseId = req.codexRequest.previous_response_id; + const originalTurnState = req.codexRequest.turnState; + const originalUseWebSocket = req.codexRequest.useWebSocket; + const currentInstructions = req.codexRequest.instructions; + const explicitPrevRespId = req.codexRequest.previous_response_id; + const derivedConversationId = deriveStableConversationKey(req.codexRequest); + const promptCacheKey = derivedConversationId ?? req.clientConversationId ?? crypto.randomUUID(); + const continuationInputStart = explicitPrevRespId ? 0 : getContinuationInputStartIndex(req.codexRequest.input); + const explicitConversationId = explicitPrevRespId ? affinityMap.lookupConversationId(explicitPrevRespId) : null; + const chainConversationId = explicitConversationId ?? req.clientConversationId ?? promptCacheKey; + const implicitPrevRespId = + !explicitPrevRespId && + continuationInputStart > 0 && + req.clientConversationId + ? affinityMap.lookupLatestResponseIdByConversationId(req.clientConversationId) + : null; + const prevRespId = explicitPrevRespId ?? implicitPrevRespId; + const implicitStoredInstructions = implicitPrevRespId + ? affinityMap.lookupInstructions(implicitPrevRespId) + : null; + const implicitContinuationInput = req.codexRequest.input.slice(continuationInputStart); + const requiredFunctionCallOutputIds = implicitPrevRespId + ? getFunctionCallOutputIds(implicitContinuationInput) + : []; + const implicitStoredFunctionCallIds = implicitPrevRespId + ? affinityMap.lookupFunctionCallIds(implicitPrevRespId) + : []; + const missingFunctionCallOutputIds = requiredFunctionCallOutputIds.filter( + (callId) => !implicitStoredFunctionCallIds.includes(callId), + ); - // Conversation ID: inherit from previous response chain, or generate new - const conversationId = (prevRespId ? affinityMap.lookupConversationId(prevRespId) : null) - ?? crypto.randomUUID(); - req.codexRequest.prompt_cache_key = conversationId; + // Session affinity: prefer the account that created the previous response + const preferredEntryId = + explicitPrevRespId + ? affinityMap.lookup(explicitPrevRespId) + : implicitPrevRespId && normalizeInstructions(currentInstructions) === normalizeInstructions(implicitStoredInstructions) + ? affinityMap.lookup(implicitPrevRespId) + : null; + + // Conversation ID: inherit from previous response chain, or derive from + // content hash (enables cache hits across turns even without previous_response_id), + // or fall back to a random UUID. + req.codexRequest.prompt_cache_key = promptCacheKey; // Turn state: sticky routing token from upstream, echoed back on subsequent requests - const prevTurnState = prevRespId ? affinityMap.lookupTurnState(prevRespId) : null; - if (prevTurnState) req.codexRequest.turnState = prevTurnState; + const explicitTurnState = explicitPrevRespId ? affinityMap.lookupTurnState(explicitPrevRespId) : null; + if (explicitTurnState) req.codexRequest.turnState = explicitTurnState; // Set include for reasoning-enabled requests (matches Codex CLI behavior) if (req.codexRequest.reasoning && !req.codexRequest.include?.length) { @@ -128,9 +248,52 @@ export async function handleProxyRequest( let modelRetried = false; let usageInfo: UsageInfo | undefined; let capturedResponseId: string | null = null; + const responseFunctionCallIds = new Set(); + let activeUsageHint: UsageHint | undefined; + let implicitResumeActive = false; // Idempotent-release guard: prevents double-release across retry branches const released = new Set(); + if (implicitPrevRespId && missingFunctionCallOutputIds.length > 0) { + console.warn( + `[${fmt.tag}] 隐式续链跳过:上一轮 response 未记录 tool_result 对应的 call_id=` + + missingFunctionCallOutputIds.slice(0, 3).join(","), + ); + } + + if (shouldActivateImplicitResume({ + implicitPrevRespId, + continuationInputStart, + inputLength: req.codexRequest.input.length, + preferredEntryId, + acquiredEntryId: entryId, + currentInstructions, + storedInstructions: implicitStoredInstructions, + requiredFunctionCallOutputIds, + storedFunctionCallIds: implicitStoredFunctionCallIds, + })) { + req.codexRequest.previous_response_id = implicitPrevRespId!; + req.codexRequest.useWebSocket = true; + req.codexRequest.input = req.codexRequest.input.slice(continuationInputStart); + const implicitTurnState = affinityMap.lookupTurnState(implicitPrevRespId!); + if (implicitTurnState) req.codexRequest.turnState = implicitTurnState; + activeUsageHint = { + reusedInputTokensUpperBound: affinityMap.lookupInputTokens(implicitPrevRespId!) ?? undefined, + }; + implicitResumeActive = true; + } + + const restoreImplicitResumeRequest = (): void => { + if (!implicitResumeActive) return; + req.codexRequest.previous_response_id = originalPreviousResponseId; + req.codexRequest.turnState = originalTurnState; + req.codexRequest.useWebSocket = originalUseWebSocket; + req.codexRequest.input = originalInput; + req.codexRequest.instructions = currentInstructions; + activeUsageHint = undefined; + implicitResumeActive = false; + }; + { const reqJson = JSON.stringify(req.codexRequest); const inputItems = req.codexRequest.input?.length ?? 0; @@ -206,18 +369,41 @@ export async function handleProxyRequest( return stream(c, async (s) => { s.onAbort(() => abortController.abort()); + const recordStreamAffinity = (): void => { + if (!capturedResponseId) return; + affinityMap.record( + capturedResponseId, + capturedEntryId, + chainConversationId, + upstreamTurnState, + req.codexRequest.instructions ?? undefined, + usageInfo?.input_tokens, + Array.from(responseFunctionCallIds), + ); + }; try { await streamResponse( s, capturedApi, rawResponse, req.model, fmt, - (u) => { usageInfo = u; }, + (u) => { + usageInfo = u; + recordStreamAffinity(); + }, req.tupleSchema, - (id) => { capturedResponseId = id; }, + (id) => { + capturedResponseId = id; + recordStreamAffinity(); + }, + activeUsageHint, + (metadata) => { + for (const callId of metadata.functionCallIds ?? []) { + responseFunctionCallIds.add(callId); + } + recordStreamAffinity(); + }, ); } finally { abortController.abort(); - if (capturedResponseId) { - affinityMap.record(capturedResponseId, capturedEntryId, conversationId, upstreamTurnState); - } + recordStreamAffinity(); if (usageInfo) { const uncached = usageInfo.cached_tokens ? usageInfo.input_tokens - usageInfo.cached_tokens @@ -243,7 +429,16 @@ export async function handleProxyRequest( // ── Non-streaming path (with empty-response retry) ── return await handleNonStreaming( c, accountPool, cookieJar, req, fmt, proxyPool, - codexApi, rawResponse, entryId, abortController, released, affinityMap, conversationId, upstreamTurnState, + codexApi, + rawResponse, + entryId, + abortController, + released, + affinityMap, + chainConversationId, + upstreamTurnState, + () => activeUsageHint, + restoreImplicitResumeRequest, ); } catch (err) { if (!(err instanceof CodexApiError)) { @@ -251,6 +446,14 @@ export async function handleProxyRequest( throw err; } + if (shouldReplayFullInputAfterImplicitResumeError(err, implicitResumeActive)) { + console.warn( + `[${fmt.tag}] 隐式续链 WebSocket 失败,回退为完整历史重放:${err.causeMessage}`, + ); + restoreImplicitResumeRequest(); + continue; + } + const decision = handleCodexApiError( err, accountPool, entryId, req.codexRequest.model, fmt.tag, modelRetried, ); @@ -264,6 +467,7 @@ export async function handleProxyRequest( if (decision.releaseBeforeRetry) { releaseAccount(accountPool, entryId, undefined, released); } + restoreImplicitResumeRequest(); if (decision.markModelRetried) { modelRetried = true; } @@ -326,6 +530,8 @@ async function handleNonStreaming( affinityMap?: SessionAffinityMap, conversationId?: string, turnState?: string, + getUsageHint?: () => UsageHint | undefined, + restoreImplicitResumeRequest?: () => void, ): Promise { let currentEntryId = initialEntryId; let currentApi = initialApi; @@ -333,11 +539,29 @@ async function handleNonStreaming( for (let attempt = 1; ; attempt++) { try { + const responseFunctionCallIds = new Set(); const result = await fmt.collectTranslator( - currentApi, currentRawResponse, req.model, req.tupleSchema, + currentApi, + currentRawResponse, + req.model, + req.tupleSchema, + getUsageHint?.(), + (metadata) => { + for (const callId of metadata.functionCallIds ?? []) { + responseFunctionCallIds.add(callId); + } + }, ); if (result.responseId && affinityMap && conversationId) { - affinityMap.record(result.responseId, currentEntryId, conversationId, turnState); + affinityMap.record( + result.responseId, + currentEntryId, + conversationId, + turnState, + req.codexRequest.instructions ?? undefined, + result.usage.input_tokens, + Array.from(responseFunctionCallIds), + ); } if (result.usage) { const u = result.usage; @@ -362,6 +586,7 @@ async function handleNonStreaming( ); accountPool.recordEmptyResponse(currentEntryId); releaseAccount(accountPool, currentEntryId, collectErr.usage, released); + restoreImplicitResumeRequest?.(); const newAcquired = acquireAccount(accountPool, req.codexRequest.model, undefined, fmt.tag); if (!newAcquired) { diff --git a/src/routes/shared/response-processor.ts b/src/routes/shared/response-processor.ts index 60b0c8d9..c90b5997 100644 --- a/src/routes/shared/response-processor.ts +++ b/src/routes/shared/response-processor.ts @@ -5,7 +5,7 @@ */ import type { UpstreamAdapter } from "../../proxy/upstream-adapter.js"; -import type { FormatAdapter } from "./proxy-handler.js"; +import type { FormatAdapter, ResponseMetadata, UsageHint } from "./proxy-handler.js"; import type { UsageInfo } from "../../translation/codex-event-extractor.js"; /** Minimal subset of Hono's StreamingApi that we actually use. */ @@ -29,6 +29,8 @@ export async function streamResponse( onUsage: (u: UsageInfo) => void, tupleSchema?: Record | null, onResponseId?: (id: string) => void, + usageHint?: UsageHint, + onResponseMetadata?: (metadata: ResponseMetadata) => void, ): Promise { try { for await (const chunk of adapter.streamTranslator( @@ -38,6 +40,8 @@ export async function streamResponse( onUsage, onResponseId ?? (() => {}), tupleSchema, + usageHint, + onResponseMetadata, )) { try { await s.write(chunk); @@ -56,4 +60,3 @@ export async function streamResponse( } catch { /* client already gone */ } } } - diff --git a/src/routes/shared/stable-conversation-key.ts b/src/routes/shared/stable-conversation-key.ts new file mode 100644 index 00000000..4a7ad0f7 --- /dev/null +++ b/src/routes/shared/stable-conversation-key.ts @@ -0,0 +1,53 @@ +import { createHash } from "crypto"; +import type { CodexResponsesRequest } from "../../proxy/codex-api.js"; + +const LEADING_SYSTEM_REMINDER_RE = /^(?:[\s\S]*?<\/system-reminder>\s*)+/i; + +function normalizeConversationAnchorText(text: string): string { + return text.replace(LEADING_SYSTEM_REMINDER_RE, "").trimStart(); +} + +export function extractStableConversationSeed( + req: CodexResponsesRequest, +): { instructions: string; firstUserText: string } { + const instructions = (req.instructions ?? "").slice(0, 2000); + const input = Array.isArray(req.input) ? req.input : []; + + let firstUserText = ""; + for (const item of input) { + if (!("role" in item) || item.role !== "user") continue; + const content = item.content; + if (typeof content === "string") { + firstUserText = content; + } else if (Array.isArray(content)) { + firstUserText = content + .filter((part): part is { type: "input_text"; text: string } => + !!part && + typeof part === "object" && + "type" in part && + part.type === "input_text" && + "text" in part && + typeof part.text === "string") + .map((part) => part.text) + .join(""); + } + break; + } + + const normalizedFirstUserText = normalizeConversationAnchorText(firstUserText); + return { + instructions, + firstUserText: normalizedFirstUserText || firstUserText, + }; +} + +export function deriveStableConversationKey(req: CodexResponsesRequest): string | null { + const { instructions, firstUserText } = extractStableConversationSeed(req); + const model = req.model ?? ""; + if (!instructions && !firstUserText) return null; + + const seed = `${model}\x00${instructions}\x00${firstUserText}`; + const hash = createHash("sha256").update(seed).digest("hex"); + + return `${hash.slice(0, 8)}-${hash.slice(8, 12)}-${hash.slice(12, 16)}-${hash.slice(16, 20)}-${hash.slice(20, 32)}`; +} diff --git a/src/translation/anthropic-to-codex.ts b/src/translation/anthropic-to-codex.ts index 43b02f78..a3b272aa 100644 --- a/src/translation/anthropic-to-codex.ts +++ b/src/translation/anthropic-to-codex.ts @@ -14,6 +14,14 @@ import { buildInstructions, budgetToEffort } from "./shared-utils.js"; import type { ModelConfigOverride } from "./shared-utils.js"; import { anthropicToolsToCodex, anthropicToolChoiceToCodex } from "./tool-format.js"; +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function hasHostedWebSearchTool(tools: unknown[]): boolean { + return tools.some((tool) => isRecord(tool) && tool.type === "web_search"); +} + /** * Map Anthropic thinking budget_tokens to Codex reasoning effort. */ @@ -41,6 +49,13 @@ function extractTextContent( .join("\n"); } +const BILLING_HEADER_PREFIX = "x-anthropic-billing-header:"; + +function normalizeSystemInstructionText(text: string): string { + const trimmed = text.trim(); + return trimmed.startsWith(BILLING_HEADER_PREFIX) ? "" : trimmed; +} + /** * Build multimodal content (text + images) from Anthropic blocks. * Returns plain string if text-only, or CodexContentPart[] if images present. @@ -174,14 +189,18 @@ function contentToInputItems( export function translateAnthropicToCodexRequest( req: AnthropicMessagesRequest, modelConfig?: ModelConfigOverride, + options?: { injectHostedWebSearch?: boolean }, ): CodexResponsesRequest { // Extract system instructions let userInstructions: string; if (req.system) { if (typeof req.system === "string") { - userInstructions = req.system; + userInstructions = normalizeSystemInstructionText(req.system); } else { - userInstructions = req.system.map((b) => b.text).join("\n\n"); + userInstructions = req.system + .map((b) => normalizeSystemInstructionText(b.text)) + .filter(Boolean) + .join("\n\n"); } } else { userInstructions = "You are a helpful assistant."; @@ -211,7 +230,12 @@ export function translateAnthropicToCodexRequest( // Convert tools to Codex format const codexTools = req.tools?.length ? anthropicToolsToCodex(req.tools) : []; - const codexToolChoice = anthropicToolChoiceToCodex(req.tool_choice); + // Claude Code 在非 Anthropic 官方 base URL 下会禁用自身 ToolSearch。 + // 只有走本地 Codex 后端时才默认交给 Codex hosted web_search。 + if (options?.injectHostedWebSearch === true && !hasHostedWebSearchTool(codexTools)) { + codexTools.push({ type: "web_search" }); + } + const codexToolChoice = anthropicToolChoiceToCodex(req.tool_choice, req.tools); // Build request const request: CodexResponsesRequest = { diff --git a/src/translation/codex-event-extractor.ts b/src/translation/codex-event-extractor.ts index 8423e477..69e0915b 100644 --- a/src/translation/codex-event-extractor.ts +++ b/src/translation/codex-event-extractor.ts @@ -131,6 +131,10 @@ export async function* iterateCodexEvents( case "response.output_item.done": case "response.content_part.added": case "response.content_part.done": + case "response.output_text.annotation.added": + case "response.web_search_call.in_progress": + case "response.web_search_call.searching": + case "response.web_search_call.completed": // Lifecycle markers — no data extraction needed break; diff --git a/src/translation/codex-to-anthropic.ts b/src/translation/codex-to-anthropic.ts index f332c766..56aac097 100644 --- a/src/translation/codex-to-anthropic.ts +++ b/src/translation/codex-to-anthropic.ts @@ -19,6 +19,32 @@ import type { } from "../types/anthropic.js"; import { iterateCodexEvents, EmptyResponseError, type UsageInfo } from "./codex-event-extractor.js"; +interface CacheUsageHint { + reusedInputTokensUpperBound?: number; +} + +interface ResponseMetadata { + functionCallIds?: string[]; +} + +function resolveCacheUsage( + inputTokens: number, + cachedTokens: number | undefined, + usageHint?: CacheUsageHint, +): { cacheReadTokens: number; cacheCreationTokens: number } { + let cacheReadTokens = cachedTokens ?? 0; + if ( + cacheReadTokens <= 0 && + inputTokens > 0 && + usageHint?.reusedInputTokensUpperBound && + usageHint.reusedInputTokensUpperBound > 0 + ) { + cacheReadTokens = Math.min(usageHint.reusedInputTokensUpperBound, inputTokens); + } + const cacheCreationTokens = inputTokens > 0 ? Math.max(0, inputTokens - cacheReadTokens) : 0; + return { cacheReadTokens, cacheCreationTokens }; +} + /** Format an Anthropic SSE event with named event type */ function formatSSE(eventType: string, data: unknown): string { return `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`; @@ -38,6 +64,8 @@ export async function* streamCodexToAnthropic( onUsage?: (usage: UsageInfo) => void, onResponseId?: (id: string) => void, wantThinking?: boolean, + usageHint?: CacheUsageHint, + onResponseMetadata?: (metadata: ResponseMetadata) => void, ): AsyncGenerator { const msgId = `msg_${randomUUID().replace(/-/g, "").slice(0, 24)}`; let outputTokens = 0; @@ -48,8 +76,15 @@ export async function* streamCodexToAnthropic( let contentIndex = 0; let textBlockStarted = false; let thinkingBlockStarted = false; + const functionCallIds = new Set(); const callIdsWithDeltas = new Set(); + const publishFunctionCallId = (callId: string): void => { + if (functionCallIds.has(callId)) return; + functionCallIds.add(callId); + onResponseMetadata?.({ functionCallIds: [callId] }); + }; + // Helper: close an open block and advance the index function* closeBlock(blockType: "thinking" | "text"): Generator { yield formatSSE("content_block_stop", { @@ -147,6 +182,7 @@ export async function* streamCodexToAnthropic( if (evt.functionCallStart) { hasToolCalls = true; hasContent = true; + publishFunctionCallId(evt.functionCallStart.callId); yield* closeThinkingIfOpen(); yield* closeTextIfOpen(); @@ -176,6 +212,7 @@ export async function* streamCodexToAnthropic( } if (evt.functionCallDone) { + publishFunctionCallId(evt.functionCallDone.callId); // Emit full arguments if no deltas were streamed if (!callIdsWithDeltas.has(evt.functionCallDone.callId)) { yield formatSSE("content_block_delta", { @@ -214,8 +251,14 @@ export async function* streamCodexToAnthropic( if (evt.usage) { inputTokens = evt.usage.input_tokens; outputTokens = evt.usage.output_tokens; - cachedTokens = evt.usage.cached_tokens; - onUsage?.({ input_tokens: inputTokens, output_tokens: outputTokens, cached_tokens: cachedTokens, reasoning_tokens: evt.usage.reasoning_tokens }); + const adjusted = resolveCacheUsage(inputTokens, evt.usage.cached_tokens, usageHint); + cachedTokens = adjusted.cacheReadTokens || undefined; + onUsage?.({ + input_tokens: inputTokens, + output_tokens: outputTokens, + cached_tokens: cachedTokens, + reasoning_tokens: evt.usage.reasoning_tokens, + }); } // Inject error text if stream completed with no content if (!hasContent) { @@ -236,13 +279,17 @@ export async function* streamCodexToAnthropic( yield* closeTextIfOpen(); // 4. message_delta with stop_reason and usage + // cache_creation_input_tokens: tokens not served from cache (will be cached for next turn) + // cache_read_input_tokens: tokens served from cache (Codex cached_tokens) + const { cacheReadTokens, cacheCreationTokens } = resolveCacheUsage(inputTokens, cachedTokens, usageHint); yield formatSSE("message_delta", { type: "message_delta", delta: { stop_reason: hasToolCalls ? "tool_use" : "end_turn" }, usage: { input_tokens: inputTokens, output_tokens: outputTokens, - ...(cachedTokens != null ? { cache_read_input_tokens: cachedTokens } : {}), + ...(cacheCreationTokens > 0 ? { cache_creation_input_tokens: cacheCreationTokens } : {}), + ...(cacheReadTokens > 0 ? { cache_read_input_tokens: cacheReadTokens } : {}), }, }); @@ -261,6 +308,8 @@ export async function collectCodexToAnthropicResponse( rawResponse: Response, model: string, wantThinking?: boolean, + usageHint?: CacheUsageHint, + onResponseMetadata?: (metadata: ResponseMetadata) => void, ): Promise<{ response: AnthropicMessagesResponse; usage: UsageInfo; @@ -273,6 +322,7 @@ export async function collectCodexToAnthropicResponse( let outputTokens = 0; let cachedTokens: number | undefined; let responseId: string | null = null; + const functionCallIds = new Set(); // Collect tool calls const toolUseBlocks: AnthropicContentBlock[] = []; @@ -290,6 +340,7 @@ export async function collectCodexToAnthropicResponse( cachedTokens = evt.usage.cached_tokens; } if (evt.functionCallDone) { + functionCallIds.add(evt.functionCallDone.callId); let parsedInput: Record = {}; try { parsedInput = JSON.parse(evt.functionCallDone.arguments) as Record; @@ -309,6 +360,9 @@ export async function collectCodexToAnthropicResponse( } const hasToolCalls = toolUseBlocks.length > 0; + if (functionCallIds.size > 0) { + onResponseMetadata?.({ functionCallIds: Array.from(functionCallIds) }); + } const content: AnthropicContentBlock[] = []; // Thinking block comes first if requested and available if (wantThinking && fullReasoning) { @@ -323,10 +377,13 @@ export async function collectCodexToAnthropicResponse( content.push({ type: "text", text: "" }); } + const { cacheReadTokens: cacheRead, cacheCreationTokens: cacheCreation } = + resolveCacheUsage(inputTokens, cachedTokens, usageHint); const usage: AnthropicUsage = { input_tokens: inputTokens, output_tokens: outputTokens, - ...(cachedTokens != null ? { cache_read_input_tokens: cachedTokens } : {}), + ...(cacheCreation > 0 ? { cache_creation_input_tokens: cacheCreation } : {}), + ...(cacheRead > 0 ? { cache_read_input_tokens: cacheRead } : {}), }; return { diff --git a/src/translation/tool-format.ts b/src/translation/tool-format.ts index 8db25e58..729647bc 100644 --- a/src/translation/tool-format.ts +++ b/src/translation/tool-format.ts @@ -31,32 +31,105 @@ export interface CodexToolDefinition { strict?: boolean; } +export interface CodexHostedWebSearchTool { + type: "web_search"; + search_context_size?: "low" | "medium" | "high"; + user_location?: Record; +} + +export type CodexTool = CodexToolDefinition | CodexHostedWebSearchTool; + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function isHostedWebSearchType(type: unknown): boolean { + return type === "web_search" || type === "web_search_preview"; +} + +function normalizeHostedWebSearchTool(tool: Record): CodexHostedWebSearchTool | null { + if (!isHostedWebSearchType(tool.type)) return null; + + const def: CodexHostedWebSearchTool = { type: "web_search" }; + if ( + tool.search_context_size === "low" || + tool.search_context_size === "medium" || + tool.search_context_size === "high" + ) { + def.search_context_size = tool.search_context_size; + } + if (isRecord(tool.user_location)) { + def.user_location = tool.user_location; + } + return def; +} + +function hasGeminiHostedSearch(tool: Record): boolean { + return isRecord(tool.googleSearch) || isRecord(tool.googleSearchRetrieval); +} + +function isAnthropicHostedSearchTool(tool: Record): boolean { + if (tool.type === "web_search_20250305" || tool.type === "web_search") return true; + // Claude Code 内置工具名是 WebSearch;这里改走 Codex hosted search, + // 避免把搜索降级成需要客户端执行的普通 function tool。 + return tool.name === "WebSearch"; +} + +function hasAnthropicHostedSearchToolChoice( + choiceName: string, + tools: AnthropicMessagesRequest["tools"], +): boolean { + if (choiceName === "WebSearch") return true; + if (!tools) return false; + return tools.some((tool) => { + if (!isRecord(tool)) return false; + if (tool.type !== "web_search_20250305" && tool.type !== "web_search") { + return false; + } + return typeof tool.name !== "string" || tool.name === choiceName; + }); +} + // ── OpenAI → Codex ────────────────────────────────────────────── export function openAIToolsToCodex( tools: NonNullable, -): CodexToolDefinition[] { - return tools.map((t) => { +): CodexTool[] { + const defs: CodexTool[] = []; + for (const t of tools) { + const hosted = normalizeHostedWebSearchTool(t); + if (hosted) { + defs.push(hosted); + continue; + } + + if (t.type !== "function") continue; const def: CodexToolDefinition = { type: "function", name: t.function.name, }; if (t.function.description) def.description = t.function.description; if (t.function.parameters) def.parameters = normalizeSchema(t.function.parameters); - return def; - }); + defs.push(def); + } + return defs; } export function openAIToolChoiceToCodex( choice: ChatCompletionRequest["tool_choice"], -): string | { type: "function"; name: string } | undefined { +): string | { type: "function"; name: string } | { type: "web_search" } | undefined { if (!choice) return undefined; if (typeof choice === "string") { // "none" | "auto" | "required" → pass through return choice; } + if (isHostedWebSearchType(choice.type)) { + return { type: "web_search" }; + } // { type: "function", function: { name } } → { type: "function", name } - return { type: "function", name: choice.function.name }; + const fn = isRecord(choice.function) ? choice.function : null; + const name = typeof fn?.name === "string" ? fn.name : ""; + return { type: "function", name }; } /** @@ -80,21 +153,30 @@ export function openAIFunctionsToCodex( export function anthropicToolsToCodex( tools: NonNullable, -): CodexToolDefinition[] { - return tools.map((t) => { +): CodexTool[] { + const defs: CodexTool[] = []; + for (const t of tools) { + if (isRecord(t) && isAnthropicHostedSearchTool(t)) { + defs.push({ type: "web_search" }); + continue; + } + + if (!("name" in t) || typeof t.name !== "string") continue; const def: CodexToolDefinition = { type: "function", name: t.name, }; - if (t.description) def.description = t.description; - if (t.input_schema) def.parameters = normalizeSchema(t.input_schema); - return def; - }); + if (isRecord(t) && typeof t.description === "string") def.description = t.description; + if (isRecord(t) && isRecord(t.input_schema)) def.parameters = normalizeSchema(t.input_schema); + defs.push(def); + } + return defs; } export function anthropicToolChoiceToCodex( choice: AnthropicMessagesRequest["tool_choice"], -): string | { type: "function"; name: string } | undefined { + tools?: AnthropicMessagesRequest["tools"], +): string | { type: "function"; name: string } | { type: "web_search" } | undefined { if (!choice) return undefined; switch (choice.type) { case "auto": @@ -102,6 +184,9 @@ export function anthropicToolChoiceToCodex( case "any": return "required"; case "tool": + if (hasAnthropicHostedSearchToolChoice(choice.name, tools)) { + return { type: "web_search" }; + } return { type: "function", name: choice.name }; default: return undefined; @@ -112,9 +197,13 @@ export function anthropicToolChoiceToCodex( export function geminiToolsToCodex( tools: NonNullable, -): CodexToolDefinition[] { - const defs: CodexToolDefinition[] = []; +): CodexTool[] { + const defs: CodexTool[] = []; for (const toolGroup of tools) { + if (hasGeminiHostedSearch(toolGroup)) { + defs.push({ type: "web_search" }); + } + if (toolGroup.functionDeclarations) { for (const fd of toolGroup.functionDeclarations) { const def: CodexToolDefinition = { diff --git a/src/types/anthropic.ts b/src/types/anthropic.ts index 93aa5a83..aec3f5bf 100644 --- a/src/types/anthropic.ts +++ b/src/types/anthropic.ts @@ -5,9 +5,14 @@ import { z } from "zod"; // --- Request --- +const AnthropicCacheControlSchema = z.object({ + type: z.string(), +}).passthrough(); + const AnthropicTextContentSchema = z.object({ type: z.literal("text"), text: z.string(), + cache_control: AnthropicCacheControlSchema.optional(), }); const AnthropicImageContentSchema = z.object({ @@ -113,12 +118,23 @@ export const AnthropicMessagesRequestSchema = z.object({ AnthropicThinkingAdaptiveSchema, ]) .optional(), - // Tool-related fields (accepted for compatibility, not forwarded to Codex) - tools: z.array(z.object({ - name: z.string(), - description: z.string().optional(), - input_schema: z.record(z.unknown()).optional(), - }).passthrough()).optional(), + // Tool-related fields. Custom tools are converted to Codex function tools; + // Anthropic hosted web search is converted to Codex hosted web_search. + tools: z.array(z.union([ + z.object({ + name: z.string(), + description: z.string().optional(), + input_schema: z.record(z.unknown()).optional(), + }).passthrough(), + z.object({ + type: z.enum(["web_search_20250305", "web_search"]), + name: z.string().optional(), + max_uses: z.number().int().positive().optional(), + allowed_domains: z.array(z.string()).optional(), + blocked_domains: z.array(z.string()).optional(), + user_location: z.record(z.unknown()).optional(), + }).passthrough(), + ])).optional(), tool_choice: z.union([ z.object({ type: z.literal("auto") }), z.object({ type: z.literal("any") }), diff --git a/src/types/codex-events.ts b/src/types/codex-events.ts index 9c10a84a..5fdb873a 100644 --- a/src/types/codex-events.ts +++ b/src/types/codex-events.ts @@ -123,10 +123,29 @@ export interface CodexOutputItemDoneEvent { call_id?: string; name?: string; arguments?: string; + content?: unknown[]; + actions?: unknown[]; [key: string]: unknown; }; } +export interface CodexOutputTextAnnotationAddedEvent { + type: "response.output_text.annotation.added"; + outputIndex: number; + contentIndex: number; + annotationIndex: number; + annotation: Record; +} + +export interface CodexWebSearchCallEvent { + type: + | "response.web_search_call.in_progress" + | "response.web_search_call.searching" + | "response.web_search_call.completed"; + outputIndex: number; + itemId: string; +} + export interface CodexIncompleteEvent { type: "response.incomplete"; response: CodexResponseData; @@ -165,6 +184,8 @@ export type TypedCodexEvent = | CodexCompletedEvent | CodexOutputItemAddedEvent | CodexOutputItemDoneEvent + | CodexOutputTextAnnotationAddedEvent + | CodexWebSearchCallEvent | CodexContentPartAddedEvent | CodexContentPartDoneEvent | CodexIncompleteEvent @@ -181,6 +202,62 @@ function isRecord(v: unknown): v is Record { return typeof v === "object" && v !== null && !Array.isArray(v); } +function safeStringify(value: unknown): string { + if (typeof value === "string") return value; + try { + const json = JSON.stringify(value); + return json ?? String(value); + } catch { + return String(value); + } +} + +function firstString(...values: unknown[]): string | undefined { + for (const value of values) { + if (typeof value === "string" && value.length > 0) return value; + } + return undefined; +} + +function getErrorRecord(data: unknown): Record | undefined { + if (!isRecord(data)) return undefined; + if (isRecord(data.error)) return data.error; + if (isRecord(data.response) && isRecord(data.response.error)) return data.response.error; + return data; +} + +export function extractCodexError(data: unknown): { type: string; code: string; message: string } { + const err = getErrorRecord(data); + if (!err) { + const message = safeStringify(data); + return { + type: "error", + code: message.trimStart().startsWith("{") ? "malformed_error_event" : "unknown", + message, + }; + } + + const dataRecord = isRecord(data) ? data : {}; + const response = isRecord(dataRecord.response) ? dataRecord.response : {}; + const message = + firstString( + err.message, + err.detail, + err.error_description, + dataRecord.message, + dataRecord.detail, + response.message, + response.detail, + ) ?? safeStringify(data); + + const type = firstString(err.type, dataRecord.type) ?? "error"; + const code = + firstString(err.code, response.code) ?? + (type !== "error" && type !== "response.failed" ? type : "unknown"); + + return { type, code, message }; +} + function parseResponseData(data: unknown): CodexResponseData | undefined { if (!isRecord(data)) return undefined; const resp = data.response; @@ -238,6 +315,30 @@ export function parseCodexEvent(evt: CodexSSEEvent): TypedCodexEvent { } return { type: "unknown", raw: data }; } + case "response.output_text.annotation.added": { + if (isRecord(data) && isRecord(data.annotation)) { + return { + type: "response.output_text.annotation.added", + outputIndex: typeof data.output_index === "number" ? data.output_index : 0, + contentIndex: typeof data.content_index === "number" ? data.content_index : 0, + annotationIndex: typeof data.annotation_index === "number" ? data.annotation_index : 0, + annotation: data.annotation, + }; + } + return { type: "unknown", raw: data }; + } + case "response.web_search_call.in_progress": + case "response.web_search_call.searching": + case "response.web_search_call.completed": { + if (isRecord(data)) { + return { + type: evt.event, + outputIndex: typeof data.output_index === "number" ? data.output_index : 0, + itemId: typeof data.item_id === "string" ? data.item_id : "", + }; + } + return { type: "unknown", raw: data }; + } case "response.reasoning_summary_text.delta": { if (isRecord(data) && typeof data.delta === "string") { return { type: "response.reasoning_summary_text.delta", delta: data.delta }; @@ -346,33 +447,17 @@ export function parseCodexEvent(evt: CodexSSEEvent): TypedCodexEvent { return { type: "unknown", raw: data }; } case "error": { - if (isRecord(data)) { - const err = isRecord(data.error) ? data.error : data; - return { - type: "error", - error: { - type: typeof err.type === "string" ? err.type : "error", - code: typeof err.code === "string" ? err.code : "unknown", - message: typeof err.message === "string" ? err.message : JSON.stringify(data), - }, - }; - } return { type: "error", - error: { type: "error", code: "unknown", message: String(data) }, + error: extractCodexError(data), }; } case "response.failed": { const resp = parseResponseData(data); if (isRecord(data)) { - const err = isRecord(data.error) ? data.error : {}; return { type: "response.failed", - error: { - type: typeof err.type === "string" ? err.type : "error", - code: typeof err.code === "string" ? err.code : "unknown", - message: typeof err.message === "string" ? err.message : JSON.stringify(data), - }, + error: extractCodexError(data), response: resp ?? {}, }; } @@ -389,6 +474,8 @@ export function parseCodexEvent(evt: CodexSSEEvent): TypedCodexEvent { ...(typeof data.item.call_id === "string" ? { call_id: data.item.call_id } : {}), ...(typeof data.item.name === "string" ? { name: data.item.name } : {}), ...(typeof data.item.arguments === "string" ? { arguments: data.item.arguments } : {}), + ...(Array.isArray(data.item.content) ? { content: data.item.content } : {}), + ...(Array.isArray(data.item.actions) ? { actions: data.item.actions } : {}), }, }; } diff --git a/src/types/gemini.ts b/src/types/gemini.ts index bf460694..ad808c40 100644 --- a/src/types/gemini.ts +++ b/src/types/gemini.ts @@ -48,13 +48,16 @@ export const GeminiGenerateContentRequestSchema = z.object({ contents: z.array(GeminiContentSchema).min(1), systemInstruction: GeminiContentSchema.optional(), generationConfig: GeminiGenerationConfigSchema.optional(), - // Tool-related fields (accepted for compatibility, not forwarded to Codex) + // Tool-related fields. googleSearch/googleSearchRetrieval are converted to + // Codex hosted web_search; functionDeclarations become function tools. tools: z.array(z.object({ functionDeclarations: z.array(z.object({ name: z.string(), description: z.string().optional(), parameters: z.record(z.unknown()).optional(), })).optional(), + googleSearch: z.record(z.unknown()).optional(), + googleSearchRetrieval: z.record(z.unknown()).optional(), }).passthrough()).optional(), toolConfig: z.object({ functionCallingConfig: z.object({ diff --git a/src/types/openai.ts b/src/types/openai.ts index 968bb73f..2ac7d276 100644 --- a/src/types/openai.ts +++ b/src/types/openai.ts @@ -46,18 +46,27 @@ export const ChatCompletionRequestSchema = z.object({ // Codex-specific extensions reasoning_effort: z.enum(["low", "medium", "high", "xhigh"]).optional(), service_tier: z.enum(["fast", "flex"]).nullable().optional(), - // New tool format (accepted for compatibility, not forwarded to Codex) - tools: z.array(z.object({ - type: z.literal("function"), - function: z.object({ - name: z.string(), - description: z.string().optional(), - parameters: z.record(z.unknown()).optional(), + // New tool format. In addition to function tools, accept hosted web search + // tools so OpenAI-compatible clients can ask Codex to search natively. + tools: z.array(z.union([ + z.object({ + type: z.literal("function"), + function: z.object({ + name: z.string(), + description: z.string().optional(), + parameters: z.record(z.unknown()).optional(), + }), }), - })).optional(), + z.object({ + type: z.enum(["web_search", "web_search_preview"]), + search_context_size: z.enum(["low", "medium", "high"]).optional(), + user_location: z.record(z.unknown()).optional(), + }).passthrough(), + ])).optional(), tool_choice: z.union([ z.enum(["none", "auto", "required"]), z.object({ type: z.literal("function"), function: z.object({ name: z.string() }) }), + z.object({ type: z.enum(["web_search", "web_search_preview"]) }).passthrough(), ]).optional(), parallel_tool_calls: z.boolean().optional(), // Structured output format (JSON mode / JSON Schema) diff --git a/tests/unit/auth/session-affinity.test.ts b/tests/unit/auth/session-affinity.test.ts index 452931aa..66a145b8 100644 --- a/tests/unit/auth/session-affinity.test.ts +++ b/tests/unit/auth/session-affinity.test.ts @@ -65,6 +65,47 @@ describe("SessionAffinityMap", () => { expect(map.lookupConversationId("resp_unknown")).toBeNull(); }); + it("looks up the latest response ID by conversation ID", () => { + map = new SessionAffinityMap(); + map.record("resp_1", "entry_1", "conv_same"); + map.record("resp_2", "entry_1", "conv_same"); + map.record("resp_3", "entry_1", "conv_other"); + expect(map.lookupLatestResponseIdByConversationId("conv_same")).toBe("resp_2"); + expect(map.lookupLatestResponseIdByConversationId("conv_other")).toBe("resp_3"); + }); + + it("looks up the latest instructions by conversation ID", () => { + map = new SessionAffinityMap(); + map.record("resp_1", "entry_1", "conv_same", undefined, "old instructions"); + map.record("resp_2", "entry_1", "conv_same", undefined, "sticky instructions"); + expect(map.lookupLatestInstructionsByConversationId("conv_same")).toBe("sticky instructions"); + expect(map.lookupLatestInstructionsByConversationId("conv_missing")).toBeNull(); + }); + + it("looks up stored input tokens for a response", () => { + map = new SessionAffinityMap(); + map.record("resp_1", "entry_1", "conv_same", undefined, undefined, 1234); + expect(map.lookupInputTokens("resp_1")).toBe(1234); + expect(map.lookupInputTokens("resp_missing")).toBeNull(); + }); + + it("looks up stored function call IDs for a response", () => { + map = new SessionAffinityMap(); + map.record("resp_1", "entry_1", "conv_same", undefined, undefined, undefined, [ + "call_a", + "call_b", + ]); + expect(map.lookupFunctionCallIds("resp_1")).toEqual(["call_a", "call_b"]); + expect(map.lookupFunctionCallIds("resp_missing")).toEqual([]); + }); + + it("looks up stored instructions for a response", () => { + map = new SessionAffinityMap(); + map.record("resp_1", "entry_1", "conv_same", undefined, "system prompt"); + expect(map.lookupInstructions("resp_1")).toBe("system prompt"); + expect(map.lookupInstructions("resp_missing")).toBeNull(); + }); + it("conversation ID is inherited across response chain", () => { map = new SessionAffinityMap(); // Turn 1: new conversation diff --git a/tests/unit/cache-reporting.test.ts b/tests/unit/cache-reporting.test.ts new file mode 100644 index 00000000..41442568 --- /dev/null +++ b/tests/unit/cache-reporting.test.ts @@ -0,0 +1,354 @@ +/** + * 验证缓存字段修复:cache_creation_input_tokens 和 cache_read_input_tokens + * 正确从 Codex cached_tokens 映射到 Anthropic usage 格式。 + * + * 不需要启动服务器,不需要真实账号,直接测翻译层。 + */ + +import { vi, describe, it, expect } from "vitest"; +import type { CodexApi } from "@src/proxy/codex-api.js"; +import { + collectCodexToAnthropicResponse, + streamCodexToAnthropic, +} from "@src/translation/codex-to-anthropic.js"; + +// ── Mock helpers ────────────────────────────────────────────────── + +/** + * 构造一个伪造的 Codex SSE 流 Response,包含指定的 usage 数据。 + * 模拟 response.output_text.delta + response.completed 事件序列。 + */ +function makeCodexResponse(opts: { + inputTokens: number; + outputTokens: number; + cachedTokens?: number; + text?: string; +}): Response { + const { inputTokens, outputTokens, cachedTokens, text = "hello" } = opts; + + const events = [ + // 响应创建事件 + `event: response.created\ndata: ${JSON.stringify({ + response: { id: "resp_test_001" }, + })}\n\n`, + // 文本输出 + `event: response.output_text.delta\ndata: ${JSON.stringify({ + delta: text, + })}\n\n`, + // 完成事件,携带 usage(含 cached_tokens) + `event: response.completed\ndata: ${JSON.stringify({ + response: { + id: "resp_test_001", + usage: { + input_tokens: inputTokens, + output_tokens: outputTokens, + input_tokens_details: cachedTokens != null ? { cached_tokens: cachedTokens } : {}, + output_tokens_details: { reasoning_tokens: 0 }, + }, + }, + })}\n\n`, + ]; + + const body = new ReadableStream({ + start(controller) { + for (const event of events) { + controller.enqueue(new TextEncoder().encode(event)); + } + controller.close(); + }, + }); + + return new Response(body, { status: 200 }); +} + +function makeToolCallCodexResponse(callId = "call_test_001"): Response { + const itemId = "fc_item_001"; + const events = [ + `event: response.created\ndata: ${JSON.stringify({ + response: { id: "resp_tool_001" }, + })}\n\n`, + `event: response.output_item.added\ndata: ${JSON.stringify({ + output_index: 0, + item: { + type: "function_call", + id: itemId, + call_id: callId, + name: "test_tool", + }, + })}\n\n`, + `event: response.function_call_arguments.done\ndata: ${JSON.stringify({ + item_id: itemId, + arguments: "{\"ok\":true}", + })}\n\n`, + `event: response.completed\ndata: ${JSON.stringify({ + response: { + id: "resp_tool_001", + usage: { + input_tokens: 100, + output_tokens: 10, + input_tokens_details: {}, + output_tokens_details: { reasoning_tokens: 0 }, + }, + }, + })}\n\n`, + ]; + + const body = new ReadableStream({ + start(controller) { + for (const event of events) { + controller.enqueue(new TextEncoder().encode(event)); + } + controller.close(); + }, + }); + + return new Response(body, { status: 200 }); +} + +/** + * 构造一个最小化的 CodexApi mock,parseStream 直接解析 SSE 文本。 + */ +function makeCodexApiMock(): CodexApi { + return { + parseStream: async function* (response: Response) { + const reader = response.body!.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + } + + // 简单解析 SSE 格式 + const blocks = buffer.split("\n\n").filter(Boolean); + for (const block of blocks) { + const lines = block.split("\n"); + let event = ""; + let data = ""; + for (const line of lines) { + if (line.startsWith("event: ")) event = line.slice(7); + if (line.startsWith("data: ")) data = line.slice(6); + } + if (event && data) { + yield { event, data: JSON.parse(data) }; + } + } + }, + } as unknown as CodexApi; +} + +// ── 非流式路径测试 ──────────────────────────────────────────────── + +describe("非流式响应 collectCodexToAnthropicResponse", () => { + it("无缓存时:cache_creation = input_tokens,cache_read 不出现", async () => { + const api = makeCodexApiMock(); + const res = makeCodexResponse({ inputTokens: 10000, outputTokens: 500 }); + + const { response } = await collectCodexToAnthropicResponse(api, res, "gpt-5.4"); + + expect(response.usage.cache_creation_input_tokens).toBe(10000); + expect(response.usage.cache_read_input_tokens).toBeUndefined(); + expect(response.usage.input_tokens).toBe(10000); + expect(response.usage.output_tokens).toBe(500); + }); + + it("缓存全部命中时:cache_read = cached_tokens,cache_creation = input - cached", async () => { + const api = makeCodexApiMock(); + const res = makeCodexResponse({ + inputTokens: 10000, + outputTokens: 500, + cachedTokens: 7000, + }); + + const { response } = await collectCodexToAnthropicResponse(api, res, "gpt-5.4"); + + expect(response.usage.cache_read_input_tokens).toBe(7000); + expect(response.usage.cache_creation_input_tokens).toBe(3000); // 10000 - 7000 + expect(response.usage.input_tokens).toBe(10000); + }); + + it("cachedTokens = 0 时:与无缓存情况一致,cache_read 不出现", async () => { + const api = makeCodexApiMock(); + const res = makeCodexResponse({ + inputTokens: 5000, + outputTokens: 200, + cachedTokens: 0, + }); + + const { response } = await collectCodexToAnthropicResponse(api, res, "gpt-5.4"); + + expect(response.usage.cache_creation_input_tokens).toBe(5000); + expect(response.usage.cache_read_input_tokens).toBeUndefined(); + }); + + it("大请求场景:70M input,1.18M cached(模拟 04-03 gpt-5.4)", async () => { + const api = makeCodexApiMock(); + const res = makeCodexResponse({ + inputTokens: 70_000_000, + outputTokens: 300_000, + cachedTokens: 1_183_600, + }); + + const { response } = await collectCodexToAnthropicResponse(api, res, "gpt-5.4"); + + expect(response.usage.cache_read_input_tokens).toBe(1_183_600); + expect(response.usage.cache_creation_input_tokens).toBe(70_000_000 - 1_183_600); + // cache_read / input 命中率 + const hitRate = 1_183_600 / 70_000_000; + expect(hitRate).toBeCloseTo(0.017, 2); // 约 1.7%,对应实际数据 + }); + + it("隐式续链但上游未返回 cached_tokens 时:使用复用上限推导 cache_read", async () => { + const api = makeCodexApiMock(); + const res = makeCodexResponse({ + inputTokens: 15_243, + outputTokens: 7, + cachedTokens: 0, + }); + + const { response } = await collectCodexToAnthropicResponse( + api, + res, + "gpt-5.4-mini", + false, + { reusedInputTokensUpperBound: 15_240 }, + ); + + expect(response.usage.cache_read_input_tokens).toBe(15_240); + expect(response.usage.cache_creation_input_tokens).toBe(3); + }); + + it("工具调用响应会回传 call_id 元数据,供隐式续链接力校验", async () => { + const api = makeCodexApiMock(); + const res = makeToolCallCodexResponse("call_for_collect"); + const metadataCallIds: string[] = []; + + await collectCodexToAnthropicResponse( + api, + res, + "gpt-5.4-mini", + false, + undefined, + (metadata) => metadataCallIds.push(...(metadata.functionCallIds ?? [])), + ); + + expect(metadataCallIds).toEqual(["call_for_collect"]); + }); +}); + +// ── 流式路径测试 ────────────────────────────────────────────────── + +describe("流式响应 streamCodexToAnthropic", () => { + async function collectSSE(gen: AsyncGenerator): Promise { + let result = ""; + for await (const chunk of gen) result += chunk; + return result; + } + + function parseMessageDelta(sseText: string): Record | null { + const lines = sseText.split("\n"); + for (let i = 0; i < lines.length; i++) { + if (lines[i] === "event: message_delta" && lines[i + 1]?.startsWith("data: ")) { + return JSON.parse(lines[i + 1].slice(6)); + } + } + return null; + } + + it("无缓存时:message_delta.usage 包含 cache_creation,无 cache_read", async () => { + const api = makeCodexApiMock(); + const res = makeCodexResponse({ inputTokens: 8000, outputTokens: 400 }); + + const sseText = await collectSSE(streamCodexToAnthropic(api, res, "gpt-5.4")); + const delta = parseMessageDelta(sseText); + + expect(delta).not.toBeNull(); + const usage = (delta as any).usage; + expect(usage.cache_creation_input_tokens).toBe(8000); + expect(usage.cache_read_input_tokens).toBeUndefined(); + }); + + it("缓存命中时:message_delta.usage 同时包含 cache_creation 和 cache_read", async () => { + const api = makeCodexApiMock(); + const res = makeCodexResponse({ + inputTokens: 8000, + outputTokens: 400, + cachedTokens: 6000, + }); + + const sseText = await collectSSE(streamCodexToAnthropic(api, res, "gpt-5.4")); + const delta = parseMessageDelta(sseText); + + const usage = (delta as any).usage; + expect(usage.cache_read_input_tokens).toBe(6000); + expect(usage.cache_creation_input_tokens).toBe(2000); // 8000 - 6000 + }); + + it("多轮对话场景:第二轮缓存命中率应显著高于第一轮", async () => { + const api = makeCodexApiMock(); + + // 第一轮:全新对话,无缓存 + const res1 = makeCodexResponse({ inputTokens: 5000, outputTokens: 300 }); + const sse1 = await collectSSE(streamCodexToAnthropic(api, res1, "gpt-5.4")); + const delta1 = parseMessageDelta(sse1) as any; + expect(delta1.usage.cache_creation_input_tokens).toBe(5000); + expect(delta1.usage.cache_read_input_tokens).toBeUndefined(); + + // 第二轮:同一对话,大量缓存命中 + const res2 = makeCodexResponse({ + inputTokens: 5500, // 多了 500 新 token + outputTokens: 280, + cachedTokens: 4800, // 前 4800 命中缓存 + }); + const sse2 = await collectSSE(streamCodexToAnthropic(api, res2, "gpt-5.4")); + const delta2 = parseMessageDelta(sse2) as any; + expect(delta2.usage.cache_read_input_tokens).toBe(4800); + expect(delta2.usage.cache_creation_input_tokens).toBe(700); // 5500 - 4800 + + // 第二轮缓存命中率 >> 0 + const hitRate2 = delta2.usage.cache_read_input_tokens / 5500; + expect(hitRate2).toBeGreaterThan(0.8); // 87% 命中率 + }); + + it("流式场景下也会为隐式续链补齐 cache_read", async () => { + const api = makeCodexApiMock(); + const res = makeCodexResponse({ + inputTokens: 15_243, + outputTokens: 7, + cachedTokens: 0, + }); + + const sseText = await collectSSE( + streamCodexToAnthropic(api, res, "gpt-5.4-mini", undefined, undefined, false, { + reusedInputTokensUpperBound: 15_240, + }), + ); + const delta = parseMessageDelta(sseText) as any; + + expect(delta.usage.cache_read_input_tokens).toBe(15_240); + expect(delta.usage.cache_creation_input_tokens).toBe(3); + }); + + it("流式工具调用响应会回传 call_id 元数据", async () => { + const api = makeCodexApiMock(); + const res = makeToolCallCodexResponse("call_for_stream"); + const metadataCallIds: string[] = []; + + await collectSSE( + streamCodexToAnthropic( + api, + res, + "gpt-5.4-mini", + undefined, + undefined, + false, + undefined, + (metadata) => metadataCallIds.push(...(metadata.functionCallIds ?? [])), + ), + ); + + expect(metadataCallIds).toEqual(["call_for_stream"]); + }); +}); diff --git a/tests/unit/proxy/codex-api-headers.test.ts b/tests/unit/proxy/codex-api-headers.test.ts index 2d272b46..31962017 100644 --- a/tests/unit/proxy/codex-api-headers.test.ts +++ b/tests/unit/proxy/codex-api-headers.test.ts @@ -148,5 +148,33 @@ describe("codex-api headers", () => { ); expect(headers["x-codex-turn-state"]).toBe("ws_turn_abc"); }); + + it("previous_response_id 场景下 WebSocket 失败不会降级成 HTTP delta-only", async () => { + const { PreviousResponseWebSocketError } = await import("@src/proxy/codex-api.js"); + mockCreateWebSocketResponse.mockRejectedValue(new Error("ws down")); + + const api = await createApi(); + await expect(api.createResponse( + makeRequest({ + previous_response_id: "resp_prev", + useWebSocket: true, + input: [{ type: "function_call_output", call_id: "call_1", output: "ok" }], + }), + )).rejects.toBeInstanceOf(PreviousResponseWebSocketError); + + expect(transport.post).not.toHaveBeenCalled(); + }); + + it("没有 previous_response_id 时 WebSocket 失败仍可安全降级到 HTTP", async () => { + mockCreateWebSocketResponse.mockRejectedValue(new Error("ws down")); + + const api = await createApi(); + await api.createResponse(makeRequest({ useWebSocket: true })); + + expect(transport.post).toHaveBeenCalledOnce(); + const body = JSON.parse(transport.lastBody!) as Record; + expect(body.previous_response_id).toBeUndefined(); + expect(body.useWebSocket).toBeUndefined(); + }); }); }); diff --git a/tests/unit/proxy/codex-sse.test.ts b/tests/unit/proxy/codex-sse.test.ts index d283d023..dec131ce 100644 --- a/tests/unit/proxy/codex-sse.test.ts +++ b/tests/unit/proxy/codex-sse.test.ts @@ -44,6 +44,28 @@ describe("parseSSEBlock", () => { const result = parseSSEBlock(block); expect(result?.data).toBe("plain text error"); }); + + it("parses non-standard pretty-printed JSON continuations", () => { + const block = [ + "event: error", + "data: {", + ' "error": {', + ' "code": "server_error",', + ' "message": "upstream failed"', + " }", + "}", + ].join("\n"); + const result = parseSSEBlock(block); + expect(result).toEqual({ + event: "error", + data: { + error: { + code: "server_error", + message: "upstream failed", + }, + }, + }); + }); }); describe("parseSSEStream", () => { diff --git a/tests/unit/routes/responses-premature-close.test.ts b/tests/unit/routes/responses-premature-close.test.ts index 7a07d47e..433669bc 100644 --- a/tests/unit/routes/responses-premature-close.test.ts +++ b/tests/unit/routes/responses-premature-close.test.ts @@ -91,6 +91,135 @@ describe("collectPassthrough premature close handling", () => { expect(result.usage).toEqual({ input_tokens: 10, output_tokens: 20 }); }); + it("backfills completed response output from streamed web_search and message items", async () => { + const api = createMockApi([ + { event: "response.created", data: { response: { id: "resp_search" } } }, + { + event: "response.output_item.done", + data: { + output_index: 0, + item: { + id: "ws_1", + type: "web_search_call", + status: "completed", + actions: [{ type: "search", query: "codex proxy" }], + }, + }, + }, + { + event: "response.output_item.done", + data: { + output_index: 1, + item: { + id: "msg_1", + type: "message", + role: "assistant", + status: "completed", + content: [{ type: "output_text", text: "搜索完成" }], + }, + }, + }, + { + event: "response.completed", + data: { + response: { + id: "resp_search", + output: [], + usage: { input_tokens: 11, output_tokens: 22 }, + }, + }, + }, + ]); + + const result = await collectPassthrough(api as never, new Response("ok"), "test-model"); + const response = result.response as { output: unknown[]; output_text?: string }; + expect(response.output).toHaveLength(2); + expect(response.output[0]).toMatchObject({ type: "web_search_call" }); + expect(response.output_text).toBe("搜索完成"); + }); + + it("synthesizes completed response output from text deltas when output items are absent", async () => { + const api = createMockApi([ + { event: "response.created", data: { response: { id: "resp_delta" } } }, + { event: "response.output_text.delta", data: { delta: "搜索" } }, + { event: "response.output_text.delta", data: { delta: "完成" } }, + { + event: "response.completed", + data: { + response: { + id: "resp_delta", + output: [], + usage: { input_tokens: 3, output_tokens: 4 }, + }, + }, + }, + ]); + + const result = await collectPassthrough(api as never, new Response("ok"), "test-model"); + const response = result.response as { output: Array<{ content: Array<{ text: string }> }>; output_text?: string }; + expect(response.output[0].content[0].text).toBe("搜索完成"); + expect(response.output_text).toBe("搜索完成"); + }); + + it("keeps output_text synchronized after tuple reconversion", async () => { + const api = createMockApi([ + { event: "response.created", data: { response: { id: "resp_tuple" } } }, + { + event: "response.output_item.done", + data: { + output_index: 0, + item: { + id: "msg_tuple", + type: "message", + role: "assistant", + status: "completed", + content: [ + { + type: "output_text", + text: "{\"point\":{\"0\":42,\"1\":\"hello\"}}", + }, + ], + }, + }, + }, + { + event: "response.completed", + data: { + response: { + id: "resp_tuple", + output: [], + usage: { input_tokens: 6, output_tokens: 7 }, + }, + }, + }, + ]); + + const tupleSchema = { + type: "object", + properties: { + point: { + type: "array", + prefixItems: [{ type: "number" }, { type: "string" }], + items: false, + }, + }, + }; + + const result = await collectPassthrough( + api as never, + new Response("ok"), + "test-model", + tupleSchema, + ); + const response = result.response as { + output: Array<{ content: Array<{ text: string }> }>; + output_text?: string; + }; + + expect(response.output[0].content[0].text).toBe("{\"point\":[42,\"hello\"]}"); + expect(response.output_text).toBe("{\"point\":[42,\"hello\"]}"); + }); + it("rethrows original error if response.completed was already received", async () => { const api = createMockApi( [ diff --git a/tests/unit/routes/shared/anthropic-session-id.test.ts b/tests/unit/routes/shared/anthropic-session-id.test.ts new file mode 100644 index 00000000..10d1c82d --- /dev/null +++ b/tests/unit/routes/shared/anthropic-session-id.test.ts @@ -0,0 +1,54 @@ +import { describe, it, expect } from "vitest"; +import { extractAnthropicClientConversationId } from "@src/routes/shared/anthropic-session-id.js"; +import type { AnthropicMessagesRequest } from "@src/types/anthropic.js"; + +function makeRequest(): AnthropicMessagesRequest { + return { + model: "gpt-5.4-mini", + max_tokens: 16, + messages: [{ role: "user", content: "hello" }], + stream: false, + }; +} + +describe("extractAnthropicClientConversationId", () => { + it("优先使用 x-claude-code-session-id 头", () => { + const req = { + ...makeRequest(), + metadata: { + user_id: JSON.stringify({ session_id: "body-session" }), + }, + }; + expect(extractAnthropicClientConversationId(req, "header-session")).toBe("header-session"); + }); + + it("头不存在时回退到 metadata.user_id.session_id", () => { + const req = { + ...makeRequest(), + metadata: { + user_id: JSON.stringify({ + session_id: "body-session", + device_id: "device-1", + }), + }, + }; + expect(extractAnthropicClientConversationId(req, undefined)).toBe("body-session"); + }); + + it("无可用 session_id 时返回 null", () => { + expect(extractAnthropicClientConversationId(makeRequest(), undefined)).toBeNull(); + expect(extractAnthropicClientConversationId({ + ...makeRequest(), + metadata: { user_id: "not-json" }, + }, undefined)).toBeNull(); + }); + + it("metadata 缺少 Claude 设备字段时不启用回退解析", () => { + expect(extractAnthropicClientConversationId({ + ...makeRequest(), + metadata: { + user_id: JSON.stringify({ session_id: "generic-session" }), + }, + }, undefined)).toBeNull(); + }); +}); diff --git a/tests/unit/routes/shared/proxy-handler-implicit-resume.test.ts b/tests/unit/routes/shared/proxy-handler-implicit-resume.test.ts new file mode 100644 index 00000000..8efd6ed2 --- /dev/null +++ b/tests/unit/routes/shared/proxy-handler-implicit-resume.test.ts @@ -0,0 +1,78 @@ +import { describe, it, expect } from "vitest"; +import { PreviousResponseWebSocketError } from "@src/proxy/codex-api.js"; +import { + shouldActivateImplicitResume, + shouldReplayFullInputAfterImplicitResumeError, +} from "@src/routes/shared/proxy-handler.js"; + +describe("shouldActivateImplicitResume", () => { + it("同账号且 system 未变化时允许隐式续链", () => { + expect(shouldActivateImplicitResume({ + implicitPrevRespId: "resp_prev", + continuationInputStart: 2, + inputLength: 3, + preferredEntryId: "entry_1", + acquiredEntryId: "entry_1", + currentInstructions: "system-a", + storedInstructions: "system-a", + })).toBe(true); + }); + + it("system 变化时禁止隐式续链", () => { + expect(shouldActivateImplicitResume({ + implicitPrevRespId: "resp_prev", + continuationInputStart: 2, + inputLength: 3, + preferredEntryId: "entry_1", + acquiredEntryId: "entry_1", + currentInstructions: "system-b", + storedInstructions: "system-a", + })).toBe(false); + }); + + it("回退到非 affinity 账号时禁止隐式续链", () => { + expect(shouldActivateImplicitResume({ + implicitPrevRespId: "resp_prev", + continuationInputStart: 2, + inputLength: 3, + preferredEntryId: "entry_1", + acquiredEntryId: "entry_2", + currentInstructions: "system-a", + storedInstructions: "system-a", + })).toBe(false); + }); + + it("tool_result 里的 call_id 属于上一轮 response 时允许隐式续链", () => { + expect(shouldActivateImplicitResume({ + implicitPrevRespId: "resp_prev", + continuationInputStart: 2, + inputLength: 3, + preferredEntryId: "entry_1", + acquiredEntryId: "entry_1", + currentInstructions: "system-a", + storedInstructions: "system-a", + requiredFunctionCallOutputIds: ["call_ok"], + storedFunctionCallIds: ["call_ok", "call_other"], + })).toBe(true); + }); + + it("tool_result 里的 call_id 不属于上一轮 response 时禁止隐式续链", () => { + expect(shouldActivateImplicitResume({ + implicitPrevRespId: "resp_prev", + continuationInputStart: 2, + inputLength: 3, + preferredEntryId: "entry_1", + acquiredEntryId: "entry_1", + currentInstructions: "system-a", + storedInstructions: "system-a", + requiredFunctionCallOutputIds: ["call_missing"], + storedFunctionCallIds: ["call_ok"], + })).toBe(false); + }); + + it("隐式续链 WebSocket 失败时会触发完整历史重放", () => { + const err = new PreviousResponseWebSocketError("ws down"); + expect(shouldReplayFullInputAfterImplicitResumeError(err, true)).toBe(true); + expect(shouldReplayFullInputAfterImplicitResumeError(err, false)).toBe(false); + }); +}); diff --git a/tests/unit/routes/upstream-auth-bypass.test.ts b/tests/unit/routes/upstream-auth-bypass.test.ts index d4a3b877..bfd0c697 100644 --- a/tests/unit/routes/upstream-auth-bypass.test.ts +++ b/tests/unit/routes/upstream-auth-bypass.test.ts @@ -105,6 +105,13 @@ describe("upstream direct routing without Codex auth", () => { expect(res.status).toBe(200); expect(mockHandleDirectRequest).toHaveBeenCalledTimes(1); + const [, , directReq] = mockHandleDirectRequest.mock.calls[0] as [ + unknown, + unknown, + { codexRequest: { tools?: unknown[] } }, + unknown, + ]; + expect(directReq.codexRequest.tools).toEqual([]); pool.destroy(); }); diff --git a/tests/unit/stable-conversation-key.test.ts b/tests/unit/stable-conversation-key.test.ts new file mode 100644 index 00000000..3350636f --- /dev/null +++ b/tests/unit/stable-conversation-key.test.ts @@ -0,0 +1,274 @@ +/** + * 验证 deriveStableConversationKey 的稳定性和正确性。 + * + * 这个函数是让 Codex 缓存真正工作的关键: + * 同一对话的所有轮次必须得到相同的 prompt_cache_key, + * 不同对话必须得到不同的 key。 + */ + +import { describe, it, expect } from "vitest"; +import type { CodexResponsesRequest } from "@src/proxy/codex-types.js"; +import { deriveStableConversationKey } from "@src/routes/shared/stable-conversation-key.js"; + +function makeReq(opts: { + instructions?: string; + messages: Array<{ role: "user" | "assistant"; text: string }>; +}): CodexResponsesRequest { + return { + model: "gpt-5.4", + input: opts.messages.map((m) => ({ + role: m.role, + content: m.text, + })), + stream: true as const, + store: false as const, + instructions: opts.instructions, + }; +} + +// ── 稳定性测试 ──────────────────────────────────────────────────────── + +describe("deriveStableConversationKey 稳定性", () => { + it("相同输入 → 相同 key(幂等性)", () => { + const req = makeReq({ + instructions: "你是一个助手", + messages: [{ role: "user", text: "你好" }], + }); + const k1 = deriveStableConversationKey(req); + const k2 = deriveStableConversationKey(req); + expect(k1).not.toBeNull(); + expect(k1).toBe(k2); + }); + + it("同一对话多轮次 → key 不变(固定 model + system + 第一条 user 消息)", () => { + const turn1 = makeReq({ + instructions: "你是代码助手", + messages: [{ role: "user", text: "帮我写排序" }], + }); + // 第二轮带上历史 + const turn2 = makeReq({ + instructions: "你是代码助手", + messages: [ + { role: "user", text: "帮我写排序" }, + { role: "assistant", text: "好的,这是冒泡排序..." }, + { role: "user", text: "换成快排" }, + ], + }); + // 第三轮更多历史 + const turn3 = makeReq({ + instructions: "你是代码助手", + messages: [ + { role: "user", text: "帮我写排序" }, + { role: "assistant", text: "好的,这是冒泡排序..." }, + { role: "user", text: "换成快排" }, + { role: "assistant", text: "这是快排实现..." }, + { role: "user", text: "加注释" }, + ], + }); + + const k1 = deriveStableConversationKey(turn1); + const k2 = deriveStableConversationKey(turn2); + const k3 = deriveStableConversationKey(turn3); + + expect(k1).toBe(k2); + expect(k2).toBe(k3); + }); + + it("不同对话(不同第一条消息)→ 不同 key", () => { + const conv1 = makeReq({ + instructions: "你是助手", + messages: [{ role: "user", text: "帮我写 Python 代码" }], + }); + const conv2 = makeReq({ + instructions: "你是助手", + messages: [{ role: "user", text: "解释量子计算" }], + }); + + const k1 = deriveStableConversationKey(conv1); + const k2 = deriveStableConversationKey(conv2); + expect(k1).not.toBe(k2); + }); + + it("相同第一条消息但系统提示不同 → 使用不同 key", () => { + const req1 = makeReq({ + instructions: "你是 Python 专家", + messages: [{ role: "user", text: "你好" }], + }); + const req2 = makeReq({ + instructions: "你是 Rust 专家", + messages: [{ role: "user", text: "你好" }], + }); + + expect(deriveStableConversationKey(req1)).not.toBe( + deriveStableConversationKey(req2) + ); + }); + + it("不同模型 → 不同 key(即使第一条消息相同)", () => { + const req1 = makeReq({ + instructions: "你是助手", + messages: [{ role: "user", text: "你好" }], + }); + const req2 = { + ...req1, + model: "gpt-5.4-mini", + } satisfies CodexResponsesRequest; + + expect(deriveStableConversationKey(req1)).not.toBe( + deriveStableConversationKey(req2) + ); + }); + + it("没有内容时返回 null(降级到随机 UUID)", () => { + const empty = makeReq({ messages: [] }); + expect(deriveStableConversationKey(empty)).toBeNull(); + }); + + it("input 缺失时返回 null,避免测试或异常请求触发 500", () => { + const req = { + model: "gpt-5.4", + stream: true as const, + store: false as const, + } as CodexResponsesRequest; + expect(deriveStableConversationKey(req)).toBeNull(); + }); + + it("input 缺失但有 instructions 时仍能生成 key", () => { + const req = { + model: "gpt-5.4", + instructions: "system prompt", + stream: true as const, + store: false as const, + } as CodexResponsesRequest; + + expect(deriveStableConversationKey(req)).toMatch( + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/, + ); + }); + + it("input 不是数组时不抛异常", () => { + const req = { + model: "gpt-5.4", + instructions: "system prompt", + input: "not-an-array", + stream: true as const, + store: false as const, + } as unknown as CodexResponsesRequest; + + expect(() => deriveStableConversationKey(req)).not.toThrow(); + expect(deriveStableConversationKey(req)).not.toBeNull(); + }); + + it("没有 instructions 但有消息时仍能生成 key", () => { + const req = makeReq({ + messages: [{ role: "user", text: "简单问题" }], + }); + expect(deriveStableConversationKey(req)).not.toBeNull(); + }); +}); + +// ── 格式测试 ────────────────────────────────────────────────────────── + +describe("key 格式", () => { + it("生成 UUID 格式的字符串", () => { + const req = makeReq({ + instructions: "system", + messages: [{ role: "user", text: "hello" }], + }); + const key = deriveStableConversationKey(req); + expect(key).toMatch(/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/); + }); +}); + +// ── 关键场景:Claude Code 实际使用方式 ─────────────────────────────── + +describe("Claude Code 实际场景", () => { + const CLAUDE_CODE_SYSTEM = "You are Claude Code, Anthropic's official CLI for Claude. " + + "You are an interactive agent that helps users with software engineering tasks. ".repeat(5); + + it("Claude Code 多轮对话:所有轮次使用同一个缓存 key", () => { + // 模拟 Claude Code 实际发送的请求结构 + const firstTurn = makeReq({ + instructions: CLAUDE_CODE_SYSTEM, + messages: [ + { role: "user", text: "帮我优化这个函数" }, + ], + }); + + const secondTurn = makeReq({ + instructions: CLAUDE_CODE_SYSTEM, + messages: [ + { role: "user", text: "帮我优化这个函数" }, + { role: "assistant", text: "好的,我来看一下..." }, + { role: "user", text: "能加上类型注解吗" }, + ], + }); + + const thirdTurn = makeReq({ + instructions: CLAUDE_CODE_SYSTEM, + messages: [ + { role: "user", text: "帮我优化这个函数" }, + { role: "assistant", text: "好的,我来看一下..." }, + { role: "user", text: "能加上类型注解吗" }, + { role: "assistant", text: "加好了..." }, + { role: "user", text: "现在写测试" }, + ], + }); + + const k1 = deriveStableConversationKey(firstTurn); + const k2 = deriveStableConversationKey(secondTurn); + const k3 = deriveStableConversationKey(thirdTurn); + + // 所有轮次 key 相同 → Codex 可以命中缓存 + expect(k1).toBe(k2); + expect(k2).toBe(k3); + + // 确认不是 null + expect(k1).not.toBeNull(); + }); + + it("新的 Claude Code session(新问题)→ 不同 key", () => { + const session1Turn1 = makeReq({ + instructions: CLAUDE_CODE_SYSTEM, + messages: [{ role: "user", text: "帮我优化这个函数" }], + }); + const session2Turn1 = makeReq({ + instructions: CLAUDE_CODE_SYSTEM, + messages: [{ role: "user", text: "解释这段代码的作用" }], + }); + + expect(deriveStableConversationKey(session1Turn1)).not.toBe( + deriveStableConversationKey(session2Turn1) + ); + }); + + it("忽略 Claude 注入的 system-reminder 前缀变化", () => { + const prompt = "帮我优化这个函数,并且只给出最终答案。"; + const turn1 = makeReq({ + instructions: CLAUDE_CODE_SYSTEM, + messages: [{ + role: "user", + text: + "\nAs you answer the user's questions, you can use the following context:\n# currentDate\nToday's date is 2026-04-10\n\n" + + prompt, + }], + }); + const turn2 = makeReq({ + instructions: CLAUDE_CODE_SYSTEM, + messages: [ + { + role: "user", + text: + "\nAs you answer the user's questions, you can use the following context:\n# currentDate\nToday's date is 2026-04-11\n# cwd\n/tmp/demo\n\n" + + prompt, + }, + { role: "assistant", text: "ACK-1" }, + { role: "user", text: "继续" }, + ], + }); + + expect(deriveStableConversationKey(turn1)).toBe( + deriveStableConversationKey(turn2) + ); + }); +}); diff --git a/tests/unit/translation/anthropic-to-codex.test.ts b/tests/unit/translation/anthropic-to-codex.test.ts index 0aba56c6..f62bddaa 100644 --- a/tests/unit/translation/anthropic-to-codex.test.ts +++ b/tests/unit/translation/anthropic-to-codex.test.ts @@ -72,21 +72,36 @@ describe("translateAnthropicToCodexRequest", () => { expect(result.instructions).toBe("Be concise."); }); - it("joins text block array system into instructions", () => { - const result = translateAnthropicToCodexRequest( - makeRequest({ + it("joins text block array system into instructions", () => { + const result = translateAnthropicToCodexRequest( + makeRequest({ system: [ { type: "text" as const, text: "First paragraph." }, { type: "text" as const, text: "Second paragraph." }, ], }), - ); - expect(result.instructions).toBe("First paragraph.\n\nSecond paragraph."); - }); - - it("falls back to default instructions when no system provided", () => { - const result = translateAnthropicToCodexRequest(makeRequest()); - expect(result.instructions).toBe("You are a helpful assistant."); + ); + expect(result.instructions).toBe("First paragraph.\n\nSecond paragraph."); + }); + + it("strips Claude billing header noise from system blocks", () => { + const result = translateAnthropicToCodexRequest( + makeRequest({ + system: [ + { + type: "text" as const, + text: "x-anthropic-billing-header: cc_version=2.1.100.db0; cch=abcd1;", + }, + { type: "text" as const, text: "Keep answers short." }, + ], + }), + ); + expect(result.instructions).toBe("Keep answers short."); + }); + + it("falls back to default instructions when no system provided", () => { + const result = translateAnthropicToCodexRequest(makeRequest()); + expect(result.instructions).toBe("You are a helpful assistant."); }); }); @@ -316,23 +331,59 @@ describe("translateAnthropicToCodexRequest", () => { // ── Tools ──────────────────────────────────────────────────────────── - describe("tools", () => { - it("delegates tools array to anthropicToolsToCodex", () => { - const tools = [ - { name: "search", description: "Search the web", input_schema: {} }, + describe("tools", () => { + it("delegates tools array to anthropicToolsToCodex", () => { + const tools = [ + { name: "search", description: "Search the web", input_schema: {} }, ]; translateAnthropicToCodexRequest(makeRequest({ tools })); expect(anthropicToolsToCodex).toHaveBeenCalledWith(tools); }); - it("delegates tool_choice to anthropicToolChoiceToCodex", () => { - const toolChoice = { type: "auto" as const }; - translateAnthropicToCodexRequest(makeRequest({ tool_choice: toolChoice })); - - expect(anthropicToolChoiceToCodex).toHaveBeenCalledWith(toolChoice); - }); - }); + it("delegates tool_choice to anthropicToolChoiceToCodex", () => { + const toolChoice = { type: "auto" as const }; + translateAnthropicToCodexRequest(makeRequest({ tool_choice: toolChoice })); + + expect(anthropicToolChoiceToCodex).toHaveBeenCalledWith(toolChoice, undefined); + }); + + it("passes tools context when converting tool_choice", () => { + const tools = [ + { name: "web_search", description: "Custom search", input_schema: {} }, + ]; + const toolChoice = { type: "tool" as const, name: "web_search" }; + translateAnthropicToCodexRequest(makeRequest({ tools, tool_choice: toolChoice })); + + expect(anthropicToolChoiceToCodex).toHaveBeenCalledWith(toolChoice, tools); + }); + + it("does not inject hosted web_search by default", () => { + const result = translateAnthropicToCodexRequest(makeRequest()); + + expect(result.tools).toEqual([]); + }); + + it("injects hosted web_search when explicitly requested", () => { + const result = translateAnthropicToCodexRequest( + makeRequest(), + undefined, + { injectHostedWebSearch: true }, + ); + + expect(result.tools).toEqual([{ type: "web_search" }]); + }); + + it("does not duplicate hosted web_search when injected and already present", () => { + const result = translateAnthropicToCodexRequest( + makeRequest({ tools: [{ type: "web_search" as const, name: "web_search" }] }), + undefined, + { injectHostedWebSearch: true }, + ); + + expect(result.tools).toEqual([{ type: "web_search", name: "web_search" }]); + }); + }); // ── Fixed fields ───────────────────────────────────────────────────── diff --git a/tests/unit/translation/tool-format.test.ts b/tests/unit/translation/tool-format.test.ts index ca1071c5..4aa8ef01 100644 --- a/tests/unit/translation/tool-format.test.ts +++ b/tests/unit/translation/tool-format.test.ts @@ -1,13 +1,16 @@ -import { describe, it, expect } from "vitest"; -import { - openAIToolsToCodex, - openAIToolChoiceToCodex, +import { describe, it, expect } from "vitest"; +import { + openAIToolsToCodex, + openAIToolChoiceToCodex, openAIFunctionsToCodex, anthropicToolsToCodex, anthropicToolChoiceToCodex, geminiToolsToCodex, - geminiToolConfigToCodex, -} from "@src/translation/tool-format.js"; + geminiToolConfigToCodex, +} from "@src/translation/tool-format.js"; +import type { ChatCompletionRequest } from "@src/types/openai.js"; +import type { AnthropicMessagesRequest } from "@src/types/anthropic.js"; +import type { GeminiGenerateContentRequest } from "@src/types/gemini.js"; // ── openAIToolsToCodex ────────────────────────────────────────── @@ -464,3 +467,192 @@ describe("anthropicToolsToCodex additional edge cases", () => { expect(result[0].parameters).toEqual({ type: "object", properties: {} }); }); }); +// ── hosted web_search tool conversion ─────────────────────────────── + +describe("hosted web_search tool conversion", () => { + it("converts OpenAI hosted web_search_preview to Codex hosted web_search", () => { + const tools = [ + { + type: "web_search_preview", + search_context_size: "high", + user_location: { type: "approximate", country: "US" }, + }, + { + type: "function", + function: { + name: "lookup", + parameters: { type: "object" }, + }, + }, + ] satisfies NonNullable; + + expect(openAIToolsToCodex(tools)).toEqual([ + { + type: "web_search", + search_context_size: "high", + user_location: { type: "approximate", country: "US" }, + }, + { + type: "function", + name: "lookup", + parameters: { type: "object", properties: {} }, + }, + ]); + }); + + it("converts OpenAI hosted web_search tool_choice", () => { + expect(openAIToolChoiceToCodex({ type: "web_search_preview" })).toEqual({ + type: "web_search", + }); + }); + + it("converts Anthropic Claude Code WebSearch tool_choice to hosted web_search", () => { + expect(anthropicToolChoiceToCodex({ type: "tool", name: "WebSearch" })).toEqual({ + type: "web_search", + }); + }); + + it("converts Anthropic hosted web_search tool_choice to hosted web_search", () => { + expect( + anthropicToolChoiceToCodex( + { type: "tool", name: "web_search" }, + [{ type: "web_search_20250305", name: "web_search" }], + ), + ).toEqual({ type: "web_search" }); + }); + + it("preserves Anthropic lowercase custom web_search tool_choice as function tool", () => { + expect( + anthropicToolChoiceToCodex( + { type: "tool", name: "web_search" }, + [ + { + name: "web_search", + description: "Project-local search implementation", + input_schema: { type: "object", properties: { query: { type: "string" } } }, + }, + ], + ), + ).toEqual({ type: "function", name: "web_search" }); + }); + + it("preserves Anthropic custom tool_choice as function tool", () => { + expect(anthropicToolChoiceToCodex({ type: "tool", name: "lookup" })).toEqual({ + type: "function", + name: "lookup", + }); + }); + + it("converts Anthropic hosted web search to Codex hosted web_search", () => { + const tools = [ + { + type: "web_search_20250305", + name: "web_search", + max_uses: 3, + }, + { + name: "read_file", + input_schema: { type: "object" }, + }, + ] satisfies NonNullable; + + expect(anthropicToolsToCodex(tools)).toEqual([ + { type: "web_search" }, + { + type: "function", + name: "read_file", + parameters: { type: "object", properties: {} }, + }, + ]); + }); + + it("converts Claude Code WebSearch tool to Codex hosted web_search", () => { + const tools = [ + { + name: "WebSearch", + description: "Search the web", + input_schema: { + type: "object", + properties: { query: { type: "string" } }, + }, + }, + ] satisfies NonNullable; + + expect(anthropicToolsToCodex(tools)).toEqual([ + { type: "web_search" }, + ]); + }); + + it("preserves a lowercase custom web_search tool as a function tool", () => { + const tools = [ + { + name: "web_search", + description: "Project-local search implementation", + input_schema: { + type: "object", + properties: { query: { type: "string" } }, + }, + }, + ] satisfies NonNullable; + + expect(anthropicToolsToCodex(tools)).toEqual([ + { + type: "function", + name: "web_search", + description: "Project-local search implementation", + parameters: { + type: "object", + properties: { query: { type: "string" } }, + }, + }, + ]); + }); + + it("preserves other Claude Code tools as function tools", () => { + const tools = [ + { + name: "Bash", + description: "Run shell commands", + input_schema: { + type: "object", + properties: { command: { type: "string" } }, + }, + }, + ] satisfies NonNullable; + + expect(anthropicToolsToCodex(tools)).toEqual([ + { + type: "function", + name: "Bash", + description: "Run shell commands", + parameters: { + type: "object", + properties: { command: { type: "string" } }, + }, + }, + ]); + }); + + it("converts Gemini googleSearch to Codex hosted web_search", () => { + const tools = [ + { + googleSearch: {}, + functionDeclarations: [ + { + name: "lookup", + parameters: { type: "object" }, + }, + ], + }, + ] satisfies NonNullable; + + expect(geminiToolsToCodex(tools)).toEqual([ + { type: "web_search" }, + { + type: "function", + name: "lookup", + parameters: { type: "object", properties: {} }, + }, + ]); + }); +}); diff --git a/tests/unit/types/codex-events.test.ts b/tests/unit/types/codex-events.test.ts index 8e6805b4..40b8d6ac 100644 --- a/tests/unit/types/codex-events.test.ts +++ b/tests/unit/types/codex-events.test.ts @@ -73,3 +73,35 @@ describe("parseCodexEvent — content_part events", () => { expect(result.type).toBe("response.content_part.done"); }); }); + +describe("parseCodexEvent — error events", () => { + it("extracts response.failed errors from response.error", () => { + const raw = makeRaw("response.failed", { + type: "response.failed", + response: { + id: "resp_1", + error: { + code: "server_error", + message: "upstream failed", + }, + }, + }); + const result = parseCodexEvent(raw); + expect(result.type).toBe("response.failed"); + if (result.type === "response.failed") { + expect(result.error.code).toBe("server_error"); + expect(result.error.message).toBe("upstream failed"); + expect(result.response.id).toBe("resp_1"); + } + }); + + it("marks partial JSON error payloads as malformed", () => { + const raw = makeRaw("error", "{"); + const result = parseCodexEvent(raw); + expect(result.type).toBe("error"); + if (result.type === "error") { + expect(result.error.code).toBe("malformed_error_event"); + expect(result.error.message).toBe("{"); + } + }); +}); From 956c706dc4ffb538761cc1807c7bb9afc90cda38 Mon Sep 17 00:00:00 2001 From: Hango Date: Fri, 17 Apr 2026 14:12:06 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20Anthropic=20WebSearch?= =?UTF-8?q?=20=E7=9B=B4=E8=BF=9E=E8=BD=AC=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/routes/messages.ts | 5 +- src/translation/anthropic-to-codex.ts | 20 +++++-- src/translation/tool-format.ts | 40 ++++++++++--- .../unit/routes/upstream-auth-bypass.test.ts | 60 +++++++++++++++++++ .../translation/anthropic-to-codex.test.ts | 22 +++++++ tests/unit/translation/tool-format.test.ts | 55 +++++++++++++++-- 6 files changed, 186 insertions(+), 16 deletions(-) diff --git a/src/routes/messages.ts b/src/routes/messages.ts index d4e1f870..d16eab68 100644 --- a/src/routes/messages.ts +++ b/src/routes/messages.ts @@ -128,8 +128,11 @@ export function createMessagesRoutes( const codexRequest = translateAnthropicToCodexRequest(req, undefined, { injectHostedWebSearch: !allowUnauthenticated, + mapClaudeCodeWebSearch: !allowUnauthenticated && clientConversationId !== null, }); - codexRequest.useWebSocket = true; + if (!allowUnauthenticated) { + codexRequest.useWebSocket = true; + } const wantThinking = req.thinking?.type === "enabled" || req.thinking?.type === "adaptive"; const proxyReq = { codexRequest, diff --git a/src/translation/anthropic-to-codex.ts b/src/translation/anthropic-to-codex.ts index a3b272aa..a0b8c99f 100644 --- a/src/translation/anthropic-to-codex.ts +++ b/src/translation/anthropic-to-codex.ts @@ -12,7 +12,11 @@ import { parseModelName, getModelInfo } from "../models/model-store.js"; import { getConfig } from "../config.js"; import { buildInstructions, budgetToEffort } from "./shared-utils.js"; import type { ModelConfigOverride } from "./shared-utils.js"; -import { anthropicToolsToCodex, anthropicToolChoiceToCodex } from "./tool-format.js"; +import { + anthropicToolsToCodex, + anthropicToolChoiceToCodex, + type AnthropicToolConversionOptions, +} from "./tool-format.js"; function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null && !Array.isArray(value); @@ -189,7 +193,7 @@ function contentToInputItems( export function translateAnthropicToCodexRequest( req: AnthropicMessagesRequest, modelConfig?: ModelConfigOverride, - options?: { injectHostedWebSearch?: boolean }, + options?: { injectHostedWebSearch?: boolean; mapClaudeCodeWebSearch?: boolean }, ): CodexResponsesRequest { // Extract system instructions let userInstructions: string; @@ -229,13 +233,21 @@ export function translateAnthropicToCodexRequest( const modelInfo = getModelInfo(modelId); // Convert tools to Codex format - const codexTools = req.tools?.length ? anthropicToolsToCodex(req.tools) : []; + const toolConversionOptions: AnthropicToolConversionOptions | undefined = + options?.mapClaudeCodeWebSearch === true ? { mapClaudeCodeWebSearch: true } : undefined; + const codexTools = req.tools?.length + ? toolConversionOptions + ? anthropicToolsToCodex(req.tools, toolConversionOptions) + : anthropicToolsToCodex(req.tools) + : []; // Claude Code 在非 Anthropic 官方 base URL 下会禁用自身 ToolSearch。 // 只有走本地 Codex 后端时才默认交给 Codex hosted web_search。 if (options?.injectHostedWebSearch === true && !hasHostedWebSearchTool(codexTools)) { codexTools.push({ type: "web_search" }); } - const codexToolChoice = anthropicToolChoiceToCodex(req.tool_choice, req.tools); + const codexToolChoice = toolConversionOptions + ? anthropicToolChoiceToCodex(req.tool_choice, req.tools, toolConversionOptions) + : anthropicToolChoiceToCodex(req.tool_choice, req.tools); // Build request const request: CodexResponsesRequest = { diff --git a/src/translation/tool-format.ts b/src/translation/tool-format.ts index 729647bc..52b7866a 100644 --- a/src/translation/tool-format.ts +++ b/src/translation/tool-format.ts @@ -39,6 +39,10 @@ export interface CodexHostedWebSearchTool { export type CodexTool = CodexToolDefinition | CodexHostedWebSearchTool; +export interface AnthropicToolConversionOptions { + mapClaudeCodeWebSearch?: boolean; +} + function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null && !Array.isArray(value); } @@ -68,21 +72,41 @@ function hasGeminiHostedSearch(tool: Record): boolean { return isRecord(tool.googleSearch) || isRecord(tool.googleSearchRetrieval); } -function isAnthropicHostedSearchTool(tool: Record): boolean { +function looksLikeClaudeCodeWebSearchTool(tool: Record): boolean { + if (tool.name !== "WebSearch") return false; + + const description = typeof tool.description === "string" ? tool.description.toLowerCase() : ""; + if (!description.includes("search") || !description.includes("web")) return false; + + if (!isRecord(tool.input_schema)) return false; + const properties = isRecord(tool.input_schema.properties) ? tool.input_schema.properties : null; + return isRecord(properties?.query); +} + +function isAnthropicHostedSearchTool( + tool: Record, + options?: AnthropicToolConversionOptions, +): boolean { if (tool.type === "web_search_20250305" || tool.type === "web_search") return true; - // Claude Code 内置工具名是 WebSearch;这里改走 Codex hosted search, - // 避免把搜索降级成需要客户端执行的普通 function tool。 - return tool.name === "WebSearch"; + return options?.mapClaudeCodeWebSearch === true && looksLikeClaudeCodeWebSearchTool(tool); } function hasAnthropicHostedSearchToolChoice( choiceName: string, tools: AnthropicMessagesRequest["tools"], + options?: AnthropicToolConversionOptions, ): boolean { - if (choiceName === "WebSearch") return true; + if (choiceName === "WebSearch" && !tools) return options?.mapClaudeCodeWebSearch === true; if (!tools) return false; return tools.some((tool) => { if (!isRecord(tool)) return false; + if ( + choiceName === "WebSearch" && + options?.mapClaudeCodeWebSearch === true && + looksLikeClaudeCodeWebSearchTool(tool) + ) { + return true; + } if (tool.type !== "web_search_20250305" && tool.type !== "web_search") { return false; } @@ -153,10 +177,11 @@ export function openAIFunctionsToCodex( export function anthropicToolsToCodex( tools: NonNullable, + options?: AnthropicToolConversionOptions, ): CodexTool[] { const defs: CodexTool[] = []; for (const t of tools) { - if (isRecord(t) && isAnthropicHostedSearchTool(t)) { + if (isRecord(t) && isAnthropicHostedSearchTool(t, options)) { defs.push({ type: "web_search" }); continue; } @@ -176,6 +201,7 @@ export function anthropicToolsToCodex( export function anthropicToolChoiceToCodex( choice: AnthropicMessagesRequest["tool_choice"], tools?: AnthropicMessagesRequest["tools"], + options?: AnthropicToolConversionOptions, ): string | { type: "function"; name: string } | { type: "web_search" } | undefined { if (!choice) return undefined; switch (choice.type) { @@ -184,7 +210,7 @@ export function anthropicToolChoiceToCodex( case "any": return "required"; case "tool": - if (hasAnthropicHostedSearchToolChoice(choice.name, tools)) { + if (hasAnthropicHostedSearchToolChoice(choice.name, tools, options)) { return { type: "web_search" }; } return { type: "function", name: choice.name }; diff --git a/tests/unit/routes/upstream-auth-bypass.test.ts b/tests/unit/routes/upstream-auth-bypass.test.ts index bfd0c697..4dd4a4a6 100644 --- a/tests/unit/routes/upstream-auth-bypass.test.ts +++ b/tests/unit/routes/upstream-auth-bypass.test.ts @@ -139,6 +139,66 @@ describe("upstream direct routing without Codex auth", () => { pool.destroy(); }); + it("keeps Anthropic direct routing free of Codex websocket and hosted search rewrites", async () => { + const pool = new AccountPool(); + const app = createMessagesRoutes(pool, undefined, undefined, { + resolveMatch: vi.fn(() => ({ kind: "adapter", adapter: { tag: "custom-upstream" } })), + } as never); + + const res = await app.request("/v1/messages", { + method: "POST", + headers: { + "Content-Type": "application/json", + "anthropic-version": "2023-06-01", + }, + body: JSON.stringify({ + model: "claude-opus-4-6", + max_tokens: 16, + messages: [{ role: "user", content: "hello" }], + tools: [ + { + name: "WebSearch", + description: "Project-local lookup implementation", + input_schema: { + type: "object", + properties: { query: { type: "string" } }, + }, + }, + ], + tool_choice: { type: "tool", name: "WebSearch" }, + }), + }); + + expect(res.status).toBe(200); + expect(mockHandleDirectRequest).toHaveBeenCalledTimes(1); + const [, , directReq] = mockHandleDirectRequest.mock.calls[0] as [ + unknown, + unknown, + { + codexRequest: { + useWebSocket?: boolean; + tools?: unknown[]; + tool_choice?: unknown; + }; + }, + unknown, + ]; + expect(directReq.codexRequest.useWebSocket).toBeUndefined(); + expect(directReq.codexRequest.tools).toEqual([ + { + type: "function", + name: "WebSearch", + description: "Project-local lookup implementation", + parameters: { + type: "object", + properties: { query: { type: "string" } }, + }, + }, + ]); + expect(directReq.codexRequest.tool_choice).toEqual({ type: "function", name: "WebSearch" }); + pool.destroy(); + }); + it("allows Gemini direct upstream routing without local accounts", async () => { const pool = new AccountPool(); const app = createGeminiRoutes(pool, undefined, undefined, { diff --git a/tests/unit/translation/anthropic-to-codex.test.ts b/tests/unit/translation/anthropic-to-codex.test.ts index f62bddaa..032dd160 100644 --- a/tests/unit/translation/anthropic-to-codex.test.ts +++ b/tests/unit/translation/anthropic-to-codex.test.ts @@ -358,6 +358,28 @@ describe("translateAnthropicToCodexRequest", () => { expect(anthropicToolChoiceToCodex).toHaveBeenCalledWith(toolChoice, tools); }); + it("passes Claude Code WebSearch mapping option when requested", () => { + const tools = [ + { name: "WebSearch", description: "Search the web", input_schema: {} }, + ]; + const toolChoice = { type: "tool" as const, name: "WebSearch" }; + translateAnthropicToCodexRequest( + makeRequest({ tools, tool_choice: toolChoice }), + undefined, + { mapClaudeCodeWebSearch: true }, + ); + + expect(anthropicToolsToCodex).toHaveBeenCalledWith( + tools, + { mapClaudeCodeWebSearch: true }, + ); + expect(anthropicToolChoiceToCodex).toHaveBeenCalledWith( + toolChoice, + tools, + { mapClaudeCodeWebSearch: true }, + ); + }); + it("does not inject hosted web_search by default", () => { const result = translateAnthropicToCodexRequest(makeRequest()); diff --git a/tests/unit/translation/tool-format.test.ts b/tests/unit/translation/tool-format.test.ts index 4aa8ef01..eb1d7284 100644 --- a/tests/unit/translation/tool-format.test.ts +++ b/tests/unit/translation/tool-format.test.ts @@ -507,9 +507,13 @@ describe("hosted web_search tool conversion", () => { }); it("converts Anthropic Claude Code WebSearch tool_choice to hosted web_search", () => { - expect(anthropicToolChoiceToCodex({ type: "tool", name: "WebSearch" })).toEqual({ - type: "web_search", - }); + expect( + anthropicToolChoiceToCodex( + { type: "tool", name: "WebSearch" }, + undefined, + { mapClaudeCodeWebSearch: true }, + ), + ).toEqual({ type: "web_search" }); }); it("converts Anthropic hosted web_search tool_choice to hosted web_search", () => { @@ -543,6 +547,24 @@ describe("hosted web_search tool conversion", () => { }); }); + it("preserves uppercase custom WebSearch tool_choice as function tool", () => { + const tools = [ + { + name: "WebSearch", + description: "Project-local lookup implementation", + input_schema: { type: "object", properties: { query: { type: "string" } } }, + }, + ] satisfies NonNullable; + + expect( + anthropicToolChoiceToCodex( + { type: "tool", name: "WebSearch" }, + tools, + { mapClaudeCodeWebSearch: true }, + ), + ).toEqual({ type: "function", name: "WebSearch" }); + }); + it("converts Anthropic hosted web search to Codex hosted web_search", () => { const tools = [ { @@ -578,11 +600,36 @@ describe("hosted web_search tool conversion", () => { }, ] satisfies NonNullable; - expect(anthropicToolsToCodex(tools)).toEqual([ + expect(anthropicToolsToCodex(tools, { mapClaudeCodeWebSearch: true })).toEqual([ { type: "web_search" }, ]); }); + it("preserves uppercase custom WebSearch tool as a function tool", () => { + const tools = [ + { + name: "WebSearch", + description: "Project-local lookup implementation", + input_schema: { + type: "object", + properties: { query: { type: "string" } }, + }, + }, + ] satisfies NonNullable; + + expect(anthropicToolsToCodex(tools, { mapClaudeCodeWebSearch: true })).toEqual([ + { + type: "function", + name: "WebSearch", + description: "Project-local lookup implementation", + parameters: { + type: "object", + properties: { query: { type: "string" } }, + }, + }, + ]); + }); + it("preserves a lowercase custom web_search tool as a function tool", () => { const tools = [ {