From 8bd93ac281b0813a5802a4bb1c29ad40dcecd147 Mon Sep 17 00:00:00 2001 From: Alex Alecu Date: Thu, 7 May 2026 15:49:09 +0300 Subject: [PATCH 1/3] fix(code-review): preflight follow-up sessions --- .../src/cloud-agent-next-client.test.ts | 60 ++++ .../src/cloud-agent-next-client.ts | 88 +++++ packages/worker-utils/src/index.ts | 5 + services/cloud-agent-next/src/router.test.ts | 149 ++++++++ .../src/router/handlers/session-management.ts | 97 ++++++ .../cloud-agent-next/src/router/schemas.ts | 27 ++ .../src/code-review-orchestrator.ts | 85 +++-- .../code-review-orchestrator.test.ts | 326 ++++++++++++++++++ 8 files changed, 815 insertions(+), 22 deletions(-) diff --git a/packages/worker-utils/src/cloud-agent-next-client.test.ts b/packages/worker-utils/src/cloud-agent-next-client.test.ts index 9a842d0bd7..1b33146294 100644 --- a/packages/worker-utils/src/cloud-agent-next-client.test.ts +++ b/packages/worker-utils/src/cloud-agent-next-client.test.ts @@ -158,3 +158,63 @@ describe('CloudAgentNextFetchClient billing error detection', () => { ).rejects.not.toThrow(CloudAgentNextBillingError); }); }); + +describe('CloudAgentNextFetchClient getSessionHealth', () => { + it('posts to getSessionHealth and parses a healthy response', async () => { + const fetchMock = mockFetch(200, { + result: { + data: { + cloudAgentSessionId: 'agent_123', + sandboxId: 'ses-abc123', + sandboxStatus: 'healthy', + executionHealth: 'none', + }, + }, + }); + vi.stubGlobal('fetch', fetchMock); + const client = createCloudAgentNextFetchClient(BASE_URL); + + const result = await client.getSessionHealth( + { Authorization: 'Bearer token' }, + { cloudAgentSessionId: 'agent_123' } + ); + + expect(result).toEqual({ + cloudAgentSessionId: 'agent_123', + sandboxId: 'ses-abc123', + sandboxStatus: 'healthy', + executionHealth: 'none', + }); + expect(fetchMock).toHaveBeenCalledWith( + `${BASE_URL}/trpc/getSessionHealth`, + expect.objectContaining({ + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: 'Bearer token', + }, + body: JSON.stringify({ cloudAgentSessionId: 'agent_123' }), + }) + ); + }); + + it('rejects malformed health responses', async () => { + vi.stubGlobal( + 'fetch', + mockFetch(200, { + result: { + data: { + cloudAgentSessionId: 'agent_123', + sandboxStatus: 'on-fire', + executionHealth: 'none', + }, + }, + }) + ); + const client = createCloudAgentNextFetchClient(BASE_URL); + + await expect(client.getSessionHealth({}, { cloudAgentSessionId: 'agent_123' })).rejects.toThrow( + 'Unexpected getSessionHealth response shape' + ); + }); +}); diff --git a/packages/worker-utils/src/cloud-agent-next-client.ts b/packages/worker-utils/src/cloud-agent-next-client.ts index 736a046302..7bdef3d911 100644 --- a/packages/worker-utils/src/cloud-agent-next-client.ts +++ b/packages/worker-utils/src/cloud-agent-next-client.ts @@ -71,6 +71,30 @@ export type CloudAgentSendMessageOutput = { status?: string; }; +export type CloudAgentSessionHealthInput = { + cloudAgentSessionId: string; +}; + +export type CloudAgentSandboxStatus = 'healthy' | 'destroyed' | 'unreachable' | 'unknown'; + +export type CloudAgentSessionExecutionHealth = 'healthy' | 'unknown' | 'stale' | 'none'; + +export type CloudAgentActiveExecutionStatus = + | 'pending' + | 'running' + | 'completed' + | 'failed' + | 'interrupted'; + +export type CloudAgentSessionHealthOutput = { + cloudAgentSessionId: string; + sandboxId?: string; + sandboxStatus: CloudAgentSandboxStatus; + executionHealth: CloudAgentSessionExecutionHealth; + activeExecutionStatus?: CloudAgentActiveExecutionStatus; + activeExecutionId?: string; +}; + export type CloudAgentInterruptInput = { sessionId: string; }; @@ -128,6 +152,30 @@ function isBillingErrorBody(body: string): boolean { ); } +function isCloudAgentSandboxStatus(value: unknown): value is CloudAgentSandboxStatus { + return ( + value === 'healthy' || value === 'destroyed' || value === 'unreachable' || value === 'unknown' + ); +} + +function isCloudAgentSessionExecutionHealth( + value: unknown +): value is CloudAgentSessionExecutionHealth { + return value === 'healthy' || value === 'unknown' || value === 'stale' || value === 'none'; +} + +function isCloudAgentActiveExecutionStatus( + value: unknown +): value is CloudAgentActiveExecutionStatus { + return ( + value === 'pending' || + value === 'running' || + value === 'completed' || + value === 'failed' || + value === 'interrupted' + ); +} + /** * Parse a tRPC JSON-RPC envelope and return `result.data`, throwing on * non-200 responses or unexpected shapes. @@ -187,6 +235,11 @@ export type CloudAgentNextFetchClient = { input: CloudAgentSendMessageInput ): Promise; + getSessionHealth( + headers: Record, + input: CloudAgentSessionHealthInput + ): Promise; + interruptSession( headers: Record, input: CloudAgentInterruptInput @@ -253,6 +306,41 @@ export function createCloudAgentNextFetchClient(baseUrl: string): CloudAgentNext return data as unknown as CloudAgentSendMessageOutput; }, + async getSessionHealth(headers, input) { + const data = await trpcPost>( + trpc('getSessionHealth'), + headers, + input, + 'getSessionHealth' + ); + + if ( + typeof data.cloudAgentSessionId !== 'string' || + !isCloudAgentSandboxStatus(data.sandboxStatus) || + !isCloudAgentSessionExecutionHealth(data.executionHealth) || + (data.sandboxId !== undefined && typeof data.sandboxId !== 'string') || + (data.activeExecutionId !== undefined && typeof data.activeExecutionId !== 'string') || + (data.activeExecutionStatus !== undefined && + !isCloudAgentActiveExecutionStatus(data.activeExecutionStatus)) + ) { + throw new Error( + `Unexpected getSessionHealth response shape: ${JSON.stringify(data).slice(0, 500)}` + ); + } + + const health: CloudAgentSessionHealthOutput = { + cloudAgentSessionId: data.cloudAgentSessionId, + sandboxStatus: data.sandboxStatus, + executionHealth: data.executionHealth, + }; + if (data.sandboxId !== undefined) health.sandboxId = data.sandboxId; + if (data.activeExecutionId !== undefined) health.activeExecutionId = data.activeExecutionId; + if (data.activeExecutionStatus !== undefined) { + health.activeExecutionStatus = data.activeExecutionStatus; + } + return health; + }, + async interruptSession(headers, input) { return trpcPost( trpc('interruptSession'), diff --git a/packages/worker-utils/src/index.ts b/packages/worker-utils/src/index.ts index 85b5a6479a..82e39a5df1 100644 --- a/packages/worker-utils/src/index.ts +++ b/packages/worker-utils/src/index.ts @@ -37,6 +37,11 @@ export type { CloudAgentUpdateSessionInput, CloudAgentSendMessageInput, CloudAgentSendMessageOutput, + CloudAgentSessionHealthInput, + CloudAgentSessionHealthOutput, + CloudAgentSandboxStatus, + CloudAgentSessionExecutionHealth, + CloudAgentActiveExecutionStatus, CloudAgentInterruptInput, CloudAgentInterruptOutput, } from './cloud-agent-next-client.js'; diff --git a/services/cloud-agent-next/src/router.test.ts b/services/cloud-agent-next/src/router.test.ts index 386d9603e1..aaef8e50ae 100644 --- a/services/cloud-agent-next/src/router.test.ts +++ b/services/cloud-agent-next/src/router.test.ts @@ -897,6 +897,155 @@ describe('router sessionId validation', () => { }); }); + describe('getSessionHealth procedure', () => { + let mockContext: TRPCContext; + let caller: ReturnType; + let cloudAgentSession: MockCAS; + let mockGetMetadata: ReturnType; + let mockGetActiveExecutionId: ReturnType; + let mockGetExecution: ReturnType; + let mockListProcesses: ReturnType; + + beforeEach(() => { + vi.clearAllMocks(); + + mockGetMetadata = vi.fn(); + mockGetActiveExecutionId = vi.fn().mockResolvedValue(null); + mockGetExecution = vi.fn().mockResolvedValue(null); + mockListProcesses = vi.fn().mockResolvedValue([]); + + mockContext = { + userId: 'test-user-123', + authToken: 'test-token', + botId: undefined, + request: {} as Request, + env: { + Sandbox: {} as TRPCContext['env']['Sandbox'], + SandboxSmall: {} as TRPCContext['env']['SandboxSmall'], + CLOUD_AGENT_SESSION: { + idFromName: vi.fn((id: string) => ({ id })), + get: vi.fn(() => ({ + getMetadata: mockGetMetadata, + getActiveExecutionId: mockGetActiveExecutionId, + getExecution: mockGetExecution, + })), + } as unknown as TRPCContext['env']['CLOUD_AGENT_SESSION'], + SESSION_INGEST: { + fetch: vi.fn(), + } as unknown as TRPCContext['env']['SESSION_INGEST'], + R2_BUCKET: {} as TRPCContext['env']['R2_BUCKET'], + GIT_TOKEN_SERVICE: {} as Env['GIT_TOKEN_SERVICE'], + NEXTAUTH_SECRET: 'test-secret', + INTERNAL_API_SECRET_PROD: { + get: vi.fn().mockResolvedValue('test-secret'), + } as unknown as TRPCContext['env']['INTERNAL_API_SECRET_PROD'], + HYPERDRIVE: { + connectionString: 'postgresql://test', + } as unknown as TRPCContext['env']['HYPERDRIVE'], + }, + }; + cloudAgentSession = mockContext.env.CLOUD_AGENT_SESSION as unknown as MockCAS; + vi.mocked(getSandbox).mockReturnValue({ + listProcesses: mockListProcesses, + } as unknown as ReturnType); + caller = appRouter.createCaller(mockContext); + }); + + it('returns healthy sandbox and none execution health when no execution is active', async () => { + const sessionId: SessionId = 'agent_88888888-8888-8888-8888-888888888888'; + const sandboxId = 'ses-a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6'; + mockGetMetadata.mockResolvedValue({ + version: 123456789, + sessionId, + userId: 'test-user-123', + timestamp: 123456789, + sandboxId, + } satisfies CloudAgentSessionState); + + const result = await caller.getSessionHealth({ cloudAgentSessionId: sessionId }); + + expect(result).toEqual({ + cloudAgentSessionId: sessionId, + sandboxId, + sandboxStatus: 'healthy', + executionHealth: 'none', + activeExecutionId: undefined, + activeExecutionStatus: undefined, + }); + expect(cloudAgentSession.idFromName).toHaveBeenCalledWith(`test-user-123:${sessionId}`); + expect(getSandbox).toHaveBeenCalledWith(mockContext.env.SandboxSmall, sandboxId); + expect(mockListProcesses).toHaveBeenCalled(); + }); + + it('returns stale execution health for a stale running execution', async () => { + const sessionId: SessionId = 'agent_99999999-9999-9999-9999-999999999999'; + const activeExecutionId = 'exc_stale_execution'; + mockGetMetadata.mockResolvedValue({ + version: 123456789, + sessionId, + orgId: 'org-123', + userId: 'test-user-123', + timestamp: 123456789, + } satisfies CloudAgentSessionState); + mockGetActiveExecutionId.mockResolvedValue(activeExecutionId); + mockGetExecution.mockResolvedValue({ + executionId: activeExecutionId, + status: 'running', + startedAt: Date.now() - 20 * 60 * 1000, + mode: 'code', + streamingMode: 'websocket', + lastHeartbeat: Date.now() - 11 * 60 * 1000, + }); + + const result = await caller.getSessionHealth({ cloudAgentSessionId: sessionId }); + + expect(result).toMatchObject({ + cloudAgentSessionId: sessionId, + sandboxStatus: 'healthy', + executionHealth: 'stale', + activeExecutionId, + activeExecutionStatus: 'running', + }); + expect(mockGetExecution).toHaveBeenCalledWith(activeExecutionId); + }); + + it('returns NOT_FOUND for missing session metadata', async () => { + const sessionId: SessionId = 'agent_aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'; + mockGetMetadata.mockResolvedValue(null); + + await expect(caller.getSessionHealth({ cloudAgentSessionId: sessionId })).rejects.toThrow( + 'Session not found' + ); + expect(getSandbox).not.toHaveBeenCalled(); + }); + + it('returns unreachable when sandbox process listing fails', async () => { + const sessionId: SessionId = 'agent_bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb'; + const sandboxId = 'ses-b1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6'; + mockGetMetadata.mockResolvedValue({ + version: 123456789, + sessionId, + userId: 'test-user-123', + timestamp: 123456789, + sandboxId, + githubToken: 'secret-token-should-not-be-returned', + } satisfies CloudAgentSessionState); + mockListProcesses.mockRejectedValue(new Error('sandbox unavailable')); + + const result = await caller.getSessionHealth({ cloudAgentSessionId: sessionId }); + + expect(result).toEqual({ + cloudAgentSessionId: sessionId, + sandboxId, + sandboxStatus: 'unreachable', + executionHealth: 'none', + activeExecutionId: undefined, + activeExecutionStatus: undefined, + }); + expect(result).not.toHaveProperty('githubToken'); + }); + }); + describe('getLatestAssistantMessage procedure', () => { let mockContext: TRPCContext; let caller: ReturnType; diff --git a/services/cloud-agent-next/src/router/handlers/session-management.ts b/services/cloud-agent-next/src/router/handlers/session-management.ts index ea5677cffd..cfe5981297 100644 --- a/services/cloud-agent-next/src/router/handlers/session-management.ts +++ b/services/cloud-agent-next/src/router/handlers/session-management.ts @@ -19,10 +19,13 @@ import { sessionIdSchema, GetSessionInput, GetSessionOutput, + GetSessionHealthInput, + GetSessionHealthOutput, GetLatestAssistantMessageInput, GetLatestAssistantMessageOutput, } from '../schemas.js'; import { computeExecutionHealth } from '../../core/execution.js'; +import type { ExecutionMetadata } from '../../session/types.js'; /** * Creates session management handlers. @@ -469,6 +472,100 @@ export function createSessionManagementHandlers() { }); }), + getSessionHealth: protectedProcedure + .input(GetSessionHealthInput) + .output(GetSessionHealthOutput) + .query(async ({ input, ctx }) => { + return withLogTags({ source: 'getSessionHealth' }, async () => { + const sessionId = input.cloudAgentSessionId as SessionId; + const { userId, env } = ctx; + + logger.setTags({ userId, sessionId }); + logger.info('Fetching session health'); + + const doKey = `${userId}:${sessionId}`; + const getStub = () => + env.CLOUD_AGENT_SESSION.get(env.CLOUD_AGENT_SESSION.idFromName(doKey)); + + const metadata = await withDORetry< + ReturnType, + CloudAgentSessionState | null + >(getStub, s => s.getMetadata(), 'getMetadata'); + + if (!metadata) { + logger.info('Session not found'); + throw new TRPCError({ + code: 'NOT_FOUND', + message: 'Session not found', + }); + } + + const sandboxId: SandboxId = + metadata.sandboxId ?? + (await generateSandboxId( + env.PER_SESSION_SANDBOX_ORG_IDS, + metadata.orgId, + userId, + metadata.sessionId, + metadata.botId + )); + + logger.setTags({ sandboxId, orgId: metadata.orgId ?? '(personal)' }); + + const activeExecutionId = await withDORetry( + getStub, + s => s.getActiveExecutionId(), + 'getActiveExecutionId' + ); + + let activeExecution: ExecutionMetadata | null = null; + if (activeExecutionId) { + activeExecution = await withDORetry( + getStub, + s => s.getExecution(activeExecutionId), + 'getExecution' + ); + } + + const executionHealth = activeExecutionId + ? activeExecution + ? (computeExecutionHealth( + activeExecution.status, + activeExecution.startedAt, + activeExecution.lastHeartbeat + ) ?? 'unknown') + : 'unknown' + : 'none'; + + const sandbox = getSandbox(getSandboxNamespace(env, sandboxId), sandboxId); + let sandboxStatus: 'healthy' | 'unreachable' = 'healthy'; + try { + await sandbox.listProcesses(); + } catch (error) { + sandboxStatus = 'unreachable'; + logger + .withFields({ error: error instanceof Error ? error.message : String(error) }) + .warn('Sandbox health probe failed'); + } + + logger.info('Session health retrieved successfully', { + sandboxStatus, + executionHealth, + activeExecutionId: activeExecutionId ?? undefined, + activeExecutionStatus: activeExecution?.status, + }); + + return { + cloudAgentSessionId: sessionId, + sandboxId, + sandboxStatus, + executionHealth, + activeExecutionId: activeExecutionId ?? undefined, + activeExecutionStatus: activeExecution?.status, + }; + }); + }), + getLatestAssistantMessage: protectedProcedure .input(GetLatestAssistantMessageInput) .output(GetLatestAssistantMessageOutput) diff --git a/services/cloud-agent-next/src/router/schemas.ts b/services/cloud-agent-next/src/router/schemas.ts index c8a4ad6dff..d81024cceb 100644 --- a/services/cloud-agent-next/src/router/schemas.ts +++ b/services/cloud-agent-next/src/router/schemas.ts @@ -401,6 +401,33 @@ export const GetSessionInput = z.object({ cloudAgentSessionId: sessionIdSchema.describe('Cloud-agent session ID to retrieve'), }); +export const SandboxStatusSchema = z + .enum(['healthy', 'destroyed', 'unreachable', 'unknown']) + .describe('Sandbox reachability status for the session container'); + +export const SessionHealthExecutionSchema = z + .enum(['healthy', 'unknown', 'stale', 'none']) + .describe('Health status for the active execution, or none when no execution is active'); + +export const ActiveExecutionStatusSchema = z + .enum(['pending', 'running', 'completed', 'failed', 'interrupted']) + .describe('Current status of the active execution'); + +export const GetSessionHealthInput = z.object({ + cloudAgentSessionId: sessionIdSchema.describe('Cloud-agent session ID to inspect'), +}); + +export const GetSessionHealthOutput = z.object({ + cloudAgentSessionId: z.string().describe('Cloud-agent session ID'), + sandboxId: z.string().optional().describe('Sandbox ID for the session'), + sandboxStatus: SandboxStatusSchema, + executionHealth: SessionHealthExecutionSchema, + activeExecutionStatus: ActiveExecutionStatusSchema.optional(), + activeExecutionId: z.string().optional(), +}); + +export type GetSessionHealthResponse = z.infer; + /** * Output schema for getSession endpoint. * Returns sanitized session metadata with lifecycle timestamps for idempotency. diff --git a/services/code-review-infra/src/code-review-orchestrator.ts b/services/code-review-infra/src/code-review-orchestrator.ts index 0aa69ee3c6..1716d79b9c 100644 --- a/services/code-review-infra/src/code-review-orchestrator.ts +++ b/services/code-review-infra/src/code-review-orchestrator.ts @@ -11,6 +11,7 @@ import { createCloudAgentNextFetchClient, CloudAgentNextBillingError, type CloudAgentNextFetchClient, + type CloudAgentSessionHealthOutput, type CloudAgentTerminalReason, } from '@kilocode/worker-utils'; import type { @@ -26,6 +27,10 @@ import { InternalStatusResponseSchema } from './types'; type UpdateStatusResult = 'updated' | 'db-terminal'; +function canContinueCloudAgentNextSession(health: CloudAgentSessionHealthOutput): boolean { + return health.sandboxStatus === 'healthy' && health.executionHealth !== 'stale'; +} + /** Shape of an SSE event parsed from the cloud agent stream */ type SseEventPayload = { say?: string; @@ -110,6 +115,23 @@ export class CodeReviewOrchestrator extends DurableObject { return this.cloudAgentNextClient; } + private async runFreshCloudAgentNextFallback(previousSessionId: string): Promise { + this.state.previousCloudAgentSessionId = undefined; + + try { + await this.runWithCloudAgentNext(); + } catch (freshError) { + // runWithCloudAgentNext handles its own error/status updates, so this catch + // is only for unexpected throws that bypass its internal error handling. + const freshErrorMessage = freshError instanceof Error ? freshError.message : 'Unknown error'; + console.error('[CodeReviewOrchestrator] Fresh session fallback also failed', { + reviewId: this.state.reviewId, + previousCloudAgentSessionId: previousSessionId, + error: freshErrorMessage, + }); + } + } + /** * Alarm handler for review recovery and scheduled cleanup tasks. */ @@ -838,6 +860,45 @@ export class CodeReviewOrchestrator extends DurableObject { const statusUpdateResult = await this.updateStatus('running'); if (statusUpdateResult === 'db-terminal') return; + const userHeaders: Record = { + Authorization: `Bearer ${this.state.authToken}`, + }; + if (this.state.skipBalanceCheck) { + userHeaders['x-skip-balance-check'] = 'true'; + } + + let health: CloudAgentSessionHealthOutput; + try { + health = await client.getSessionHealth(userHeaders, { + cloudAgentSessionId: previousSessionId, + }); + } catch (error) { + if (error instanceof CloudAgentNextBillingError) { + throw error; + } + + const errorMessage = error instanceof Error ? error.message : 'Unknown error'; + console.warn('[CodeReviewOrchestrator] Session health preflight failed', { + reviewId: this.state.reviewId, + previousCloudAgentSessionId: previousSessionId, + error: errorMessage, + }); + await this.runFreshCloudAgentNextFallback(previousSessionId); + return; + } + + if (!canContinueCloudAgentNextSession(health)) { + console.warn('[CodeReviewOrchestrator] Previous cloud-agent-next session is unhealthy', { + reviewId: this.state.reviewId, + previousCloudAgentSessionId: previousSessionId, + sandboxStatus: health.sandboxStatus, + executionHealth: health.executionHealth, + activeExecutionId: health.activeExecutionId, + }); + await this.runFreshCloudAgentNextFallback(previousSessionId); + return; + } + // Build internal headers (internalApiProtectedProcedure — API key + Bearer token) const internalHeaders: Record = { Authorization: `Bearer ${this.state.authToken}`, @@ -863,13 +924,6 @@ export class CodeReviewOrchestrator extends DurableObject { }); // Step 2: Send follow-up message (user-facing, no callbackTarget) - const userHeaders: Record = { - Authorization: `Bearer ${this.state.authToken}`, - }; - if (this.state.skipBalanceCheck) { - userHeaders['x-skip-balance-check'] = 'true'; - } - console.log('[CodeReviewOrchestrator] Calling sendMessageV2', { reviewId: this.state.reviewId, cloudAgentSessionId: previousSessionId, @@ -908,7 +962,7 @@ export class CodeReviewOrchestrator extends DurableObject { }); console.warn( - '[CodeReviewOrchestrator] sendMessageV2 billing failure, skipping fresh session fallback', + '[CodeReviewOrchestrator] cloud-agent-next billing failure, skipping fresh session fallback', { reviewId: this.state.reviewId, previousCloudAgentSessionId: previousSessionId, @@ -928,20 +982,7 @@ export class CodeReviewOrchestrator extends DurableObject { // Reset status to running (it may have been set to running already, but ensure clean state) // Clear previousCloudAgentSessionId so the fresh session path doesn't try followup again - this.state.previousCloudAgentSessionId = undefined; - - try { - await this.runWithCloudAgentNext(); - } catch (freshError) { - // runWithCloudAgentNext handles its own error/status updates, so this catch - // is only for unexpected throws that bypass its internal error handling - const freshErrorMessage = - freshError instanceof Error ? freshError.message : 'Unknown error'; - console.error('[CodeReviewOrchestrator] Fresh session fallback also failed', { - reviewId: this.state.reviewId, - error: freshErrorMessage, - }); - } + await this.runFreshCloudAgentNextFallback(previousSessionId); } } diff --git a/services/code-review-infra/test/integration/code-review-orchestrator.test.ts b/services/code-review-infra/test/integration/code-review-orchestrator.test.ts index 14587aa003..a8eb29292e 100644 --- a/services/code-review-infra/test/integration/code-review-orchestrator.test.ts +++ b/services/code-review-infra/test/integration/code-review-orchestrator.test.ts @@ -39,6 +39,14 @@ function workerAuthHeaders(): HeadersInit { return { Authorization: 'Bearer test-backend-token' }; } +function hasFetchCall(fetchMock: ReturnType, path: string): boolean { + return fetchMock.mock.calls.some(([request]) => String(request).includes(path)); +} + +function getFetchCall(fetchMock: ReturnType, path: string) { + return fetchMock.mock.calls.find(([request]) => String(request).includes(path)); +} + describe('CodeReviewOrchestrator recovery', () => { const originalFetch = globalThis.fetch; @@ -145,6 +153,324 @@ describe('CodeReviewOrchestrator recovery', () => { ); }); + it('continues a healthy previous cloud-agent-next session for follow-up reviews', async () => { + const stub = getReviewStub(); + const previousSessionId = 'agent_previous_session'; + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ success: true }); + } + if (url.includes('/trpc/getSessionHealth')) { + return Response.json({ + result: { + data: { + cloudAgentSessionId: previousSessionId, + sandboxId: 'ses-healthy', + sandboxStatus: 'healthy', + executionHealth: 'none', + }, + }, + }); + } + if (url.includes('/trpc/updateSession')) { + return Response.json({ result: { data: { success: true } } }); + } + if (url.includes('/trpc/sendMessageV2')) { + return Response.json({ + result: { data: { executionId: 'exec-followup', status: 'running' } }, + }); + } + return new Response('unexpected fetch', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put( + 'state', + codeReview({ + previousCloudAgentSessionId: previousSessionId, + }) + ); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + const status = await stub.status(); + expect(status).toMatchObject({ + status: 'running', + sessionId: previousSessionId, + }); + expect(status.cliSessionId).toBeUndefined(); + expect(hasFetchCall(fetchMock, '/trpc/getSessionHealth')).toBe(true); + expect(hasFetchCall(fetchMock, '/trpc/updateSession')).toBe(true); + expect(hasFetchCall(fetchMock, '/trpc/sendMessageV2')).toBe(true); + expect(hasFetchCall(fetchMock, '/trpc/prepareSession')).toBe(false); + expect(hasFetchCall(fetchMock, '/trpc/initiateFromKilocodeSessionV2')).toBe(false); + }); + + it('skips continuation and prepares a fresh session when previous sandbox is unreachable', async () => { + const stub = getReviewStub(); + const previousSessionId = 'agent_previous_unreachable'; + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ success: true }); + } + if (url.includes('/trpc/getSessionHealth')) { + return Response.json({ + result: { + data: { + cloudAgentSessionId: previousSessionId, + sandboxStatus: 'unreachable', + executionHealth: 'none', + }, + }, + }); + } + if (url.includes('/trpc/prepareSession')) { + return Response.json({ + result: { + data: { + cloudAgentSessionId: 'agent-fresh-session', + kiloSessionId: 'ses_fresh_session', + }, + }, + }); + } + if (url.includes('/trpc/initiateFromKilocodeSessionV2')) { + return Response.json({ + result: { data: { executionId: 'exec-fresh', status: 'running' } }, + }); + } + return new Response('unexpected fetch', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put( + 'state', + codeReview({ + previousCloudAgentSessionId: previousSessionId, + }) + ); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + const status = await stub.status(); + expect(status).toMatchObject({ + status: 'running', + sessionId: 'agent-fresh-session', + cliSessionId: 'ses_fresh_session', + }); + expect(hasFetchCall(fetchMock, '/trpc/getSessionHealth')).toBe(true); + expect(hasFetchCall(fetchMock, '/trpc/updateSession')).toBe(false); + expect(hasFetchCall(fetchMock, '/trpc/sendMessageV2')).toBe(false); + expect(hasFetchCall(fetchMock, '/trpc/prepareSession')).toBe(true); + expect(hasFetchCall(fetchMock, '/trpc/initiateFromKilocodeSessionV2')).toBe(true); + }); + + it('skips continuation and prepares a fresh session when previous execution is stale', async () => { + const stub = getReviewStub(); + const previousSessionId = 'agent_previous_stale'; + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ success: true }); + } + if (url.includes('/trpc/getSessionHealth')) { + return Response.json({ + result: { + data: { + cloudAgentSessionId: previousSessionId, + sandboxStatus: 'healthy', + executionHealth: 'stale', + activeExecutionId: 'exec-stale', + }, + }, + }); + } + if (url.includes('/trpc/prepareSession')) { + return Response.json({ + result: { + data: { + cloudAgentSessionId: 'agent-fresh-stale', + kiloSessionId: 'ses_fresh_stale', + }, + }, + }); + } + if (url.includes('/trpc/initiateFromKilocodeSessionV2')) { + return Response.json({ + result: { data: { executionId: 'exec-fresh', status: 'running' } }, + }); + } + return new Response('unexpected fetch', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put( + 'state', + codeReview({ + previousCloudAgentSessionId: previousSessionId, + }) + ); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + const status = await stub.status(); + expect(status).toMatchObject({ + status: 'running', + sessionId: 'agent-fresh-stale', + cliSessionId: 'ses_fresh_stale', + }); + expect(hasFetchCall(fetchMock, '/trpc/updateSession')).toBe(false); + expect(hasFetchCall(fetchMock, '/trpc/sendMessageV2')).toBe(false); + expect(hasFetchCall(fetchMock, '/trpc/prepareSession')).toBe(true); + }); + + it('falls back to a fresh session when health preflight returns an error', async () => { + const stub = getReviewStub(); + const previousSessionId = 'agent_previous_missing'; + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ success: true }); + } + if (url.includes('/trpc/getSessionHealth')) { + return new Response('Session not found', { status: 404 }); + } + if (url.includes('/trpc/prepareSession')) { + return Response.json({ + result: { + data: { + cloudAgentSessionId: 'agent-fresh-after-error', + kiloSessionId: 'ses_fresh_after_error', + }, + }, + }); + } + if (url.includes('/trpc/initiateFromKilocodeSessionV2')) { + return Response.json({ + result: { data: { executionId: 'exec-fresh', status: 'running' } }, + }); + } + return new Response('unexpected fetch', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put( + 'state', + codeReview({ + previousCloudAgentSessionId: previousSessionId, + }) + ); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + const status = await stub.status(); + expect(status).toMatchObject({ + status: 'running', + sessionId: 'agent-fresh-after-error', + cliSessionId: 'ses_fresh_after_error', + }); + expect(hasFetchCall(fetchMock, '/trpc/getSessionHealth')).toBe(true); + expect(hasFetchCall(fetchMock, '/trpc/updateSession')).toBe(false); + expect(hasFetchCall(fetchMock, '/trpc/sendMessageV2')).toBe(false); + expect(hasFetchCall(fetchMock, '/trpc/prepareSession')).toBe(true); + }); + + it('falls back to a fresh session when sendMessageV2 fails after healthy preflight', async () => { + const stub = getReviewStub(); + const previousSessionId = 'agent_previous_send_failure'; + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ success: true }); + } + if (url.includes('/trpc/getSessionHealth')) { + return Response.json({ + result: { + data: { + cloudAgentSessionId: previousSessionId, + sandboxStatus: 'healthy', + executionHealth: 'none', + }, + }, + }); + } + if (url.includes('/trpc/updateSession')) { + return Response.json({ result: { data: { success: true } } }); + } + if (url.includes('/trpc/sendMessageV2')) { + return new Response('Session not found', { status: 404 }); + } + if (url.includes('/trpc/prepareSession')) { + return Response.json({ + result: { + data: { + cloudAgentSessionId: 'agent-fresh-after-send-failure', + kiloSessionId: 'ses_fresh_after_send_failure', + }, + }, + }); + } + if (url.includes('/trpc/initiateFromKilocodeSessionV2')) { + return Response.json({ + result: { data: { executionId: 'exec-fresh', status: 'running' } }, + }); + } + return new Response('unexpected fetch', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put( + 'state', + codeReview({ + previousCloudAgentSessionId: previousSessionId, + }) + ); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + const status = await stub.status(); + expect(status).toMatchObject({ + status: 'running', + sessionId: 'agent-fresh-after-send-failure', + cliSessionId: 'ses_fresh_after_send_failure', + }); + expect(hasFetchCall(fetchMock, '/trpc/getSessionHealth')).toBe(true); + expect(hasFetchCall(fetchMock, '/trpc/updateSession')).toBe(true); + expect(hasFetchCall(fetchMock, '/trpc/sendMessageV2')).toBe(true); + expect(hasFetchCall(fetchMock, '/trpc/prepareSession')).toBe(true); + + const updateCall = getFetchCall(fetchMock, '/trpc/updateSession'); + const updateBody = JSON.parse(String(updateCall?.[1]?.body)); + expect(updateBody).toMatchObject({ + cloudAgentSessionId: previousSessionId, + callbackTarget: { + url: expect.stringContaining('/api/internal/code-review-status/'), + }, + }); + }); + it('aborts alarm recovery before cloud-agent calls when DB is already terminal', async () => { const stub = getReviewStub(); const fetchMock = vi.fn(async (request: RequestInfo | URL) => { From 50a2d32288b9ee871ded4b7a71a81982f6a57ecb Mon Sep 17 00:00:00 2001 From: Alex Alecu Date: Thu, 7 May 2026 16:40:14 +0300 Subject: [PATCH 2/3] fix(cloud-agent-next): accept health preflight posts --- .../cloud-agent-next/src/router/handlers/session-management.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/cloud-agent-next/src/router/handlers/session-management.ts b/services/cloud-agent-next/src/router/handlers/session-management.ts index cfe5981297..272fe36c58 100644 --- a/services/cloud-agent-next/src/router/handlers/session-management.ts +++ b/services/cloud-agent-next/src/router/handlers/session-management.ts @@ -475,7 +475,7 @@ export function createSessionManagementHandlers() { getSessionHealth: protectedProcedure .input(GetSessionHealthInput) .output(GetSessionHealthOutput) - .query(async ({ input, ctx }) => { + .mutation(async ({ input, ctx }) => { return withLogTags({ source: 'getSessionHealth' }, async () => { const sessionId = input.cloudAgentSessionId as SessionId; const { userId, env } = ctx; From 1dde2a1b222068450f7f929c18fe072c6c7f826d Mon Sep 17 00:00:00 2001 From: Alex Alecu Date: Thu, 7 May 2026 18:31:07 +0300 Subject: [PATCH 3/3] fix(review): avoid active follow-ups --- .../src/code-review-orchestrator.ts | 6 +- .../code-review-orchestrator.test.ts | 64 +++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/services/code-review-infra/src/code-review-orchestrator.ts b/services/code-review-infra/src/code-review-orchestrator.ts index 1716d79b9c..d3d41686db 100644 --- a/services/code-review-infra/src/code-review-orchestrator.ts +++ b/services/code-review-infra/src/code-review-orchestrator.ts @@ -28,7 +28,11 @@ import { InternalStatusResponseSchema } from './types'; type UpdateStatusResult = 'updated' | 'db-terminal'; function canContinueCloudAgentNextSession(health: CloudAgentSessionHealthOutput): boolean { - return health.sandboxStatus === 'healthy' && health.executionHealth !== 'stale'; + return ( + health.sandboxStatus === 'healthy' && + health.executionHealth === 'none' && + health.activeExecutionId === undefined + ); } /** Shape of an SSE event parsed from the cloud agent stream */ diff --git a/services/code-review-infra/test/integration/code-review-orchestrator.test.ts b/services/code-review-infra/test/integration/code-review-orchestrator.test.ts index a8eb29292e..b9ab9401d5 100644 --- a/services/code-review-infra/test/integration/code-review-orchestrator.test.ts +++ b/services/code-review-infra/test/integration/code-review-orchestrator.test.ts @@ -338,6 +338,70 @@ describe('CodeReviewOrchestrator recovery', () => { expect(hasFetchCall(fetchMock, '/trpc/prepareSession')).toBe(true); }); + it('skips continuation and prepares a fresh session when previous execution is active', async () => { + const stub = getReviewStub(); + const previousSessionId = 'agent_previous_active'; + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ success: true }); + } + if (url.includes('/trpc/getSessionHealth')) { + return Response.json({ + result: { + data: { + cloudAgentSessionId: previousSessionId, + sandboxStatus: 'healthy', + executionHealth: 'healthy', + activeExecutionId: 'exec-active', + activeExecutionStatus: 'running', + }, + }, + }); + } + if (url.includes('/trpc/prepareSession')) { + return Response.json({ + result: { + data: { + cloudAgentSessionId: 'agent-fresh-active', + kiloSessionId: 'ses_fresh_active', + }, + }, + }); + } + if (url.includes('/trpc/initiateFromKilocodeSessionV2')) { + return Response.json({ + result: { data: { executionId: 'exec-fresh', status: 'running' } }, + }); + } + return new Response('unexpected fetch', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put( + 'state', + codeReview({ + previousCloudAgentSessionId: previousSessionId, + }) + ); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + const status = await stub.status(); + expect(status).toMatchObject({ + status: 'running', + sessionId: 'agent-fresh-active', + cliSessionId: 'ses_fresh_active', + }); + expect(hasFetchCall(fetchMock, '/trpc/updateSession')).toBe(false); + expect(hasFetchCall(fetchMock, '/trpc/sendMessageV2')).toBe(false); + expect(hasFetchCall(fetchMock, '/trpc/prepareSession')).toBe(true); + }); + it('falls back to a fresh session when health preflight returns an error', async () => { const stub = getReviewStub(); const previousSessionId = 'agent_previous_missing';