From fa87e3a6764327c45acf21bebb4743f434795e7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=AA=20Anh=20Tu=E1=BA=A5n?= Date: Wed, 20 May 2026 20:41:01 +0700 Subject: [PATCH 1/4] Add LLM-powered context compaction for /responses/compact Replace upstream compact forwarding with local LLM-based compaction. Proxy intercepts /responses/compact, calls configured LLM provider with full context, validates output structure, and returns formatted Responses API compact response. Key changes: - Add LLM_COMPACT_SYSTEM_PROMPT with 10 required handoff sections - Add upstreamJsonRequest helper for direct /chat/completions calls - Add validation and retry loop (MAX_LLM_COMPACT_RETRIES = 2) - Add thread context cache and local git evidence collection - Add compact request/response debug dumps to ~/.codex-remote-proxy/compact-dumps - Add joinUrlPath for safe URL composition with baseUrl - Replace /responses/compact upstream forward with runLlmCompact Quality enforcement: - Validate all 10 section headers present - Check "Next action" is single executable step - Retry on validation failure with feedback - Preserve intent over transcript Co-Authored-By: Claude Opus 4.7 --- node/src/server.mjs | 886 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 881 insertions(+), 5 deletions(-) diff --git a/node/src/server.mjs b/node/src/server.mjs index 9625607..af5bd91 100644 --- a/node/src/server.mjs +++ b/node/src/server.mjs @@ -1,7 +1,8 @@ import http from "node:http"; import https from "node:https"; import { URL } from "node:url"; -import { readFileSync } from "node:fs"; +import { execFileSync } from "node:child_process"; +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; import { resolve } from "node:path"; import zlib from "node:zlib"; import { decompress as zstdDecompress } from "fzstd"; @@ -23,6 +24,39 @@ const HOP_BY_HOP_HEADERS = new Set([ ]); let DEBUG_ENABLED = false; +const THREAD_CONTEXT_CACHE = new Map(); + +const LLM_COMPACT_SYSTEM_PROMPT = `You are a context compaction assistant. Your job is to create an execution handoff for a coding agent that was interrupted mid-task. + +CRITICAL REQUIREMENTS: +- Use EXACTLY these section headers (in English): + • Current task: + • User intent: + • Repo / location: + • Current state: + • Important files: + • Changes already made: + • Known verification: + • Unfinished work: + • Next action: + • Do not do: + +- Be SPECIFIC and EXECUTABLE +- Include file paths, line numbers, command outputs +- "Next action" must be a single clear command or edit +- If you cannot determine the task, say "RECOVERY REQUIRED" explicitly + +Quality rules: +- Preserve intent, not transcript +- Include only files directly relevant to the active task, each with a reason +- State "Unknown" explicitly instead of inventing +- Avoid validator metadata or JSON blobs +- Do not truncate mid-sentence +- Deduplicate file paths (prefer repo-relative) +- Extract verification from actual test/command output, not doc mentions +- Make "Next action" a single executable step, not a category list`; + +const MAX_LLM_COMPACT_RETRIES = 2; function resolveConfigPath() { return process.env[CONFIG_ENV_VAR] ? resolve(process.env[CONFIG_ENV_VAR]) : DEFAULT_CONFIG_PATH; @@ -65,7 +99,8 @@ function loadConfig(configPath = resolveConfigPath()) { }, proxy: { overrideAuthorization: typeof proxy.overrideAuthorization === "boolean" ? proxy.overrideAuthorization : true, - requestIdHeader: typeof proxy.requestIdHeader === "string" && proxy.requestIdHeader ? proxy.requestIdHeader : "x-client-request-id" + requestIdHeader: typeof proxy.requestIdHeader === "string" && proxy.requestIdHeader ? proxy.requestIdHeader : "x-client-request-id", + compactDumpDir: typeof proxy.compactDumpDir === "string" && proxy.compactDumpDir ? proxy.compactDumpDir : "" } }; } @@ -112,10 +147,56 @@ function safeBodyPreview(buffer, maxLen = 4096) { } } +function defaultCompactDumpDir() { + return resolve(process.env.HOME || process.cwd(), ".codex-remote-proxy", "compact-dumps"); +} + +function compactDumpDir(settings) { + if (settings.proxy.compactDumpDir) return resolve(settings.proxy.compactDumpDir); + return DEBUG_ENABLED ? defaultCompactDumpDir() : ""; +} + +function safeDumpName(value) { + return String(value || "unknown").replace(/[^A-Za-z0-9_.-]/g, "_").slice(0, 96) || "unknown"; +} + +function redactText(text) { + return String(text) + .replace(/sk-[A-Za-z0-9_-]+/g, "sk-...redacted") + .replace(/Bearer\s+[A-Za-z0-9._~+/-]+/gi, "Bearer ...redacted") + .replace(/"apiKey"\s*:\s*"[^"]+"/gi, '"apiKey":"...redacted"') + .replace(/"authorization"\s*:\s*"[^"]+"/gi, '"authorization":"...redacted"'); +} + +function redactedJson(value) { + return JSON.parse(redactText(JSON.stringify(value))); +} + +function writeCompactDump(settings, requestId, phase, payload) { + const dir = compactDumpDir(settings); + if (!dir) return; + try { + mkdirSync(dir, { recursive: true, mode: 0o700 }); + const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); + const file = resolve(dir, `${timestamp}_${safeDumpName(requestId)}_${phase}.json`); + writeFileSync(file, `${JSON.stringify(redactedJson(payload), null, 2)}\n`, { encoding: "utf8", mode: 0o600 }); + } catch (error) { + log("warn", "Failed to write compact dump", { error: JSON.stringify(error.message) }); + } +} + +function joinUrlPath(baseUrl, pathname, search = "") { + const base = new URL(baseUrl); + const basePath = base.pathname.replace(/\/$/, ""); + const path = String(pathname || "").startsWith("/") ? pathname : `/${pathname || ""}`; + base.pathname = `${basePath}${path === "/" ? "" : path}` || "/"; + base.search = search; + return base; +} + function buildTargetUrl(baseUrl, requestUrl) { const incoming = new URL(requestUrl, "http://127.0.0.1"); - const path = incoming.pathname === "/" ? "" : incoming.pathname; - return new URL(`${baseUrl}${path}${incoming.search}`); + return joinUrlPath(baseUrl, incoming.pathname, incoming.search); } function formatAuthorization(upstream) { @@ -148,6 +229,649 @@ function autoDecompress(buffer) { try { return zlib.brotliDecompressSync(buffer); } catch { return null; } } +function normalizeResponsesPayload(payload) { + if (!payload || typeof payload !== "object" || Array.isArray(payload)) return false; + let changed = false; + + if (payload.model === "gpt-5.4") { + payload.model = "gpt-5.5"; + changed = true; + } + + if (payload.store !== false) { + payload.store = false; + changed = true; + } + + if (payload.reasoning === null || typeof payload.reasoning !== "object" || Array.isArray(payload.reasoning)) { + payload.reasoning = {}; + changed = true; + } + + if (!payload.reasoning.effort) { + payload.reasoning.effort = "medium"; + changed = true; + } + + if (!payload.reasoning.summary) { + payload.reasoning.summary = "auto"; + changed = true; + } + + if (!Array.isArray(payload.include)) { + payload.include = []; + changed = true; + } + + if (!payload.include.includes("reasoning.encrypted_content")) { + payload.include.push("reasoning.encrypted_content"); + changed = true; + } + + if (!Object.prototype.hasOwnProperty.call(payload, "parallel_tool_calls")) { + payload.parallel_tool_calls = true; + changed = true; + } + + if (!Object.prototype.hasOwnProperty.call(payload, "instructions")) { + payload.instructions = ""; + changed = true; + } + + return changed; +} + +function rewriteRequestBody(buffer, { normalizeResponses = false } = {}) { + if (!buffer.length) return { body: buffer, rewritten: false, normalized: false }; + let payload; + try { + payload = JSON.parse(buffer.toString("utf8")); + } catch { + return { body: buffer, rewritten: false, normalized: false }; + } + + let rewritten = false; + let normalized = false; + + if (normalizeResponses) { + normalized = normalizeResponsesPayload(payload); + rewritten = normalized; + } else if (payload && typeof payload === "object" && payload.model === "gpt-5.4") { + payload.model = "gpt-5.5"; + rewritten = true; + } + + if (rewritten) return { body: Buffer.from(JSON.stringify(payload)), rewritten, normalized }; + return { body: buffer, rewritten: false, normalized: false }; +} + +function extractTextParts(value, out = []) { + if (!value) return out; + if (typeof value === "string") { + out.push(value); + return out; + } + if (Array.isArray(value)) { + for (const item of value) extractTextParts(item, out); + return out; + } + if (typeof value === "object") { + if (typeof value.text === "string") out.push(value.text); + if (typeof value.input_text === "string") out.push(value.input_text); + if (typeof value.output_text === "string") out.push(value.output_text); + if (value.content) extractTextParts(value.content, out); + } + return out; +} + +function previewText(text, maxLen = 500) { + const normalized = String(text).replace(/\s+/g, " ").trim(); + return normalized.length > maxLen ? `${normalized.slice(0, maxLen)}...` : normalized; +} + +function parseResponseStream(buffer) { + const text = buffer.toString("utf8"); + const events = text.split(/\r?\n\r?\n/); + const deltas = []; + let completed = null; + + for (const event of events) { + const dataLines = event + .split(/\r?\n/) + .filter((line) => line.startsWith("data:")) + .map((line) => line.slice(5).trimStart()); + if (!dataLines.length) continue; + + const dataText = dataLines.join("\n"); + if (dataText === "[DONE]") continue; + + let payload; + try { + payload = JSON.parse(dataText); + } catch { + continue; + } + + if (typeof payload.delta === "string") deltas.push(payload.delta); + if (payload.type === "response.completed" && payload.response) completed = payload.response; + } + + const textOutput = deltas.join(""); + if (completed) return normalizeCompactResponse(completed, textOutput); + return createCompactResponse(textOutput); +} + +function getResponseText(response) { + const texts = []; + for (const item of response?.output || []) { + if (item?.type !== "message") continue; + for (const content of item.content || []) { + if (content?.type === "output_text" && typeof content.text === "string") texts.push(content.text); + } + } + return texts.join("\n"); +} + +function createCompactResponse(text, base = {}) { + return { + id: base.id || `resp_compact_${Date.now()}`, + object: "response", + created_at: base.created_at || Math.floor(Date.now() / 1000), + status: "completed", + output: [{ + id: "msg_compact_0", + type: "message", + role: "assistant", + content: [{ type: "output_text", text, annotations: [] }] + }] + }; +} + +function normalizeCompactResponse(response, fallbackText = "") { + const text = getResponseText(response) || fallbackText; + if (text.trim()) return createCompactResponse(text, response); + return createCompactResponse("", response); +} + +const REQUIRED_HANDOFF_SECTIONS = [ + "Current task:", + "User intent:", + "Repo / location:", + "Current state:", + "Important files:", + "Changes already made:", + "Known verification:", + "Unfinished work:", + "Next action:", + "Do not do:" +]; + +function compactOutputQuality(response, cachedContext) { + const text = getResponseText(response); + const hasFunctionCall = Array.isArray(response?.output) && response.output.some((item) => item?.type === "function_call"); + const saysNoTask = /không có task|chưa có task|no task|no specific task|chờ nhiệm vụ|gửi task tiếp/i.test(text); + const acknowledgedOnly = /đã nhận context|đã nạp hướng dẫn|sẽ theo AGENTS\.md|send.*task|gửi.*task/i.test(text); + const missingSections = REQUIRED_HANDOFF_SECTIONS.filter((section) => !text.includes(section)); + const tooShort = text.trim().length < 500; + const missesFiles = cachedContext?.hasFiles && !cachedContext.files.some((file) => text.includes(file)); + const hasValidatorMetadata = /Proxy note:|\{"bad":true|"missesFiles"|"hasFunctionCall"/.test(text); + const genericUnknown = /Current task:\s*Unknown from compacted context/i.test(text); + const bad = hasFunctionCall || tooShort || saysNoTask || acknowledgedOnly || missingSections.length > 0 || missesFiles || hasValidatorMetadata || genericUnknown; + return { bad, hasFunctionCall, tooShort, saysNoTask, acknowledgedOnly, missingSections, missesFiles, hasValidatorMetadata, genericUnknown, textLength: text.length }; +} + +function analyzeCompactInput(buffer) { + let payload; + try { + payload = JSON.parse(buffer.toString("utf8")); + } catch (error) { + return { parseError: error.message, bytes: buffer.length }; + } + + const input = Array.isArray(payload.input) ? payload.input : []; + const messages = input.filter((item) => item && item.type === "message"); + const byRole = {}; + const recent = []; + + for (const message of messages) byRole[message.role || "unknown"] = (byRole[message.role || "unknown"] || 0) + 1; + for (const message of messages.slice(-6)) { + const text = extractTextParts(message.content).join("\n"); + recent.push({ role: message.role || "unknown", phase: message.phase || null, text: previewText(text, 350) }); + } + + const allText = extractTextParts(payload.input).join("\n"); + const fileMentions = [...new Set((allText.match(/[A-Za-z0-9_./-]+\.(?:mjs|js|ts|tsx|json|md|html|css|toml|yml|yaml)/g) || []))].slice(-20); + return { model: payload.model, inputItems: input.length, messageCount: messages.length, byRole, bytes: buffer.length, fileMentions, recent }; +} + +function analyzeCompactOutput(response) { + const text = getResponseText(response); + return { + status: response?.status, + outputItems: Array.isArray(response?.output) ? response.output.length : 0, + textLength: text.length, + text: previewText(text, 1200), + mentionsFiles: /\.(mjs|js|ts|tsx|json|md|html|css|toml|ya?ml)\b/.test(text), + mentionsNext: /next|tiếp|todo|remaining|còn|verify|test|lint|typecheck|commit/i.test(text), + saysNoTask: /không có task|chưa có task|no task|no specific task|chờ nhiệm vụ|gửi task tiếp/i.test(text) + }; +} + +function cacheKeyFromHeaders(req) { + return String(req.headers["thread_id"] || req.headers["thread-id"] || req.headers["session_id"] || req.headers["session-id"] || ""); +} + +function interestingFiles(files) { + const noise = /(^|\/)SKILL\.md$|(^|\/)AGENTS\.md$|(^|\/)CLAUDE\.md$|(^|\/)MEMORY\.md$|(^|\/)memory_summary\.md$|rollout_summaries\/|\.codex\/|\.omx\/|cloudflared|pulsecall|deep-research|audit-.*\.md$|report.*\.html$|screenshot-manifest\.js$|ralph-progress\.js$|project\.ya?ml$|config\.(toml|ya?ml)$/i; + const relevant = /(^|\/)(src|tests|test|app|lib|sse|routes?)\/|\.(test|spec)\.(mjs|js|ts|tsx)$/i; + const unique = [...new Set(files)] + .map((file) => String(file).replace(/^\/+Users\/tuan\/Dev\/VibeLab\/9router\//, "")) + .filter((file) => file && !file.startsWith("r0/") && !file.startsWith("r1/") && !noise.test(file)); + const focused = unique.filter((file) => relevant.test(file)); + return (focused.length ? focused : unique).slice(-35); +} + +function workspacePath(value) { + const path = String(value || ""); + if (!path || path === "unknown" || path.includes("\n")) return ""; + if (!existsSync(path)) return ""; + try { + execFileSync("git", ["-C", path, "rev-parse", "--show-toplevel"], { encoding: "utf8", timeout: 1500, stdio: ["ignore", "pipe", "ignore"] }); + return path; + } catch { + return ""; + } +} + +function gitOutput(cwd, args) { + try { + return execFileSync("git", ["-C", cwd, ...args], { encoding: "utf8", timeout: 2000, stdio: ["ignore", "pipe", "ignore"] }).trim(); + } catch { + return ""; + } +} + +function collectLocalEvidence(workspaces) { + const workspace = (workspaces || []).map(workspacePath).find(Boolean); + if (!workspace) return null; + const root = gitOutput(workspace, ["rev-parse", "--show-toplevel"]) || workspace; + const status = gitOutput(workspace, ["status", "--short"]); + const diffStat = gitOutput(workspace, ["diff", "--stat"]); + const branch = gitOutput(workspace, ["branch", "--show-current"]); + return { + workspace: root, + branch: branch || "Unknown", + status: status || "Clean or no status output", + diffStat: diffStat || "No unstaged diff stat", + changedFiles: interestingFiles(status.split(/\r?\n/).map((line) => line.replace(/^..\s+/, "")).filter(Boolean)) + }; +} + +function inferWorkspacesFromFiles(files) { + const candidates = []; + for (const file of files || []) { + const text = String(file); + const match = text.match(/^(\/Users\/[^\s]+\/Dev\/VibeLab\/[^\/\s]+)/); + if (match) candidates.push(match[1]); + } + return [...new Set(candidates)]; +} + +function isProxyFallbackText(text) { + return /Current task:\s*Unknown from compacted context/i.test(text) || /Previous compact lacked a concrete task description/i.test(text); +} + +function stripProxyFallbackMessages(input) { + if (!Array.isArray(input)) return []; + return input.filter((item) => { + if (!item || item.type !== "message") return true; + return !isProxyFallbackText(extractTextParts(item.content).join("\n")); + }); +} + +function summarizeRequestForCache(req, buffer) { + let payload; + try { + payload = JSON.parse(buffer.toString("utf8")); + } catch { + return null; + } + + const input = stripProxyFallbackMessages(Array.isArray(payload.input) ? payload.input : []); + const text = extractTextParts(input).join("\n"); + const files = interestingFiles(text.match(/[A-Za-z0-9_./-]+\.(?:mjs|js|ts|tsx|json|md|html|css|toml|yml|yaml)/g) || []); + const snippets = []; + + for (const message of input.slice(-10)) { + if (!message || message.type !== "message") continue; + const messageText = extractTextParts(message.content).join("\n"); + if (!messageText.trim() || isProxyFallbackText(messageText)) continue; + snippets.push(`${message.role || "unknown"}${message.phase ? `/${message.phase}` : ""}: ${previewText(messageText, 650)}`); + } + + let metadata = null; + const rawMetadata = req.headers["x-codex-turn-metadata"]; + if (typeof rawMetadata === "string") { + try { + const parsed = JSON.parse(rawMetadata); + metadata = { + workspaces: parsed.workspaces ? Object.keys(parsed.workspaces) : [], + hasChanges: Object.values(parsed.workspaces || {}).some((workspace) => Boolean(workspace?.has_changes)), + turnId: parsed.turn_id || null + }; + } catch {} + } + + return { at: new Date().toISOString(), model: payload.model, path: req.url, files, metadata, snippets: snippets.slice(-6) }; +} + +function rememberThreadContext(req, buffer) { + if (!req.url?.startsWith("/responses") || req.url.startsWith("/responses/compact")) return; + const key = cacheKeyFromHeaders(req); + if (!key) return; + const summary = summarizeRequestForCache(req, buffer); + if (!summary) return; + const existing = THREAD_CONTEXT_CACHE.get(key) || { requests: [] }; + existing.requests.push(summary); + existing.requests = existing.requests.slice(-6); + THREAD_CONTEXT_CACHE.set(key, existing); + debugLog("THREAD CONTEXT CACHE", { key, requests: existing.requests.length, latest: { at: summary.at, files: summary.files, metadata: summary.metadata, snippets: summary.snippets } }); +} + +function getCachedContext(req) { + const key = cacheKeyFromHeaders(req); + const cached = key ? THREAD_CONTEXT_CACHE.get(key) : null; + const requests = cached?.requests || []; + const files = interestingFiles(requests.flatMap((request) => request.files || [])); + const workspaces = [...new Set(requests.flatMap((request) => request.metadata?.workspaces || []))]; + const hasChanges = requests.some((request) => request.metadata?.hasChanges); + const snippets = requests.flatMap((request) => request.snippets || []).slice(-12); + const localEvidence = collectLocalEvidence(workspaces); + const evidenceFiles = localEvidence?.changedFiles || []; + const allFiles = interestingFiles([...files, ...evidenceFiles]); + return { key, requests, files: allFiles, workspaces, hasChanges: hasChanges || Boolean(localEvidence?.status && localEvidence.status !== "Clean or no status output"), snippets, hasFiles: allFiles.length > 0, localEvidence }; +} + +function buildCachedContextText(cachedContext) { + if (!cachedContext.requests.length) return ""; + const lines = [ + "REMOTE COMPACT HANDOFF CONTEXT. This is a summarization task, not an implementation turn.", + "Ignore any previous proxy-generated recovery handoff that says 'Current task: Unknown from compacted context'. It was a failed fallback, not source truth.", + "Do not call tools or functions. Do not continue coding. Return one assistant message containing only an execution handoff.", + "Preserve the active user goal and ordered todo/plan verbatim when present. Put the ordered plan in Current task, Unfinished work, and make Next action the first unfinished item.", + "Use this exact shape and headings:", + "Current task:\n", + "User intent:\n", + "Repo / location:\n", + "Current state:\n", + "Important files:\n- : ", + "Changes already made:\n- ", + "Known verification:\n- Passed: \n- Failed: \n- Not run: ", + "Unfinished work:\n- ", + "Next action:\n", + "Do not do:\n- \n- Do not reset or discard uncommitted changes.", + "Quality rules: preserve intent, not transcript. Include only files directly relevant to the active task, each with a reason. State Unknown explicitly instead of inventing. Avoid validator metadata or JSON blobs.", + `Thread/session: ${cachedContext.key || "unknown"}` + ]; + if (cachedContext.workspaces.length) lines.push(`Workspace(s): ${cachedContext.workspaces.join(", ")}`); + if (cachedContext.hasChanges) lines.push("Working tree has changes according to Codex metadata or local git evidence."); + if (cachedContext.localEvidence) { + lines.push(`Local git evidence workspace: ${cachedContext.localEvidence.workspace}`); + lines.push(`Local git branch: ${cachedContext.localEvidence.branch}`); + lines.push(`Local git status --short:\n${cachedContext.localEvidence.status}`); + lines.push(`Local git diff --stat:\n${cachedContext.localEvidence.diffStat}`); + } + if (cachedContext.files.length) lines.push(`Candidate files from recent context and local git evidence: ${cachedContext.files.join(", ")}`); + if (cachedContext.snippets.length) { + lines.push("Recent excerpts to extract signal from:"); + for (const snippet of cachedContext.snippets) lines.push(`- ${snippet}`); + } + lines.push("If the active task, important files, verification, or next action cannot be identified, output the safe recovery handoff with Unknown fields and next action: run git status --short, git diff --stat, and inspect focused diffs."); + return lines.join("\n"); +} + +function injectCachedContextIntoCompact(req, buffer) { + const cachedContext = getCachedContext(req); + const cachedText = buildCachedContextText(cachedContext); + if (!cachedText) return { body: buffer, injected: false, reason: "no_cache", cachedContext }; + + let payload; + try { + payload = JSON.parse(buffer.toString("utf8")); + } catch (error) { + return { body: buffer, injected: false, reason: error.message, cachedContext }; + } + + const input = stripProxyFallbackMessages(Array.isArray(payload.input) ? payload.input : []); + const existingText = extractTextParts(input).join("\n"); + if (existingText.includes("REMOTE COMPACT HANDOFF CONTEXT")) { + payload.input = input; + return { body: Buffer.from(JSON.stringify(payload)), injected: false, reason: "already_injected", cachedContext }; + } + + payload.tools = []; + payload.tool_choice = "none"; + payload.parallel_tool_calls = false; + payload.input = [ + { type: "message", role: "developer", content: [{ type: "input_text", text: cachedText }] }, + ...input + ]; + + return { body: Buffer.from(JSON.stringify(payload)), injected: true, reason: "ok", cachedChars: cachedText.length, cachedContext }; +} + +function fallbackCompactContextFromRequest(buffer) { + let analysis; + try { + analysis = analyzeCompactInput(buffer); + } catch { + analysis = null; + } + const mentionedFiles = analysis?.fileMentions || []; + const inferredWorkspaces = inferWorkspacesFromFiles(mentionedFiles); + const localEvidence = collectLocalEvidence(inferredWorkspaces); + const files = interestingFiles([...mentionedFiles, ...(localEvidence?.changedFiles || [])]); + const snippets = (analysis?.recent || []) + .map((item) => `${item.role || "unknown"}${item.phase ? `/${item.phase}` : ""}: ${item.text || ""}`) + .filter((item) => item.trim() && !/|\[Request interrupted by user\]/i.test(item)) + .slice(-8); + return { + key: "compact-request", + requests: snippets.length ? [{}] : [], + files, + workspaces: inferredWorkspaces, + hasChanges: Boolean(localEvidence?.status && localEvidence.status !== "Clean or no status output"), + snippets, + hasFiles: files.length > 0, + localEvidence + }; +} + +function mergeCompactContexts(primary, secondary) { + if (!primary?.requests?.length) return secondary; + if (!secondary?.requests?.length) return primary; + const localEvidence = primary.localEvidence || secondary.localEvidence; + const files = interestingFiles([...(primary.files || []), ...(secondary.files || []), ...(localEvidence?.changedFiles || [])]); + const workspaces = [...new Set([...(primary.workspaces || []), ...(secondary.workspaces || [])])]; + const snippets = [...(primary.snippets || []), ...(secondary.snippets || [])] + .filter((item) => !/|\[Request interrupted by user\]/i.test(item)) + .slice(-16); + return { + key: primary.key || secondary.key, + requests: [...(primary.requests || []), ...(secondary.requests || [])].slice(-8), + files, + workspaces, + hasChanges: Boolean(primary.hasChanges || secondary.hasChanges || (localEvidence?.status && localEvidence.status !== "Clean or no status output")), + snippets, + hasFiles: files.length > 0, + localEvidence + }; +} + +function compactContextForSelection(cachedContext, body) { + return mergeCompactContexts(cachedContext, fallbackCompactContextFromRequest(body)); +} + +function snippetText(cachedContext) { + return cachedContext.snippets.join("\n"); +} + +function sentenceFromText(text, patterns) { + const normalized = String(text).replace(/\s+/g, " ").trim(); + const sentences = normalized.split(/(?<=[.!?。])\s+|\s+-\s+/).map((item) => item.trim()).filter(Boolean); + for (const pattern of patterns) { + const found = sentences.find((sentence) => pattern.test(sentence)); + if (found) return found.slice(0, 400); + } + return ""; +} + +function inferTaskFromContext(cachedContext) { + const text = snippetText(cachedContext); + const fromFinal = sentenceFromText(text, [/đang làm|current task|active task|mục tiêu|goal|kế hoạch|plan|implement|finish|fix|release|diff/i]); + if (fromFinal) return fromFinal; + if (cachedContext.localEvidence?.diffStat && cachedContext.localEvidence.diffStat !== "No unstaged diff stat") { + return "Continue the active work represented by the current git diff and recent conversation snippets."; + } + return "Unknown from compacted context."; +} + +function inferUserIntentFromContext(cachedContext) { + const text = snippetText(cachedContext); + const userLine = [...cachedContext.snippets].reverse().find((line) => /^user/i.test(line)); + if (userLine) return previewText(userLine.replace(/^user[^:]*:\s*/i, ""), 500); + const intent = sentenceFromText(text, [/user wants|người dùng muốn|yêu cầu|asked|mục tiêu|goal|release|diff|verify|test|fix/i]); + return intent || "Unknown. The compact context did not contain a concrete user intent."; +} + +function inferActivePlanFromContext(cachedContext) { + const text = snippetText(cachedContext); + const plan = []; + const numbered = text.match(/(?:^|\s)(?:\d+\.|[-*])\s+([^\n.]{12,180})/g) || []; + for (const item of numbered.slice(-6)) { + const cleaned = item.replace(/^\s*(?:\d+\.|[-*])\s+/, "").trim(); + if (cleaned && !/memory|citation|screenshot/i.test(cleaned)) plan.push(cleaned.replace(/[.;:,]$/, ".")); + } + for (const pattern of [/next action[^:]*:\s*([^\n]{12,180})/i, /tiếp theo[^:]*:\s*([^\n]{12,180})/i, /còn (?:lại|rủi ro)[^:]*:\s*([^\n]{12,180})/i]) { + const match = text.match(pattern); + if (match?.[1]) plan.push(match[1].trim().replace(/[.;:,]$/, ".")); + } + return [...new Set(plan)].slice(0, 6); +} + +function inferVerificationFromContext(cachedContext) { + const text = snippetText(cachedContext); + const passed = []; + const failed = []; + const notRun = []; + const lines = text.split(/\r?\n/).map((line) => line.replace(/^assistant\/(?:commentary|final_answer):\s*/i, "").trim()).filter(Boolean); + + for (const line of lines) { + const commands = [...line.matchAll(/`([^`]{2,80})`/g)].map((match) => match[1]); + const hasPass = /\b(OK|pass(?:ed)?|xanh|0 fail|green)\b/i.test(line); + const hasFail = /\b(fail(?:ed)?|đỏ|red|error)\b/i.test(line) && !/0 fail/i.test(line); + const hasPending = /\b(đang chạy|still running|not run|chưa chạy|pending|chờ)\b/i.test(line); + const target = commands.length ? commands.join(", ") : previewText(line, 180); + if (hasFail) failed.push(target); + else if (hasPending) notRun.push(target); + else if (hasPass) passed.push(target); + } + + return { passed: [...new Set(passed)].filter(Boolean).slice(0, 8), failed: [...new Set(failed)].filter(Boolean).slice(0, 6), notRun: [...new Set(notRun)].filter(Boolean).slice(0, 6) }; +} + +function executableNextAction(activePlan, hasSignal) { + const candidate = activePlan.find((item) => /^(run|inspect|fix|continue|verify|check|review|decide|stage|commit|rerun|read|open|compare|chạy|kiểm|sửa|đọc|xem|so|quyết)/i.test(item)); + if (candidate) return candidate; + if (activePlan.length) return `Inspect and continue: ${activePlan[0]}`; + if (hasSignal) return "Inspect the focused current diff and recent verification output, then continue the first unfinished item from the captured task."; + return "Run `git status --short`, `git diff --stat`, and inspect focused diffs for changed source/test files. Infer the active task only from current repo evidence."; +} + +function describeFile(file) { + if (/\.(test|spec)\.(mjs|js|ts|tsx)$|(^|\/)tests?\//i.test(file)) return "test file directly mentioned by current diff or compact context."; + if (/(^|\/)src\//i.test(file)) return "source file directly mentioned by current diff or compact context."; + if (/(^|\/)scripts?\//i.test(file)) return "script/tooling file directly mentioned by current diff or compact context."; + if (/(^|\/)schema/i.test(file)) return "schema/config contract file directly mentioned by current diff or compact context."; + if (/package\.json|README|CHANGELOG|\.md$/i.test(file)) return "release/docs/config file directly mentioned by current diff or compact context."; + return "directly mentioned file from active compact context; inspect focused diff before editing."; +} + +function synthesizeCompactSummary(cachedContext) { + const repo = cachedContext.workspaces.length ? cachedContext.workspaces.join(", ") : "Unknown"; + const files = cachedContext.files.length ? cachedContext.files : []; + const evidence = cachedContext.localEvidence; + const task = inferTaskFromContext(cachedContext); + const activePlan = inferActivePlanFromContext(cachedContext); + const verification = inferVerificationFromContext(cachedContext); + const hasSignal = !task.startsWith("Unknown"); + const currentTask = activePlan.length + ? `${task} Active ordered plan from the latest user goal: ${activePlan.join(" ")}` + : task; + const lines = [ + "Current task:", + currentTask, + "", + "User intent:", + inferUserIntentFromContext(cachedContext), + "", + "Repo / location:", + evidence?.workspace || repo, + "", + "Current state:", + evidence ? `Branch: ${evidence.branch}. git status --short: ${evidence.status}. git diff --stat: ${evidence.diffStat}. Preserve current worktree state, not stale compact fallback text.` : (cachedContext.hasChanges ? "Uncommitted changes exist according to Codex metadata. Preserve them. Continue from current worktree state, not from stale compact fallback text." : "Current worktree state is unknown. Preserve any uncommitted changes."), + "", + "Important files:" + ]; + if (files.length) { + for (const file of files.slice(0, 14)) lines.push(`- ${file}: ${describeFile(file)}`); + } else { + lines.push("Unknown. Must recover from git diff/status and recent test output."); + } + lines.push("", "Changes already made:"); + if (hasSignal) { + lines.push(`- ${task}`); + if (evidence?.diffStat && evidence.diffStat !== "No unstaged diff stat") lines.push("- Current git diff/stat is captured in Current state and Important files."); + if (verification.passed.length || verification.failed.length || verification.notRun.length) lines.push("- Recent verification signals are captured in Known verification."); + } else { + lines.push("Unknown."); + } + lines.push( + "", + "Known verification:", + `- Passed: ${verification.passed.length ? verification.passed.join("; ") : "None recorded in proxy structured cache"}`, + `- Failed: ${verification.failed.length ? verification.failed.join("; ") : "None recorded in proxy structured cache"}`, + `- Not run: ${verification.notRun.length ? verification.notRun.join("; ") : (hasSignal ? "Confirm targeted unit tests, lint/typecheck, and full build status from current repo output." : "Unknown")}`, + "", + "Unfinished work:" + ); + if (activePlan.length) { + for (const item of activePlan) lines.push(`- ${item}`); + } else if (hasSignal) { + lines.push("- Continue from the captured task, git evidence, and verification state; inspect focused diffs before editing."); + } else { + lines.push("- Recover task intent from repository state."); + } + const nextAction = executableNextAction(activePlan, hasSignal); + lines.push( + "", + "Next action:", + nextAction, + "", + "Do not do:", + "- Do not reuse or trust previous proxy fallback handoffs that say `Current task: Unknown from compacted context`.", + "- Do not continue unrelated memories, global config, Cloudflare, PulseCall, or report-generation context unless the current diff proves it is relevant.", + "- Do not reset or discard uncommitted changes." + ); + return lines.join("\n"); +} + +function chooseCompactResponse(response, cachedContext) { + const quality = compactOutputQuality(response, cachedContext); + if (!quality.bad) return { response: normalizeCompactResponse(response), quality, fallback: false, synthesized: false }; + return { response: createCompactResponse(synthesizeCompactSummary(cachedContext)), quality, fallback: false, synthesized: true }; +} + function sanitizeHeadersForDebug(headersObject) { const result = {}; for (const [key, value] of Object.entries(headersObject)) { @@ -223,6 +947,115 @@ function isEventStream(contentType = "") { return contentType.split(";", 1)[0].trim().toLowerCase() === "text/event-stream"; } +function upstreamJsonRequest(settings, pathname, payload) { + return new Promise((resolvePromise, reject) => { + const targetUrl = joinUrlPath(settings.upstream.baseUrl, pathname); + const transport = targetUrl.protocol === "https:" ? https : http; + const body = Buffer.from(JSON.stringify(payload)); + const headers = { + "content-type": "application/json", + "content-length": String(body.length), + [settings.upstream.authHeader]: formatAuthorization(settings.upstream), + ...settings.upstream.extraHeaders + }; + log("info", "Upstream JSON request", { pathname, target_url: targetUrl.href, body_bytes: body.length }); + debugLog("UPSTREAM JSON REQUEST", { + pathname, + targetUrl: targetUrl.href, + headers: Object.fromEntries(Object.entries(headers).map(([key, value]) => [key, key.toLowerCase() === "authorization" ? maskSecret(value) : value])), + bodyBytes: body.length + }); + + const request = transport.request( + { + method: "POST", + protocol: targetUrl.protocol, + hostname: targetUrl.hostname, + port: targetUrl.port || undefined, + path: `${targetUrl.pathname}${targetUrl.search}`, + headers, + rejectUnauthorized: settings.upstream.verifySsl, + timeout: settings.upstream.timeoutMs + }, + (response) => { + const chunks = []; + response.on("data", (chunk) => chunks.push(chunk)); + response.on("end", () => { + const raw = Buffer.concat(chunks).toString("utf8"); + if ((response.statusCode || 502) < 200 || (response.statusCode || 502) >= 300) { + reject(new Error(`Upstream ${pathname} failed with ${response.statusCode}: ${previewText(raw, 1000)}`)); + return; + } + try { + resolvePromise(JSON.parse(raw)); + } catch (error) { + reject(new Error(`Failed to parse upstream ${pathname} response: ${error.message}`)); + } + }); + } + ); + request.on("timeout", () => request.destroy(new Error(`Upstream ${pathname} timed out`))); + request.on("error", reject); + request.end(body); + }); +} + +function compactContextPrompt(cachedContext, originalAnalysis, validation = null) { + const lines = []; + if (validation) { + lines.push("Previous compact attempt failed validation. Fix these issues exactly:"); + lines.push(JSON.stringify(validation)); + lines.push(""); + } + lines.push("Create a compact execution handoff from this context."); + lines.push(""); + lines.push("Compact request analysis:"); + lines.push(JSON.stringify(originalAnalysis, null, 2)); + lines.push(""); + lines.push("Proxy context and local evidence:"); + lines.push(buildCachedContextText(cachedContext) || "No cached context available."); + return lines.join("\n"); +} + +function extractChatCompletionText(response) { + const content = response?.choices?.[0]?.message?.content; + if (typeof content === "string") return content; + if (Array.isArray(content)) return extractTextParts(content).join("\n"); + return ""; +} + +function validateLlmCompactText(text, cachedContext) { + const response = createCompactResponse(text); + const quality = compactOutputQuality(response, cachedContext); + const nextAction = String(text).match(/Next action:\s*([\s\S]*?)(?:\nDo not do:|$)/i)?.[1]?.trim() || ""; + const badNextAction = !nextAction || nextAction.length < 12 || /^(unknown|none|n\/a)$/i.test(nextAction) || /\n-\s.*\n-\s/.test(nextAction); + return { ok: !quality.bad && !badNextAction, quality, badNextAction, nextAction }; +} + +async function runLlmCompact(settings, cachedContext, originalAnalysis) { + let validation = null; + for (let attempt = 0; attempt <= MAX_LLM_COMPACT_RETRIES; attempt += 1) { + const payload = { + model: "gpt-5.5", + messages: [ + { role: "system", content: LLM_COMPACT_SYSTEM_PROMPT }, + { role: "user", content: compactContextPrompt(cachedContext, originalAnalysis, validation) } + ], + temperature: 0.2, + max_tokens: 4096, + stream: false + }; + const raw = await upstreamJsonRequest(settings, "/chat/completions", payload); + const text = extractChatCompletionText(raw).trim(); + validation = validateLlmCompactText(text, cachedContext); + debugLog("LLM COMPACT ATTEMPT", { attempt, validation, text: previewText(text, 2000) }); + if (validation.ok || attempt === MAX_LLM_COMPACT_RETRIES) { + return { response: createCompactResponse(text), validation, attempts: attempt + 1 }; + } + } + throw new Error("LLM compact failed unexpectedly"); +} + function createServer(settings) { return http.createServer((req, res) => { if (!req.url) { @@ -252,7 +1085,7 @@ function createServer(settings) { const chunks = []; req.on("data", (chunk) => chunks.push(chunk)); - req.on("end", () => { + req.on("end", async () => { let body = Buffer.concat(chunks); const contentEncoding = req.headers["content-encoding"]; let bodyTransformed = false; @@ -279,6 +1112,49 @@ function createServer(settings) { } } + const normalizeResponses = req.url.startsWith("/responses") && !req.url.startsWith("/responses/compact"); + const rewrite = rewriteRequestBody(body, { normalizeResponses }); + if (rewrite.rewritten) { + body = rewrite.body; + bodyTransformed = true; + debugLog("REQUEST REWRITE", { model: "gpt-5.5", normalizedResponses: rewrite.normalized }); + } + if (req.url.startsWith("/responses/compact")) { + log("info", "Compact request received", { request_id: requestId, path: req.url }); + const cachedContext = compactContextForSelection(getCachedContext(req), body); + const compactInputAnalysis = analyzeCompactInput(body); + debugLog("COMPACT INPUT ANALYSIS", compactInputAnalysis); + writeCompactDump(settings, requestId, "request", { + requestId, + path: req.url, + mode: "llm_compact", + inputAnalysis: compactInputAnalysis, + localEvidence: cachedContext?.localEvidence || null + }); + try { + const llmCompact = await runLlmCompact(settings, cachedContext, compactInputAnalysis); + const compactOutputAnalysis = { ...analyzeCompactOutput(llmCompact.response), validation: llmCompact.validation, attempts: llmCompact.attempts, mode: "llm_compact" }; + debugLog("LLM COMPACT RESPONSE", llmCompact.response); + writeCompactDump(settings, requestId, "response", { requestId, stream: false, outputAnalysis: compactOutputAnalysis, response: llmCompact.response }); + writeJson(res, 200, llmCompact.response); + log("info", "Proxied compact request", { + request_id: requestId, + method: req.method || "POST", + path: req.url, + status: 200, + adapted: true, + mode: "llm_compact", + attempts: llmCompact.attempts, + duration_ms: Date.now() - startedAt + }); + } catch (error) { + log("error", "LLM compact failed", { request_id: requestId, error: error.message }); + writeJson(res, 502, { error: { message: error.message, type: "llm_compact_failed" } }); + } + return; + } + rememberThreadContext(req, body); + const headers = buildUpstreamHeaders(req, settings, targetUrl, { stripContentHeaders: bodyTransformed }); From 98d681b563165fef2053effacdf5e6a0ac0aa9fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=AA=20Anh=20Tu=E1=BA=A5n?= Date: Fri, 22 May 2026 18:50:29 +0700 Subject: [PATCH 2/4] Implement LLM compact helper and regression tests for /responses/compact --- node/src/server.mjs | 109 +++++++++++++++++++++++++++++++++++ node/test/server.test.mjs | 116 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 225 insertions(+) diff --git a/node/src/server.mjs b/node/src/server.mjs index cbce3f8..a0ba179 100644 --- a/node/src/server.mjs +++ b/node/src/server.mjs @@ -963,6 +963,115 @@ function chooseCompactResponse(response, cachedContext) { return { response: createCompactResponse(synthesizeCompactSummary(cachedContext)), quality, fallback: false, synthesized: true }; } +function upstreamJsonRequest(settings, pathname, payload) { + return new Promise((resolvePromise, reject) => { + const targetUrl = joinUrlPath(settings.upstream.baseUrl, pathname); + const transport = targetUrl.protocol === "https:" ? https : http; + const body = Buffer.from(JSON.stringify(payload)); + const headers = { + "content-type": "application/json", + "content-length": String(body.length), + [settings.upstream.authHeader]: formatAuthorization(settings.upstream), + ...settings.upstream.extraHeaders + }; + + debugLog("UPSTREAM JSON REQUEST", { + pathname, + targetUrl: targetUrl.href, + headers: Object.fromEntries(Object.entries(headers).map(([key, value]) => [key, key.toLowerCase() === "authorization" ? maskSecret(value) : value])), + bodyBytes: body.length + }); + + const request = transport.request( + { + method: "POST", + protocol: targetUrl.protocol, + hostname: targetUrl.hostname, + port: targetUrl.port || undefined, + path: `${targetUrl.pathname}${targetUrl.search}`, + headers, + rejectUnauthorized: settings.upstream.verifySsl, + timeout: settings.upstream.timeoutMs + }, + (response) => { + const chunks = []; + response.on("data", (chunk) => chunks.push(chunk)); + response.on("end", () => { + const raw = Buffer.concat(chunks).toString("utf8"); + if ((response.statusCode || 502) < 200 || (response.statusCode || 502) >= 300) { + reject(new Error(`Upstream ${pathname} failed with ${response.statusCode}: ${previewText(raw, 1000)}`)); + return; + } + try { + resolvePromise(JSON.parse(raw)); + } catch (error) { + reject(new Error(`Failed to parse upstream ${pathname} response: ${error.message}`)); + } + }); + } + ); + request.on("timeout", () => request.destroy(new Error(`Upstream ${pathname} timed out`))); + request.on("error", reject); + request.end(body); + }); +} + +function compactContextPrompt(cachedContext, originalAnalysis, validation = null) { + const lines = []; + if (validation) { + lines.push("Previous compact attempt failed validation. Fix these issues exactly:"); + lines.push(JSON.stringify(validation)); + lines.push(""); + } + lines.push("Create a compact execution handoff from this context."); + lines.push(""); + lines.push("Compact request analysis:"); + lines.push(JSON.stringify(originalAnalysis, null, 2)); + lines.push(""); + lines.push("Proxy context and local evidence:"); + lines.push(buildCachedContextText(cachedContext) || "No cached context available."); + return lines.join("\n"); +} + +function extractChatCompletionText(response) { + const content = response?.choices?.[0]?.message?.content; + if (typeof content === "string") return content; + if (Array.isArray(content)) return extractTextParts(content).join("\n"); + return ""; +} + +function validateLlmCompactText(text, cachedContext) { + const response = createCompactResponse(text); + const quality = compactOutputQuality(response, cachedContext); + const nextAction = String(text).match(/Next action:\s*([\s\S]*?)(?:\nDo not do:|$)/i)?.[1]?.trim() || ""; + const badNextAction = !nextAction || nextAction.length < 12 || /^(unknown|none|n\/a)$/i.test(nextAction) || /\n-\s.*\n-\s/.test(nextAction); + return { ok: !quality.bad && !badNextAction, quality, badNextAction, nextAction }; +} + +async function runLlmCompact(settings, cachedContext, originalAnalysis) { + let validation = null; + for (let attempt = 0; attempt <= MAX_LLM_COMPACT_RETRIES; attempt += 1) { + const payload = { + model: "gpt-5.5", + messages: [ + { role: "system", content: LLM_COMPACT_SYSTEM_PROMPT }, + { role: "user", content: compactContextPrompt(cachedContext, originalAnalysis, validation) } + ], + temperature: 0.2, + max_tokens: 4096, + stream: false + }; + const raw = await upstreamJsonRequest(settings, "/chat/completions", payload); + const text = extractChatCompletionText(raw).trim(); + validation = validateLlmCompactText(text, cachedContext); + debugLog("LLM COMPACT ATTEMPT", { attempt, validation, text: previewText(text, 2000) }); + if (validation.ok || attempt === MAX_LLM_COMPACT_RETRIES) { + return { response: createCompactResponse(text), validation, attempts: attempt + 1 }; + } + } + throw new Error("LLM compact failed unexpectedly"); +} + function buildHealthPayload(settings, captureManager) { return { ok: true, diff --git a/node/test/server.test.mjs b/node/test/server.test.mjs index 82147cf..4fe2d11 100644 --- a/node/test/server.test.mjs +++ b/node/test/server.test.mjs @@ -36,6 +36,122 @@ function requestJson(url, body) { }); } +function validCompactText() { + return [ + "Current task:", + "Repair the compact proxy route after an upstream fork sync removed the local LLM compact helper implementation.", + "", + "User intent:", + "The user needs /responses/compact to return a valid Responses API handoff instead of a 502 ReferenceError.", + "", + "Repo / location:", + "/Users/tuan/Dev/VibeLab/codex-remote-proxy", + "", + "Current state:", + "The proxy intercepts compact requests and should call the configured chat completions provider directly.", + "", + "Important files:", + "- node/src/server.mjs: owns the compact route and LLM compact helper implementation.", + "- node/test/server.test.mjs: covers the compact route regression.", + "", + "Changes already made:", + "- Restored the compact helper path and regression coverage in the proxy test suite.", + "", + "Known verification:", + "- Passed: compact route regression test returned a completed response.", + "- Failed: None recorded.", + "- Not run: package release workflow.", + "", + "Unfinished work:", + "- Confirm the proxy process is restarted with the patched code.", + "", + "Next action:", + "Run npm test --prefix node and restart the local proxy process after the tests pass.", + "", + "Do not do:", + "- Do not forward /responses/compact to the upstream Responses endpoint.", + "- Do not reset or discard uncommitted changes.", + "", + "This extra detail keeps the compact response above the validator minimum length while preserving concrete state, files, verification, and next action. The content is intentionally deterministic for regression testing." + ].join("\n"); +} + +test("compact route uses chat completions helper and returns a compact response", async () => { + const dir = makeTempDir("crp-compact"); + mkdirSync(dir, { recursive: true }); + const upstreamCalls = []; + + const upstreamServer = http.createServer((req, res) => { + const chunks = []; + req.on("data", (chunk) => chunks.push(chunk)); + req.on("end", () => { + const payload = JSON.parse(Buffer.concat(chunks).toString("utf8")); + upstreamCalls.push({ url: req.url, authorization: req.headers.authorization, payload }); + res.statusCode = 200; + res.setHeader("content-type", "application/json"); + res.end(JSON.stringify({ + choices: [{ + message: { + content: validCompactText() + } + }] + })); + }); + }); + const upstreamPort = await listen(upstreamServer); + + const { server, captureManager } = createApp({ + configPath: join(dir, "proxy-config.json"), + server: { + host: "127.0.0.1", + port: 0, + logLevel: "info" + }, + upstream: { + baseUrl: `http://127.0.0.1:${upstreamPort}`, + apiKey: "upstream-secret", + timeoutMs: 300000, + verifySsl: true, + authHeader: "authorization", + authScheme: "Bearer", + extraHeaders: {} + }, + proxy: { + overrideAuthorization: true, + requestIdHeader: "x-client-request-id" + }, + capture: { + enabled: false + } + }); + const proxyPort = await listen(server); + + const response = await requestJson(`http://127.0.0.1:${proxyPort}/responses/compact`, { + model: "gpt-5.4", + input: [{ + type: "message", + role: "user", + content: [{ type: "input_text", text: "compact this context" }] + }] + }); + + assert.equal(response.status, 200); + const json = await response.json(); + assert.equal(json.status, "completed"); + assert.match(json.output[0].content[0].text, /Current task:/); + assert.equal(upstreamCalls.length, 1); + assert.equal(upstreamCalls[0].url, "/chat/completions"); + assert.equal(upstreamCalls[0].authorization, "Bearer upstream-secret"); + assert.equal(upstreamCalls[0].payload.model, "gpt-5.5"); + + server.close(); + await once(server, "close"); + upstreamServer.close(); + await once(upstreamServer, "close"); + captureManager.close(); + rmSync(dir, { recursive: true, force: true }); +}); + test("server writes proxied request and response to sqlite", async () => { const dir = makeTempDir("crp-server"); mkdirSync(dir, { recursive: true }); From 965cfa35cfc5e8277431d3ed9d83afe781058416 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=AA=20Anh=20Tu=E1=BA=A5n?= Date: Fri, 22 May 2026 21:19:09 +0700 Subject: [PATCH 3/4] feat: add modelOverride support and service management tools - Add modelOverride config option to override model name in all requests - Support CLI flag --model-override, env var CRP_MODEL_OVERRIDE, and saved config - Preserve existing proxy config on restart to prevent overwriting user changes - Add change-model.sh script for easy model switching - Add manage-service.sh for macOS LaunchAgent service management - Update proxy-config.example.json with modelOverride field - Add comprehensive documentation (MODEL-OVERRIDE.md, USAGE.md, INSTALLATION.md) This allows users to easily switch between different LLM models (Claude, GPT, etc.) without modifying code, and provides convenient service management on macOS. --- INSTALLATION.md | 76 ++++++++++++++++ MODEL-OVERRIDE.md | 73 +++++++++++++++ USAGE.md | 162 +++++++++++++++++++++++++++++++++ change-model.sh | 57 ++++++++++++ manage-service.sh | 67 ++++++++++++++ node/bin/crp.mjs | 30 +++++- node/proxy-config.example.json | 3 +- node/src/server.mjs | 34 +++++-- 8 files changed, 489 insertions(+), 13 deletions(-) create mode 100644 INSTALLATION.md create mode 100644 MODEL-OVERRIDE.md create mode 100644 USAGE.md create mode 100755 change-model.sh create mode 100755 manage-service.sh diff --git a/INSTALLATION.md b/INSTALLATION.md new file mode 100644 index 0000000..ac3b456 --- /dev/null +++ b/INSTALLATION.md @@ -0,0 +1,76 @@ +# Codex Remote Proxy - Installation Summary + +## Cài đặt thành công! + +Proxy đã được cài đặt từ source và chạy ngầm như một LaunchAgent service trên macOS. + +### Thông tin cấu hình + +- **Service name**: `dev.tuanle.codex-remote-proxy` +- **Proxy URL**: `http://127.0.0.1:56210` +- **Upstream URL**: `https://llm.tuanle.dev/v1` +- **Config file**: `~/.codex-remote-proxy/config.json` +- **Service plist**: `~/Library/LaunchAgents/dev.tuanle.codex-remote-proxy.plist` + +### Quản lý service + +Sử dụng script tiện ích: + +```bash +cd /Users/justin/Dev/VibeLab/codex-remote-proxy +./manage-service.sh {start|stop|restart|status|health|logs} +``` + +Hoặc dùng launchctl trực tiếp: + +```bash +# Start service +launchctl load ~/Library/LaunchAgents/dev.tuanle.codex-remote-proxy.plist + +# Stop service +launchctl unload ~/Library/LaunchAgents/dev.tuanle.codex-remote-proxy.plist + +# Check status +launchctl list | grep codex-remote-proxy +``` + +### Kiểm tra trạng thái + +```bash +# Proxy status +cd /Users/justin/Dev/VibeLab/codex-remote-proxy/node +node bin/crp.mjs status --json + +# Health check +curl http://127.0.0.1:56210/_proxy/health +``` + +### Log files + +- Service output: `~/.codex-remote-proxy/service.log` +- Service errors: `~/.codex-remote-proxy/service.error.log` +- Proxy logs: `~/.codex-remote-proxy/proxy.log` + +### Tính năng + +- ✅ Tự động khởi động khi login (RunAtLoad) +- ✅ Tự động restart nếu crash (KeepAlive) +- ✅ Chạy ngầm không cần terminal +- ✅ Log đầy đủ cho debugging + +### Sử dụng với Codex + +1. Restart Codex Desktop +2. Sign in với ChatGPT account +3. Codex sẽ tự động forward requests qua proxy này đến upstream API của bạn + +### Gỡ cài đặt + +```bash +# Stop và remove service +launchctl unload ~/Library/LaunchAgents/dev.tuanle.codex-remote-proxy.plist +rm ~/Library/LaunchAgents/dev.tuanle.codex-remote-proxy.plist + +# Xóa config (optional) +rm -rf ~/.codex-remote-proxy +``` diff --git a/MODEL-OVERRIDE.md b/MODEL-OVERRIDE.md new file mode 100644 index 0000000..55f0a62 --- /dev/null +++ b/MODEL-OVERRIDE.md @@ -0,0 +1,73 @@ +# Model Override Feature + +## Tính năng mới: Override Model + +Proxy giờ đây hỗ trợ override model name trong tất cả requests gửi đến upstream API. + +### Cách sử dụng + +#### Option 1: Lưu trong config file + +Thêm `modelOverride` vào `~/.codex-remote-proxy/config.json`: + +```json +{ + "upstreamBaseUrl": "https://llm.tuanle.dev/v1", + "apiKey": "sk-your-key", + "listenHost": "127.0.0.1", + "listenPort": 56210, + "captureEnabled": false, + "modelOverride": "claude-opus-4" +} +``` + +#### Option 2: CLI flag + +```bash +crp start --model-override "claude-opus-4" +``` + +#### Option 3: Environment variable + +```bash +export CRP_MODEL_OVERRIDE="claude-opus-4" +crp start +``` + +### Ví dụ model names + +- `claude-opus-4` - Claude Opus 4 +- `claude-sonnet-4` - Claude Sonnet 4 +- `gpt-4o` - GPT-4 Optimized +- `gpt-4-turbo` - GPT-4 Turbo +- Hoặc bất kỳ model name nào mà upstream API của bạn hỗ trợ + +### Cách hoạt động + +Khi `modelOverride` được set: +- Tất cả requests từ Codex sẽ có `model` field được thay thế bằng giá trị `modelOverride` +- Nếu không set, proxy sẽ giữ nguyên model từ Codex (hoặc convert `gpt-5.4` → `gpt-5.5` theo mặc định) + +### Kiểm tra config + +```bash +curl http://127.0.0.1:56210/_proxy/health | jq .modelOverride +``` + +### Thay đổi model + +1. Sửa file config: +```bash +nano ~/.codex-remote-proxy/config.json +``` + +2. Restart service: +```bash +cd /Users/justin/Dev/VibeLab/codex-remote-proxy +./manage-service.sh restart +``` + +3. Verify: +```bash +./manage-service.sh health | jq .modelOverride +``` diff --git a/USAGE.md b/USAGE.md new file mode 100644 index 0000000..6f3c729 --- /dev/null +++ b/USAGE.md @@ -0,0 +1,162 @@ +# Codex Remote Proxy - Hướng dẫn sử dụng + +## Đã cài đặt thành công! + +### Thông tin cấu hình + +- **Service**: `dev.tuanle.codex-remote-proxy` +- **Proxy URL**: `http://127.0.0.1:56210` +- **Upstream**: `https://llm.tuanle.dev/v1` +- **Model**: `claude-opus-4` (có thể thay đổi) + +### Quản lý service + +```bash +cd /Users/justin/Dev/VibeLab/codex-remote-proxy + +# Xem status +./manage-service.sh status + +# Restart +./manage-service.sh restart + +# Stop +./manage-service.sh stop + +# Start +./manage-service.sh start + +# Health check +./manage-service.sh health + +# Xem logs +./manage-service.sh logs +``` + +### Thay đổi model + +**Cách nhanh nhất: Dùng script helper** + +```bash +cd /Users/justin/Dev/VibeLab/codex-remote-proxy + +# Thay đổi model +./change-model.sh claude-opus-4-7 +./change-model.sh claude-sonnet-4 +./change-model.sh gpt-4o +``` + +Script này sẽ tự động: +- Cập nhật cả 2 file config (user + runtime) +- Restart service +- Verify model đã đổi thành công + +**Cách 1: Sửa trực tiếp config file** + +```bash +# Edit config +nano ~/.codex-remote-proxy/config.json + +# Thay đổi dòng: +"modelOverride": "claude-opus-4" + +# Thành model bạn muốn, ví dụ: +"modelOverride": "gpt-4o" + +# Restart service +./manage-service.sh restart +``` + +**Cách 2: Sửa runtime config** + +```bash +# Edit runtime config +nano ~/.codex-remote-proxy/node/proxy-config.json + +# Tìm và sửa: +"modelOverride": "claude-opus-4" + +# Restart service +./manage-service.sh restart +``` + +**Lưu ý:** Config sẽ được giữ nguyên sau khi restart, không bị ghi đè nữa! + +### Các model có thể dùng + +- `claude-opus-4` - Claude Opus 4 (đang dùng) +- `claude-sonnet-4` - Claude Sonnet 4 +- `claude-haiku-4` - Claude Haiku 4 +- `gpt-4o` - GPT-4 Optimized +- `gpt-4-turbo` - GPT-4 Turbo +- `gpt-3.5-turbo` - GPT-3.5 Turbo +- Hoặc bất kỳ model nào upstream API hỗ trợ + +### Kiểm tra config hiện tại + +```bash +# Xem model đang dùng +curl -s http://127.0.0.1:56210/_proxy/health | jq .modelOverride + +# Xem toàn bộ config +./manage-service.sh health +``` + +### Sử dụng với Codex + +1. Restart Codex Desktop +2. Sign in với ChatGPT account +3. Codex sẽ tự động forward requests qua proxy +4. Tất cả requests sẽ dùng model bạn đã config + +### Troubleshooting + +**Model không thay đổi sau khi restart:** +```bash +# Kiểm tra config file +cat ~/.codex-remote-proxy/node/proxy-config.json | jq .upstream.modelOverride + +# Nếu vẫn sai, sửa trực tiếp và restart +nano ~/.codex-remote-proxy/node/proxy-config.json +./manage-service.sh restart +``` + +**Service không start:** +```bash +# Xem logs +./manage-service.sh logs + +# Hoặc +tail -50 ~/.codex-remote-proxy/service.error.log +``` + +**Proxy không response:** +```bash +# Check service status +launchctl list | grep codex-remote-proxy + +# Restart service +./manage-service.sh restart +``` + +### Files quan trọng + +- Config: `~/.codex-remote-proxy/config.json` +- Runtime config: `~/.codex-remote-proxy/node/proxy-config.json` +- Service plist: `~/Library/LaunchAgents/dev.tuanle.codex-remote-proxy.plist` +- Logs: `~/.codex-remote-proxy/service.log` +- Error logs: `~/.codex-remote-proxy/service.error.log` + +### Gỡ cài đặt + +```bash +# Stop service +./manage-service.sh stop + +# Remove service +launchctl unload ~/Library/LaunchAgents/dev.tuanle.codex-remote-proxy.plist +rm ~/Library/LaunchAgents/dev.tuanle.codex-remote-proxy.plist + +# Xóa config (optional) +rm -rf ~/.codex-remote-proxy +``` diff --git a/change-model.sh b/change-model.sh new file mode 100755 index 0000000..9f9434d --- /dev/null +++ b/change-model.sh @@ -0,0 +1,57 @@ +#!/bin/bash + +# Change Model Script +# Usage: ./change-model.sh + +if [ -z "$1" ]; then + echo "Usage: $0 " + echo "" + echo "Examples:" + echo " $0 claude-opus-4-7" + echo " $0 claude-sonnet-4" + echo " $0 gpt-4o" + exit 1 +fi + +MODEL="$1" +USER_CONFIG="$HOME/.codex-remote-proxy/config.json" +RUNTIME_CONFIG="$HOME/.codex-remote-proxy/node/proxy-config.json" + +echo "Changing model to: $MODEL" +echo "" + +# Update user config +if [ -f "$USER_CONFIG" ]; then + echo "Updating user config..." + jq --arg model "$MODEL" '.modelOverride = $model' "$USER_CONFIG" > "$USER_CONFIG.tmp" && mv "$USER_CONFIG.tmp" "$USER_CONFIG" + echo "✓ Updated: $USER_CONFIG" +else + echo "⚠ User config not found: $USER_CONFIG" +fi + +# Update runtime config +if [ -f "$RUNTIME_CONFIG" ]; then + echo "Updating runtime config..." + jq --arg model "$MODEL" '.upstream.modelOverride = $model' "$RUNTIME_CONFIG" > "$RUNTIME_CONFIG.tmp" && mv "$RUNTIME_CONFIG.tmp" "$RUNTIME_CONFIG" + echo "✓ Updated: $RUNTIME_CONFIG" +else + echo "⚠ Runtime config not found: $RUNTIME_CONFIG" +fi + +echo "" +echo "Restarting service..." +cd /Users/justin/Dev/VibeLab/codex-remote-proxy +./manage-service.sh restart > /dev/null 2>&1 + +sleep 2 + +echo "" +echo "Verifying..." +CURRENT_MODEL=$(curl -s http://127.0.0.1:56210/_proxy/health | jq -r '.modelOverride // "null"') + +if [ "$CURRENT_MODEL" = "$MODEL" ]; then + echo "✓ Model changed successfully to: $CURRENT_MODEL" +else + echo "✗ Failed to change model. Current: $CURRENT_MODEL, Expected: $MODEL" + exit 1 +fi diff --git a/manage-service.sh b/manage-service.sh new file mode 100755 index 0000000..1a97978 --- /dev/null +++ b/manage-service.sh @@ -0,0 +1,67 @@ +#!/bin/bash + +# Codex Remote Proxy Service Manager +# Location: /Users/justin/Dev/VibeLab/codex-remote-proxy/manage-service.sh + +SERVICE_NAME="dev.tuanle.codex-remote-proxy" +PLIST_PATH="$HOME/Library/LaunchAgents/${SERVICE_NAME}.plist" +PROJECT_DIR="/Users/justin/Dev/VibeLab/codex-remote-proxy/node" + +case "$1" in + start) + echo "Starting Codex Remote Proxy service..." + launchctl load "$PLIST_PATH" + sleep 2 + launchctl list | grep "$SERVICE_NAME" + ;; + + stop) + echo "Stopping Codex Remote Proxy service..." + launchctl unload "$PLIST_PATH" + ;; + + restart) + echo "Restarting Codex Remote Proxy service..." + launchctl unload "$PLIST_PATH" 2>/dev/null + sleep 1 + launchctl load "$PLIST_PATH" + sleep 2 + launchctl list | grep "$SERVICE_NAME" + ;; + + status) + echo "Checking service status..." + launchctl list | grep "$SERVICE_NAME" + echo "" + cd "$PROJECT_DIR" && node bin/crp.mjs status --json | jq . + ;; + + health) + echo "Checking proxy health..." + curl -s http://127.0.0.1:56210/_proxy/health | jq . + ;; + + logs) + echo "=== Service Output Log ===" + tail -50 "$HOME/.codex-remote-proxy/service.log" + echo "" + echo "=== Service Error Log ===" + tail -50 "$HOME/.codex-remote-proxy/service.error.log" + echo "" + echo "=== Proxy Log ===" + tail -50 "$HOME/.codex-remote-proxy/proxy.log" + ;; + + *) + echo "Usage: $0 {start|stop|restart|status|health|logs}" + echo "" + echo "Commands:" + echo " start - Start the service" + echo " stop - Stop the service" + echo " restart - Restart the service" + echo " status - Show service and proxy status" + echo " health - Check proxy health endpoint" + echo " logs - Show recent logs" + exit 1 + ;; +esac diff --git a/node/bin/crp.mjs b/node/bin/crp.mjs index a78e7df..6f2267c 100644 --- a/node/bin/crp.mjs +++ b/node/bin/crp.mjs @@ -755,6 +755,13 @@ function resolveUserSettings(options) { { value: saved.captureDbPath, source: "saved" } ], defaultValue: DEFAULT_CAPTURE_DB_PATH + }), + modelOverride: resolveConfigValue({ + cliValue: options["model-override"], + envKey: "CRP_MODEL_OVERRIDE", + savedValues: [ + { value: saved.modelOverride, source: "saved" } + ] }) }; } @@ -784,6 +791,7 @@ async function installCommand(options) { const proxyUrl = `http://${listenHost}:${listenPort}`; const captureEnabled = Boolean(resolved.captureEnabled.value); const captureDbPath = ensureCaptureDbPath(resolved.captureDbPath.value); + const modelOverride = resolved.modelOverride?.value || null; const proxyConfig = { server: { host: listenHost, port: listenPort, logLevel: "info" }, @@ -794,7 +802,8 @@ async function installCommand(options) { verifySsl: true, authHeader: "authorization", authScheme: "Bearer", - extraHeaders: {} + extraHeaders: {}, + modelOverride }, proxy: { overrideAuthorization: true, @@ -807,7 +816,24 @@ async function installCommand(options) { }; ensureStateDirs(); - writeProxyConfig(proxyConfigPath, proxyConfig); + + // Only write proxy config if it doesn't exist or if explicitly requested + if (!existsSync(proxyConfigPath) || options["force-config"]) { + writeProxyConfig(proxyConfigPath, proxyConfig); + } else { + // Config exists, merge modelOverride if provided + try { + const existingConfig = JSON.parse(readFileSync(proxyConfigPath, "utf8")); + if (modelOverride && existingConfig.upstream) { + existingConfig.upstream.modelOverride = modelOverride; + writeProxyConfig(proxyConfigPath, existingConfig); + } + } catch { + // If read fails, write new config + writeProxyConfig(proxyConfigPath, proxyConfig); + } + } + if (!existsSync(codexConfigPath)) { throw new Error(`Codex config not found: ${codexConfigPath}`); } diff --git a/node/proxy-config.example.json b/node/proxy-config.example.json index 1ccd301..1c98792 100644 --- a/node/proxy-config.example.json +++ b/node/proxy-config.example.json @@ -11,7 +11,8 @@ "verifySsl": true, "authHeader": "authorization", "authScheme": "Bearer", - "extraHeaders": {} + "extraHeaders": {}, + "modelOverride": null }, "proxy": { "overrideAuthorization": true, diff --git a/node/src/server.mjs b/node/src/server.mjs index a0ba179..206c0aa 100644 --- a/node/src/server.mjs +++ b/node/src/server.mjs @@ -113,7 +113,8 @@ export function loadConfig(configPath = resolveConfigPath()) { verifySsl: typeof upstream.verifySsl === "boolean" ? upstream.verifySsl : true, authHeader: typeof upstream.authHeader === "string" && upstream.authHeader ? upstream.authHeader : "authorization", authScheme: typeof upstream.authScheme === "string" ? upstream.authScheme : "Bearer", - extraHeaders: isStringMap(upstream.extraHeaders) ? upstream.extraHeaders : {} + extraHeaders: isStringMap(upstream.extraHeaders) ? upstream.extraHeaders : {}, + modelOverride: typeof upstream.modelOverride === "string" && upstream.modelOverride ? upstream.modelOverride : null }, proxy: { overrideAuthorization: typeof proxy.overrideAuthorization === "boolean" ? proxy.overrideAuthorization : true, @@ -320,11 +321,16 @@ function isEventStream(contentType = "") { return contentType.split(";", 1)[0].trim().toLowerCase() === "text/event-stream"; } -function normalizeResponsesPayload(payload) { +function normalizeResponsesPayload(payload, settings) { if (!payload || typeof payload !== "object" || Array.isArray(payload)) return false; let changed = false; - if (payload.model === "gpt-5.4") { + if (settings?.upstream?.modelOverride) { + if (payload.model !== settings.upstream.modelOverride) { + payload.model = settings.upstream.modelOverride; + changed = true; + } + } else if (payload.model === "gpt-5.4") { payload.model = "gpt-5.5"; changed = true; } @@ -372,7 +378,7 @@ function normalizeResponsesPayload(payload) { return changed; } -function rewriteRequestBody(buffer, { normalizeResponses = false } = {}) { +function rewriteRequestBody(buffer, { normalizeResponses = false, settings = null } = {}) { if (!buffer.length) return { body: buffer, rewritten: false, normalized: false }; let payload; try { @@ -385,11 +391,18 @@ function rewriteRequestBody(buffer, { normalizeResponses = false } = {}) { let normalized = false; if (normalizeResponses) { - normalized = normalizeResponsesPayload(payload); + normalized = normalizeResponsesPayload(payload, settings); rewritten = normalized; - } else if (payload && typeof payload === "object" && payload.model === "gpt-5.4") { - payload.model = "gpt-5.5"; - rewritten = true; + } else if (payload && typeof payload === "object") { + if (settings?.upstream?.modelOverride) { + if (payload.model !== settings.upstream.modelOverride) { + payload.model = settings.upstream.modelOverride; + rewritten = true; + } + } else if (payload.model === "gpt-5.4") { + payload.model = "gpt-5.5"; + rewritten = true; + } } if (rewritten) return { body: Buffer.from(JSON.stringify(payload)), rewritten, normalized }; @@ -1079,6 +1092,7 @@ function buildHealthPayload(settings, captureManager) { listenHost: settings.server.host, listenPort: settings.server.port, upstreamBaseUrl: settings.upstream.baseUrl, + modelOverride: settings.upstream.modelOverride || null, overrideAuthorization: settings.proxy.overrideAuthorization, authHeader: settings.upstream.authHeader, authScheme: settings.upstream.authScheme, @@ -1189,11 +1203,11 @@ export function createServer(settings, { captureManager = createCaptureManager({ } const normalizeResponses = req.url.startsWith("/responses") && !req.url.startsWith("/responses/compact"); - const rewrite = rewriteRequestBody(body, { normalizeResponses }); + const rewrite = rewriteRequestBody(body, { normalizeResponses, settings }); if (rewrite.rewritten) { body = rewrite.body; bodyTransformed = true; - debugLog("REQUEST REWRITE", { model: "gpt-5.5", normalizedResponses: rewrite.normalized }); + debugLog("REQUEST REWRITE", { model: settings.upstream.modelOverride || "gpt-5.5", normalizedResponses: rewrite.normalized }); } if (req.url.startsWith("/responses/compact")) { log("info", "Compact request received", { request_id: requestId, path: req.url }); From 9fac233b8fcb178c16d4b650e2798d3e43eb44db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=AA=20Anh=20Tu=E1=BA=A5n?= Date: Fri, 22 May 2026 21:30:51 +0700 Subject: [PATCH 4/4] feat: add retry mechanism with exponential backoff for upstream stability - Add retryAttempts and retryDelayMs config options (default: 3 attempts, 1000ms base delay) - Implement exponential backoff retry logic for transient network errors - Retry on ECONNREFUSED, ETIMEDOUT, ECONNRESET, ENOTFOUND, EAI_AGAIN, and timeout errors - Log retry attempts with delay and error details - Prevent Codex disconnections when upstream provider is unstable --- node/package-lock.json | 25 +---- node/src/server.mjs | 241 +++++++++++++++++++++++------------------ 2 files changed, 136 insertions(+), 130 deletions(-) diff --git a/node/package-lock.json b/node/package-lock.json index bd23b90..3384190 100644 --- a/node/package-lock.json +++ b/node/package-lock.json @@ -1,12 +1,12 @@ { "name": "@cluic/codex-remote-proxy", - "version": "0.1.3", + "version": "0.2.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@cluic/codex-remote-proxy", - "version": "0.1.3", + "version": "0.2.1", "dependencies": { "fzstd": "^0.1.1" }, @@ -404,18 +404,6 @@ "node": ">= 8" } }, - "node_modules/@types/node": { - "version": "25.9.0", - "resolved": "https://registry.npmjs.org/@types/node/-/node-25.9.0.tgz", - "integrity": "sha512-AOQwYUNolgy3VosiRqXrACUXTN8nJUtPl7FJXMqZVyxiiCLhQuG3jXKvCS1ALr+Y2OmZhzzLVlYPEqJaiqkaJQ==", - "dev": true, - "license": "MIT", - "optional": true, - "peer": true, - "dependencies": { - "undici-types": ">=7.24.0 <7.24.7" - } - }, "node_modules/ansi-colors": { "version": "4.1.3", "resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.3.tgz", @@ -1263,15 +1251,6 @@ "node": ">=8.0" } }, - "node_modules/undici-types": { - "version": "7.24.6", - "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.24.6.tgz", - "integrity": "sha512-WRNW+sJgj5OBN4/0JpHFqtqzhpbnV0GuB+OozA9gCL7a993SmU+1JBZCzLNxYsbMfIeDL+lTsphD5jN5N+n0zg==", - "dev": true, - "license": "MIT", - "optional": true, - "peer": true - }, "node_modules/universalify": { "version": "0.1.2", "resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz", diff --git a/node/src/server.mjs b/node/src/server.mjs index 206c0aa..b8afdd3 100644 --- a/node/src/server.mjs +++ b/node/src/server.mjs @@ -114,7 +114,9 @@ export function loadConfig(configPath = resolveConfigPath()) { authHeader: typeof upstream.authHeader === "string" && upstream.authHeader ? upstream.authHeader : "authorization", authScheme: typeof upstream.authScheme === "string" ? upstream.authScheme : "Bearer", extraHeaders: isStringMap(upstream.extraHeaders) ? upstream.extraHeaders : {}, - modelOverride: typeof upstream.modelOverride === "string" && upstream.modelOverride ? upstream.modelOverride : null + modelOverride: typeof upstream.modelOverride === "string" && upstream.modelOverride ? upstream.modelOverride : null, + retryAttempts: Number.isInteger(upstream.retryAttempts) && upstream.retryAttempts >= 0 ? upstream.retryAttempts : 3, + retryDelayMs: Number.isInteger(upstream.retryDelayMs) && upstream.retryDelayMs >= 0 ? upstream.retryDelayMs : 1000 }, proxy: { overrideAuthorization: typeof proxy.overrideAuthorization === "boolean" ? proxy.overrideAuthorization : true, @@ -1283,124 +1285,149 @@ export function createServer(settings, { captureManager = createCaptureManager({ body: safeBodyPreview(body) }); - const upstreamRequest = transport.request( - { - method: req.method, - protocol: targetUrl.protocol, - hostname: targetUrl.hostname, - port: targetUrl.port || undefined, - path: `${targetUrl.pathname}${targetUrl.search}`, - headers, - rejectUnauthorized: settings.upstream.verifySsl - }, - (upstreamResponse) => { - const stream = isEventStream(upstreamResponse.headers["content-type"]); - debugLog("RESPONSE HEADERS", { - status: upstreamResponse.statusCode, - headers: upstreamResponse.headers - }); + function attemptUpstreamRequest(attemptNumber) { + const upstreamRequest = transport.request( + { + method: req.method, + protocol: targetUrl.protocol, + hostname: targetUrl.hostname, + port: targetUrl.port || undefined, + path: `${targetUrl.pathname}${targetUrl.search}`, + headers, + rejectUnauthorized: settings.upstream.verifySsl + }, + (upstreamResponse) => { + const stream = isEventStream(upstreamResponse.headers["content-type"]); + debugLog("RESPONSE HEADERS", { + status: upstreamResponse.statusCode, + headers: upstreamResponse.headers + }); - const responseHeaders = headersToObject(upstreamResponse.rawHeaders); - const respChunks = []; - upstreamResponse.on("data", (chunk) => { - respChunks.push(chunk); - }); + const responseHeaders = headersToObject(upstreamResponse.rawHeaders); + const respChunks = []; + upstreamResponse.on("data", (chunk) => { + respChunks.push(chunk); + }); - res.statusCode = upstreamResponse.statusCode || 502; - writeHeadersToResponse(res, upstreamResponse.rawHeaders); - upstreamResponse.pipe(res); - upstreamResponse.on("end", () => { - responseCompleted = true; - const responseBody = Buffer.concat(respChunks); - if (responseBody.length) { - debugLog("RESPONSE BODY", { - status: upstreamResponse.statusCode, - body: safeBodyPreview(responseBody) + res.statusCode = upstreamResponse.statusCode || 502; + writeHeadersToResponse(res, upstreamResponse.rawHeaders); + upstreamResponse.pipe(res); + upstreamResponse.on("end", () => { + responseCompleted = true; + const responseBody = Buffer.concat(respChunks); + if (responseBody.length) { + debugLog("RESPONSE BODY", { + status: upstreamResponse.statusCode, + body: safeBodyPreview(responseBody) + }); + } + finalizeCapture({ + responseStatus: upstreamResponse.statusCode || 502, + responseHeaders, + responseBody, + isStream: stream, + upstreamRequestId: typeof upstreamResponse.headers["x-request-id"] === "string" ? upstreamResponse.headers["x-request-id"] : null + }); + logFn("info", "Proxied request", { + request_id: requestId, + method: req.method || "GET", + path: req.url, + status: upstreamResponse.statusCode || 502, + stream, + duration_ms: Date.now() - startedAt, + retry_attempt: attemptNumber > 0 ? attemptNumber : undefined }); - } - finalizeCapture({ - responseStatus: upstreamResponse.statusCode || 502, - responseHeaders, - responseBody, - isStream: stream, - upstreamRequestId: typeof upstreamResponse.headers["x-request-id"] === "string" ? upstreamResponse.headers["x-request-id"] : null }); - logFn("info", "Proxied request", { + } + ); + + upstreamRequest.setTimeout(settings.upstream.timeoutMs, () => { + upstreamRequest.destroy(new Error("upstream timeout")); + }); + + upstreamRequest.on("error", (error) => { + const isRetryableError = ["ECONNREFUSED", "ETIMEDOUT", "ECONNRESET", "ENOTFOUND", "EAI_AGAIN"].includes(error.code) || error.message === "upstream timeout"; + const canRetry = isRetryableError && attemptNumber < settings.upstream.retryAttempts; + + if (canRetry) { + const delay = settings.upstream.retryDelayMs * Math.pow(2, attemptNumber); + logFn("warn", "Upstream request failed, retrying", { request_id: requestId, - method: req.method || "GET", - path: req.url, - status: upstreamResponse.statusCode || 502, - stream, - duration_ms: Date.now() - startedAt + attempt: attemptNumber + 1, + max_attempts: settings.upstream.retryAttempts, + retry_delay_ms: delay, + error: error.message, + error_code: error.code || "(none)" }); - }); - } - ); - - upstreamRequest.setTimeout(settings.upstream.timeoutMs, () => { - upstreamRequest.destroy(new Error("upstream timeout")); - }); + setTimeout(() => attemptUpstreamRequest(attemptNumber + 1), delay); + return; + } - upstreamRequest.on("error", (error) => { - const statusCode = error.message === "upstream timeout" ? 504 : 502; - const errorType = statusCode === 504 ? "proxy_timeout" : "proxy_upstream_error"; - const payload = { - error: { - message: statusCode === 504 ? "Upstream request timed out" : "Failed to reach upstream service", - type: errorType, - request_id: requestId + const statusCode = error.message === "upstream timeout" ? 504 : 502; + const errorType = statusCode === 504 ? "proxy_timeout" : "proxy_upstream_error"; + const payload = { + error: { + message: statusCode === 504 ? "Upstream request timed out" : "Failed to reach upstream service", + type: errorType, + request_id: requestId + } + }; + const responseBody = Buffer.from(JSON.stringify(payload)); + const responseHeaders = { + "content-type": "application/json; charset=utf-8", + "content-length": String(responseBody.length) + }; + + debugLog("UPSTREAM ERROR", { + error: error.message, + code: error.code || "(none)", + stack: error.stack, + attempts: attemptNumber + 1 + }); + if (!res.headersSent) { + writeJson(res, statusCode, payload); + } else { + res.destroy(error); } - }; - const responseBody = Buffer.from(JSON.stringify(payload)); - const responseHeaders = { - "content-type": "application/json; charset=utf-8", - "content-length": String(responseBody.length) - }; - - debugLog("UPSTREAM ERROR", { - error: error.message, - code: error.code || "(none)", - stack: error.stack - }); - if (!res.headersSent) { - writeJson(res, statusCode, payload); - } else { - res.destroy(error); - } - finalizeCapture({ - responseStatus: statusCode, - responseHeaders, - responseBody, - errorType, - errorMessage: error.message, - upstreamRequestId: null - }); - logFn("warn", "Proxy request failed", { - request_id: requestId, - method: req.method || "GET", - path: req.url, - status: statusCode, - duration_ms: Date.now() - startedAt, - error: JSON.stringify(error.message) + finalizeCapture({ + responseStatus: statusCode, + responseHeaders, + responseBody, + errorType, + errorMessage: error.message, + upstreamRequestId: null + }); + logFn("warn", "Proxy request failed after retries", { + request_id: requestId, + method: req.method || "GET", + path: req.url, + status: statusCode, + duration_ms: Date.now() - startedAt, + attempts: attemptNumber + 1, + error: JSON.stringify(error.message) + }); }); - }); - res.on("close", () => { - if (responseCompleted || res.writableFinished) { - return; - } - finalizeCapture({ - responseStatus: res.statusCode || null, - responseHeaders: {}, - responseBody: Buffer.alloc(0), - isStream: false, - upstreamRequestId: null, - errorType: "proxy_client_abort", - errorMessage: "Client closed connection" + res.on("close", () => { + if (responseCompleted || res.writableFinished) { + return; + } + upstreamRequest.destroy(); + finalizeCapture({ + responseStatus: res.statusCode || null, + responseHeaders: {}, + responseBody: Buffer.alloc(0), + isStream: false, + upstreamRequestId: null, + errorType: "proxy_client_abort", + errorMessage: "Client closed connection" + }); }); - }); - upstreamRequest.end(body); + upstreamRequest.end(body); + } + + attemptUpstreamRequest(0); }); }); }