Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions packages/worker-utils/src/cloud-agent-next-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,63 @@ describe('CloudAgentNextFetchClient billing error detection', () => {
).rejects.not.toThrow(CloudAgentNextBillingError);
});
});

describe('CloudAgentNextFetchClient getSessionHealth', () => {
it('posts to getSessionHealth and parses a healthy response', async () => {
const fetchMock = mockFetch(200, {
result: {
data: {
cloudAgentSessionId: 'agent_123',
sandboxId: 'ses-abc123',
sandboxStatus: 'healthy',
executionHealth: 'none',
},
},
});
vi.stubGlobal('fetch', fetchMock);
const client = createCloudAgentNextFetchClient(BASE_URL);

const result = await client.getSessionHealth(
{ Authorization: 'Bearer token' },
{ cloudAgentSessionId: 'agent_123' }
);

expect(result).toEqual({
cloudAgentSessionId: 'agent_123',
sandboxId: 'ses-abc123',
sandboxStatus: 'healthy',
executionHealth: 'none',
});
expect(fetchMock).toHaveBeenCalledWith(
`${BASE_URL}/trpc/getSessionHealth`,
expect.objectContaining({
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: 'Bearer token',
},
body: JSON.stringify({ cloudAgentSessionId: 'agent_123' }),
})
);
});

it('rejects malformed health responses', async () => {
vi.stubGlobal(
'fetch',
mockFetch(200, {
result: {
data: {
cloudAgentSessionId: 'agent_123',
sandboxStatus: 'on-fire',
executionHealth: 'none',
},
},
})
);
const client = createCloudAgentNextFetchClient(BASE_URL);

await expect(client.getSessionHealth({}, { cloudAgentSessionId: 'agent_123' })).rejects.toThrow(
'Unexpected getSessionHealth response shape'
);
});
});
88 changes: 88 additions & 0 deletions packages/worker-utils/src/cloud-agent-next-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,30 @@ export type CloudAgentSendMessageOutput = {
status?: string;
};

export type CloudAgentSessionHealthInput = {
cloudAgentSessionId: string;
};

export type CloudAgentSandboxStatus = 'healthy' | 'destroyed' | 'unreachable' | 'unknown';

export type CloudAgentSessionExecutionHealth = 'healthy' | 'unknown' | 'stale' | 'none';

export type CloudAgentActiveExecutionStatus =
| 'pending'
| 'running'
| 'completed'
| 'failed'
| 'interrupted';

export type CloudAgentSessionHealthOutput = {
cloudAgentSessionId: string;
sandboxId?: string;
sandboxStatus: CloudAgentSandboxStatus;
executionHealth: CloudAgentSessionExecutionHealth;
activeExecutionStatus?: CloudAgentActiveExecutionStatus;
activeExecutionId?: string;
};

export type CloudAgentInterruptInput = {
sessionId: string;
};
Expand Down Expand Up @@ -128,6 +152,30 @@ function isBillingErrorBody(body: string): boolean {
);
}

function isCloudAgentSandboxStatus(value: unknown): value is CloudAgentSandboxStatus {
return (
value === 'healthy' || value === 'destroyed' || value === 'unreachable' || value === 'unknown'
);
}

function isCloudAgentSessionExecutionHealth(
value: unknown
): value is CloudAgentSessionExecutionHealth {
return value === 'healthy' || value === 'unknown' || value === 'stale' || value === 'none';
}

function isCloudAgentActiveExecutionStatus(
value: unknown
): value is CloudAgentActiveExecutionStatus {
return (
value === 'pending' ||
value === 'running' ||
value === 'completed' ||
value === 'failed' ||
value === 'interrupted'
);
}

/**
* Parse a tRPC JSON-RPC envelope and return `result.data`, throwing on
* non-200 responses or unexpected shapes.
Expand Down Expand Up @@ -187,6 +235,11 @@ export type CloudAgentNextFetchClient = {
input: CloudAgentSendMessageInput
): Promise<CloudAgentSendMessageOutput>;

getSessionHealth(
headers: Record<string, string>,
input: CloudAgentSessionHealthInput
): Promise<CloudAgentSessionHealthOutput>;

interruptSession(
headers: Record<string, string>,
input: CloudAgentInterruptInput
Expand Down Expand Up @@ -253,6 +306,41 @@ export function createCloudAgentNextFetchClient(baseUrl: string): CloudAgentNext
return data as unknown as CloudAgentSendMessageOutput;
},

async getSessionHealth(headers, input) {
const data = await trpcPost<Record<string, unknown>>(
trpc('getSessionHealth'),
headers,
input,
'getSessionHealth'
);

if (
typeof data.cloudAgentSessionId !== 'string' ||
!isCloudAgentSandboxStatus(data.sandboxStatus) ||
!isCloudAgentSessionExecutionHealth(data.executionHealth) ||
(data.sandboxId !== undefined && typeof data.sandboxId !== 'string') ||
(data.activeExecutionId !== undefined && typeof data.activeExecutionId !== 'string') ||
(data.activeExecutionStatus !== undefined &&
!isCloudAgentActiveExecutionStatus(data.activeExecutionStatus))
) {
throw new Error(
`Unexpected getSessionHealth response shape: ${JSON.stringify(data).slice(0, 500)}`
);
}

const health: CloudAgentSessionHealthOutput = {
cloudAgentSessionId: data.cloudAgentSessionId,
sandboxStatus: data.sandboxStatus,
executionHealth: data.executionHealth,
};
if (data.sandboxId !== undefined) health.sandboxId = data.sandboxId;
if (data.activeExecutionId !== undefined) health.activeExecutionId = data.activeExecutionId;
if (data.activeExecutionStatus !== undefined) {
health.activeExecutionStatus = data.activeExecutionStatus;
}
return health;
},

async interruptSession(headers, input) {
return trpcPost<CloudAgentInterruptOutput>(
trpc('interruptSession'),
Expand Down
5 changes: 5 additions & 0 deletions packages/worker-utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ export type {
CloudAgentUpdateSessionInput,
CloudAgentSendMessageInput,
CloudAgentSendMessageOutput,
CloudAgentSessionHealthInput,
CloudAgentSessionHealthOutput,
CloudAgentSandboxStatus,
CloudAgentSessionExecutionHealth,
CloudAgentActiveExecutionStatus,
CloudAgentInterruptInput,
CloudAgentInterruptOutput,
} from './cloud-agent-next-client.js';
Expand Down
149 changes: 149 additions & 0 deletions services/cloud-agent-next/src/router.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,155 @@ describe('router sessionId validation', () => {
});
});

describe('getSessionHealth procedure', () => {
let mockContext: TRPCContext;
let caller: ReturnType<typeof appRouter.createCaller>;
let cloudAgentSession: MockCAS;
let mockGetMetadata: ReturnType<typeof vi.fn>;
let mockGetActiveExecutionId: ReturnType<typeof vi.fn>;
let mockGetExecution: ReturnType<typeof vi.fn>;
let mockListProcesses: ReturnType<typeof vi.fn>;

beforeEach(() => {
vi.clearAllMocks();

mockGetMetadata = vi.fn();
mockGetActiveExecutionId = vi.fn().mockResolvedValue(null);
mockGetExecution = vi.fn().mockResolvedValue(null);
mockListProcesses = vi.fn().mockResolvedValue([]);

mockContext = {
userId: 'test-user-123',
authToken: 'test-token',
botId: undefined,
request: {} as Request,
env: {
Sandbox: {} as TRPCContext['env']['Sandbox'],
SandboxSmall: {} as TRPCContext['env']['SandboxSmall'],
CLOUD_AGENT_SESSION: {
idFromName: vi.fn((id: string) => ({ id })),
get: vi.fn(() => ({
getMetadata: mockGetMetadata,
getActiveExecutionId: mockGetActiveExecutionId,
getExecution: mockGetExecution,
})),
} as unknown as TRPCContext['env']['CLOUD_AGENT_SESSION'],
SESSION_INGEST: {
fetch: vi.fn(),
} as unknown as TRPCContext['env']['SESSION_INGEST'],
R2_BUCKET: {} as TRPCContext['env']['R2_BUCKET'],
GIT_TOKEN_SERVICE: {} as Env['GIT_TOKEN_SERVICE'],
NEXTAUTH_SECRET: 'test-secret',
INTERNAL_API_SECRET_PROD: {
get: vi.fn().mockResolvedValue('test-secret'),
} as unknown as TRPCContext['env']['INTERNAL_API_SECRET_PROD'],
HYPERDRIVE: {
connectionString: 'postgresql://test',
} as unknown as TRPCContext['env']['HYPERDRIVE'],
},
};
cloudAgentSession = mockContext.env.CLOUD_AGENT_SESSION as unknown as MockCAS;
vi.mocked(getSandbox).mockReturnValue({
listProcesses: mockListProcesses,
} as unknown as ReturnType<typeof getSandbox>);
caller = appRouter.createCaller(mockContext);
});

it('returns healthy sandbox and none execution health when no execution is active', async () => {
const sessionId: SessionId = 'agent_88888888-8888-8888-8888-888888888888';
const sandboxId = 'ses-a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6';
mockGetMetadata.mockResolvedValue({
version: 123456789,
sessionId,
userId: 'test-user-123',
timestamp: 123456789,
sandboxId,
} satisfies CloudAgentSessionState);

const result = await caller.getSessionHealth({ cloudAgentSessionId: sessionId });

expect(result).toEqual({
cloudAgentSessionId: sessionId,
sandboxId,
sandboxStatus: 'healthy',
executionHealth: 'none',
activeExecutionId: undefined,
activeExecutionStatus: undefined,
});
expect(cloudAgentSession.idFromName).toHaveBeenCalledWith(`test-user-123:${sessionId}`);
expect(getSandbox).toHaveBeenCalledWith(mockContext.env.SandboxSmall, sandboxId);
expect(mockListProcesses).toHaveBeenCalled();
});

it('returns stale execution health for a stale running execution', async () => {
const sessionId: SessionId = 'agent_99999999-9999-9999-9999-999999999999';
const activeExecutionId = 'exc_stale_execution';
mockGetMetadata.mockResolvedValue({
version: 123456789,
sessionId,
orgId: 'org-123',
userId: 'test-user-123',
timestamp: 123456789,
} satisfies CloudAgentSessionState);
mockGetActiveExecutionId.mockResolvedValue(activeExecutionId);
mockGetExecution.mockResolvedValue({
executionId: activeExecutionId,
status: 'running',
startedAt: Date.now() - 20 * 60 * 1000,
mode: 'code',
streamingMode: 'websocket',
lastHeartbeat: Date.now() - 11 * 60 * 1000,
});

const result = await caller.getSessionHealth({ cloudAgentSessionId: sessionId });

expect(result).toMatchObject({
cloudAgentSessionId: sessionId,
sandboxStatus: 'healthy',
executionHealth: 'stale',
activeExecutionId,
activeExecutionStatus: 'running',
});
expect(mockGetExecution).toHaveBeenCalledWith(activeExecutionId);
});

it('returns NOT_FOUND for missing session metadata', async () => {
const sessionId: SessionId = 'agent_aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa';
mockGetMetadata.mockResolvedValue(null);

await expect(caller.getSessionHealth({ cloudAgentSessionId: sessionId })).rejects.toThrow(
'Session not found'
);
expect(getSandbox).not.toHaveBeenCalled();
});

it('returns unreachable when sandbox process listing fails', async () => {
const sessionId: SessionId = 'agent_bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb';
const sandboxId = 'ses-b1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6';
mockGetMetadata.mockResolvedValue({
version: 123456789,
sessionId,
userId: 'test-user-123',
timestamp: 123456789,
sandboxId,
githubToken: 'secret-token-should-not-be-returned',
} satisfies CloudAgentSessionState);
mockListProcesses.mockRejectedValue(new Error('sandbox unavailable'));

const result = await caller.getSessionHealth({ cloudAgentSessionId: sessionId });

expect(result).toEqual({
cloudAgentSessionId: sessionId,
sandboxId,
sandboxStatus: 'unreachable',
executionHealth: 'none',
activeExecutionId: undefined,
activeExecutionStatus: undefined,
});
expect(result).not.toHaveProperty('githubToken');
});
});

describe('getLatestAssistantMessage procedure', () => {
let mockContext: TRPCContext;
let caller: ReturnType<typeof appRouter.createCaller>;
Expand Down
Loading