Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ type RuntimeRow = {
type Field = { key: keyof RuntimeRow };
type Predicate = (row: RuntimeRow) => boolean;

const { mockRuntimeRows, mockGetGatewayClientForRuntime } = vi.hoisted(() => ({
const { mockRuntimeRows, mockGetGatewayClientForRuntime, mockHoldClient, mockReleaseClient } = vi.hoisted(() => ({
mockRuntimeRows: [] as RuntimeRow[],
mockGetGatewayClientForRuntime: vi.fn(),
mockHoldClient: vi.fn(),
mockReleaseClient: vi.fn(),
}));

vi.mock("@/db/schema", () => ({
Expand Down Expand Up @@ -46,6 +48,8 @@ vi.mock("@/lib/agent-access", () => ({

vi.mock("@/lib/gateway-chat-pool", () => ({
getGatewayClientForRuntime: (...args: unknown[]) => mockGetGatewayClientForRuntime(...args),
holdClient: (...args: unknown[]) => mockHoldClient(...args),
releaseClient: (...args: unknown[]) => mockReleaseClient(...args),
}));

import { POST } from "./route";
Expand Down Expand Up @@ -105,6 +109,70 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => {
expect(realtimeRelayCancelOutput).toHaveBeenCalledWith("relay_1", "barge-in");
});

it("delegates realtime tool calls to OpenClaw and submits the final result", async () => {
let gatewayHandler: ((payload: unknown) => void) | null = null;
const client = {
realtimeClientToolCall: vi.fn().mockResolvedValue({ runId: "run_1" }),
realtimeRelayToolResult: vi.fn().mockResolvedValue({ ok: true }),
on: vi.fn((event: string, handler: (payload: unknown) => void) => {
if (event === "*") gatewayHandler = handler;
}),
off: vi.fn(),
};
mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" });
mockGetGatewayClientForRuntime.mockResolvedValue(client);

const responsePromise = POST(
new Request("http://localhost/api/runtimes/rt_1/talk/realtime/relay", {
method: "POST",
body: JSON.stringify({
action: "toolCall",
relaySessionId: "relay_1",
sessionKey: "main",
callId: "call_1",
name: "openclaw_agent_consult",
args: { prompt: "Inspect this repo" },
}),
}),
{ params: Promise.resolve({ id: "rt_1" }) },
);

await vi.waitFor(() => {
expect(client.realtimeClientToolCall).toHaveBeenCalledWith({
relaySessionId: "relay_1",
sessionKey: "main",
callId: "call_1",
name: "openclaw_agent_consult",
args: { prompt: "Inspect this repo" },
});
expect(gatewayHandler).toBeTypeOf("function");
});

(gatewayHandler as ((payload: unknown) => void) | null)?.({
event: "chat",
runId: "run_1",
state: "final",
message: { content: "The repo is a CrewCMD app." },
});

const response = await responsePromise;
expect(response.status).toBe(200);
await expect(response.json()).resolves.toEqual({
result: {
delegated: true,
runId: "run_1",
result: { ok: true },
},
});
expect(client.realtimeRelayToolResult).toHaveBeenCalledWith({
relaySessionId: "relay_1",
callId: "call_1",
result: { result: "The repo is a CrewCMD app." },
});
expect(mockHoldClient).toHaveBeenCalledWith(client);
expect(mockReleaseClient).toHaveBeenCalledWith(client);
});

it("rejects invalid relay actions before calling the gateway", async () => {
mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" });

Expand Down
168 changes: 164 additions & 4 deletions src/app/api/runtimes/[id]/talk/realtime/relay/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import { and, eq } from "drizzle-orm";
import { db, withRetry } from "@/db";
import { companyRuntimes } from "@/db/schema";
import { buildRuntimeReadWhere, getAgentAccessContext } from "@/lib/agent-access";
import { getGatewayClientForRuntime } from "@/lib/gateway-chat-pool";
import { getGatewayClientForRuntime, holdClient, releaseClient } from "@/lib/gateway-chat-pool";
import type { GatewayClient } from "@/lib/gateway-client";

export const dynamic = "force-dynamic";
export const maxDuration = 120;

type RelayAction = "audio" | "cancelOutput" | "mark" | "toolResult" | "stop";
type RelayAction = "audio" | "cancelOutput" | "mark" | "toolCall" | "toolResult" | "stop";

export async function POST(
request: Request,
Expand Down Expand Up @@ -71,6 +73,17 @@ export async function POST(
return NextResponse.json({ result });
}

if (action === "toolCall") {
const result = await runRealtimeToolCall(client, {
relaySessionId,
sessionKey: readRequiredString(body.sessionKey, "sessionKey"),
callId: readRequiredString(body.callId, "callId"),
name: readRequiredString(body.name, "name"),
args: body.args,
});
return NextResponse.json({ result });
}

const result = await client.realtimeRelayStop(relaySessionId);
return NextResponse.json({ result });
} catch (err) {
Expand All @@ -82,6 +95,94 @@ export async function POST(

class ValidationError extends Error {}

async function runRealtimeToolCall(
client: GatewayClient,
params: {
relaySessionId: string;
sessionKey: string;
callId: string;
name: string;
args: unknown;
},
) {
if (params.name !== "openclaw_agent_consult") {
const result = await client.realtimeRelayToolResult({
relaySessionId: params.relaySessionId,
callId: params.callId,
result: {
error: `Unsupported realtime tool call: ${params.name}`,
name: params.name,
},
});
return { delegated: false, result };
}

holdClient(client);
try {
try {
const toolCall = await client.realtimeClientToolCall(params);
const runId = firstString(toolCall.runId, toolCall.idempotencyKey);
if (!runId) throw new Error("OpenClaw realtime tool call did not return a run id");

const text = await waitForChatFinal(client, runId);
const result = await client.realtimeRelayToolResult({
relaySessionId: params.relaySessionId,
callId: params.callId,
result: { result: text },
});
return { delegated: true, runId, result };
} catch (error) {
const message = error instanceof Error ? error.message : "OpenClaw realtime tool call failed";
await client.realtimeRelayToolResult({
relaySessionId: params.relaySessionId,
callId: params.callId,
result: { error: message, name: params.name },
}).catch(() => {});
throw error;
}
} finally {
releaseClient(client);
}
}

function waitForChatFinal(client: GatewayClient, runId: string, timeoutMs = 110_000) {
return new Promise<string>((resolve, reject) => {
const timer = setTimeout(() => {
cleanup();
reject(new Error("Timed out waiting for OpenClaw realtime tool result"));
}, timeoutMs);

const cleanup = () => {
clearTimeout(timer);
client.off("*", onEvent);
};

const onEvent = (payload: unknown) => {
const event = asRecord(payload);
if (!event) return;
const runIds = extractEventRunIds(event);
if (!runIds.includes(runId)) return;

const state = firstString(event.state, event.status)?.toLowerCase();
if (state === "final" || state === "complete" || state === "completed") {
const text = extractText(event.message ?? event);
cleanup();
resolve(text || "OpenClaw completed without returning text.");
return;
}

if (state === "aborted" || state === "error" || state === "failed") {
const message = firstString(event.errorMessage, event.error, event.message) ??
"OpenClaw realtime tool call failed";
cleanup();
reject(new Error(message));
}
};

client.on("*", onEvent);
});
}

function readRequiredString(value: unknown, name: string) {
if (typeof value === "string" && value.trim().length > 0) return value.trim();
throw new ValidationError(`${name} is required`);
Expand All @@ -96,6 +197,65 @@ function readOptionalString(value: unknown) {
}

function readRelayAction(value: unknown): RelayAction {
if (value === "audio" || value === "cancelOutput" || value === "mark" || value === "toolResult" || value === "stop") return value;
throw new ValidationError("action must be audio, cancelOutput, mark, toolResult, or stop");
if (
value === "audio" ||
value === "cancelOutput" ||
value === "mark" ||
value === "toolCall" ||
value === "toolResult" ||
value === "stop"
) return value;
throw new ValidationError("action must be audio, cancelOutput, mark, toolCall, toolResult, or stop");
}

function asRecord(value: unknown): Record<string, unknown> | null {
return value && typeof value === "object" && !Array.isArray(value)
? value as Record<string, unknown>
: null;
}

function firstString(...values: unknown[]) {
for (const value of values) {
if (typeof value === "string" && value.trim().length > 0) return value.trim();
}
return undefined;
}

function extractEventRunIds(payload: Record<string, unknown>) {
return [
payload.runId,
payload.run_id,
payload.id,
payload.requestId,
payload.request_id,
payload.parentRunId,
payload.parent_run_id,
]
.map((value) => typeof value === "string" ? value : null)
.filter((value): value is string => Boolean(value));
}

function extractText(value: unknown, seen = new WeakSet<object>()): string {
if (typeof value === "string") return value;
if (value === null || value === undefined) return "";
if (typeof value !== "object") return "";
if (seen.has(value)) return "";
seen.add(value);

if (Array.isArray(value)) {
return value.map((item) => extractText(item, seen)).filter(Boolean).join("");
}

const record = value as Record<string, unknown>;
const direct = firstString(record.text, record.content, record.output, record.result);
if (direct) return direct;

return [
record.message,
record.delta,
record.data,
record.payload,
record.parts,
record.items,
].map((item) => extractText(item, seen)).filter(Boolean).join("");
}
21 changes: 21 additions & 0 deletions src/lib/gateway-client-realtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,25 @@ describe("GatewayClient realtime Talk compatibility", () => {
reason: "barge-in",
});
});

it("forwards realtime provider tool calls through OpenClaw client tool calls", async () => {
const client = new GatewayClient("ws://localhost:18789", null, device);
const rpc = vi.spyOn(client, "rpc").mockResolvedValue({ runId: "run_1" });

await expect(client.realtimeClientToolCall({
sessionKey: "main",
relaySessionId: "relay_1",
callId: "call_1",
name: "openclaw_agent_consult",
args: { prompt: "Inspect this repo" },
})).resolves.toEqual({ runId: "run_1" });

expect(rpc).toHaveBeenCalledWith("talk.client.toolCall", {
sessionKey: "main",
relaySessionId: "relay_1",
callId: "call_1",
name: "openclaw_agent_consult",
args: { prompt: "Inspect this repo" },
});
});
});
19 changes: 19 additions & 0 deletions src/lib/gateway-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,19 @@ export interface GatewayRealtimeRelayToolResultParams extends Record<string, unk
};
}

export interface GatewayRealtimeClientToolCallParams extends Record<string, unknown> {
sessionKey: string;
callId: string;
name: string;
args?: unknown;
relaySessionId?: string;
}

export interface GatewayRealtimeClientToolCallResult {
runId?: string;
idempotencyKey?: string;
}

export interface GatewayCronJob {
id: string;
agentId?: string;
Expand Down Expand Up @@ -794,6 +807,12 @@ export class GatewayClient {
}
}

async realtimeClientToolCall(
params: GatewayRealtimeClientToolCallParams,
): Promise<GatewayRealtimeClientToolCallResult> {
return this.rpc<GatewayRealtimeClientToolCallResult>("talk.client.toolCall", withoutUndefined(params));
}

async realtimeRelayStop(relaySessionId: string): Promise<{ ok?: boolean }> {
try {
return await this.rpc<{ ok?: boolean }>("talk.session.close", { sessionId: relaySessionId });
Expand Down
Loading
Loading