From 550cc22bec69eb7cc3c8541f143700c7816fd0c8 Mon Sep 17 00:00:00 2001 From: Roger Chappel Date: Thu, 21 May 2026 12:55:10 +1000 Subject: [PATCH 1/3] feat: forward realtime tool calls to openclaw --- src/lib/gateway-client-realtime.test.ts | 21 +++++++++++++++++++++ src/lib/gateway-client.ts | 19 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/src/lib/gateway-client-realtime.test.ts b/src/lib/gateway-client-realtime.test.ts index b59f977..58ef1c4 100644 --- a/src/lib/gateway-client-realtime.test.ts +++ b/src/lib/gateway-client-realtime.test.ts @@ -88,4 +88,25 @@ describe("GatewayClient realtime Talk compatibility", () => { reason: "barge-in", }); }); + + it("forwards realtime provider tool calls through OpenClaw client tool calls", async () => { + const client = new GatewayClient("ws://localhost:18789", null, device); + const rpc = vi.spyOn(client, "rpc").mockResolvedValue({ runId: "run_1" }); + + await expect(client.realtimeClientToolCall({ + sessionKey: "main", + relaySessionId: "relay_1", + callId: "call_1", + name: "openclaw_agent_consult", + args: { prompt: "Inspect this repo" }, + })).resolves.toEqual({ runId: "run_1" }); + + expect(rpc).toHaveBeenCalledWith("talk.client.toolCall", { + sessionKey: "main", + relaySessionId: "relay_1", + callId: "call_1", + name: "openclaw_agent_consult", + args: { prompt: "Inspect this repo" }, + }); + }); }); diff --git a/src/lib/gateway-client.ts b/src/lib/gateway-client.ts index e8dfea8..f8ba0b4 100644 --- a/src/lib/gateway-client.ts +++ b/src/lib/gateway-client.ts @@ -207,6 +207,19 @@ export interface GatewayRealtimeRelayToolResultParams extends Record { + sessionKey: string; + callId: string; + name: string; + args?: unknown; + relaySessionId?: string; +} + +export interface GatewayRealtimeClientToolCallResult { + runId?: string; + idempotencyKey?: string; +} + export interface GatewayCronJob { id: string; agentId?: string; @@ -794,6 +807,12 @@ export class GatewayClient { } } + async realtimeClientToolCall( + params: GatewayRealtimeClientToolCallParams, + ): Promise { + return this.rpc("talk.client.toolCall", withoutUndefined(params)); + } + async realtimeRelayStop(relaySessionId: string): Promise<{ ok?: boolean }> { try { return await this.rpc<{ ok?: boolean }>("talk.session.close", { sessionId: relaySessionId }); From 68873df3600897853b0e4178a100b2424b02f0a8 Mon Sep 17 00:00:00 2001 From: Roger Chappel Date: Thu, 21 May 2026 12:57:19 +1000 Subject: [PATCH 2/3] feat: delegate realtime voice tool calls --- .../[id]/talk/realtime/relay/route.test.ts | 70 +++++++- .../[id]/talk/realtime/relay/route.ts | 168 +++++++++++++++++- 2 files changed, 233 insertions(+), 5 deletions(-) diff --git a/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts b/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts index 79b8933..f2dbf9b 100644 --- a/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts +++ b/src/app/api/runtimes/[id]/talk/realtime/relay/route.test.ts @@ -8,9 +8,11 @@ type RuntimeRow = { type Field = { key: keyof RuntimeRow }; type Predicate = (row: RuntimeRow) => boolean; -const { mockRuntimeRows, mockGetGatewayClientForRuntime } = vi.hoisted(() => ({ +const { mockRuntimeRows, mockGetGatewayClientForRuntime, mockHoldClient, mockReleaseClient } = vi.hoisted(() => ({ mockRuntimeRows: [] as RuntimeRow[], mockGetGatewayClientForRuntime: vi.fn(), + mockHoldClient: vi.fn(), + mockReleaseClient: vi.fn(), })); vi.mock("@/db/schema", () => ({ @@ -46,6 +48,8 @@ vi.mock("@/lib/agent-access", () => ({ vi.mock("@/lib/gateway-chat-pool", () => ({ getGatewayClientForRuntime: (...args: unknown[]) => mockGetGatewayClientForRuntime(...args), + holdClient: (...args: unknown[]) => mockHoldClient(...args), + releaseClient: (...args: unknown[]) => mockReleaseClient(...args), })); import { POST } from "./route"; @@ -105,6 +109,70 @@ describe("POST /api/runtimes/[id]/talk/realtime/relay", () => { expect(realtimeRelayCancelOutput).toHaveBeenCalledWith("relay_1", "barge-in"); }); + it("delegates realtime tool calls to OpenClaw and submits the final result", async () => { + let gatewayHandler: ((payload: unknown) => void) | null = null; + const client = { + realtimeClientToolCall: vi.fn().mockResolvedValue({ runId: "run_1" }), + realtimeRelayToolResult: vi.fn().mockResolvedValue({ ok: true }), + on: vi.fn((event: string, handler: (payload: unknown) => void) => { + if (event === "*") gatewayHandler = handler; + }), + off: vi.fn(), + }; + mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" }); + mockGetGatewayClientForRuntime.mockResolvedValue(client); + + const responsePromise = POST( + new Request("http://localhost/api/runtimes/rt_1/talk/realtime/relay", { + method: "POST", + body: JSON.stringify({ + action: "toolCall", + relaySessionId: "relay_1", + sessionKey: "main", + callId: "call_1", + name: "openclaw_agent_consult", + args: { prompt: "Inspect this repo" }, + }), + }), + { params: Promise.resolve({ id: "rt_1" }) }, + ); + + await vi.waitFor(() => { + expect(client.realtimeClientToolCall).toHaveBeenCalledWith({ + relaySessionId: "relay_1", + sessionKey: "main", + callId: "call_1", + name: "openclaw_agent_consult", + args: { prompt: "Inspect this repo" }, + }); + expect(gatewayHandler).toBeTypeOf("function"); + }); + + (gatewayHandler as ((payload: unknown) => void) | null)?.({ + event: "chat", + runId: "run_1", + state: "final", + message: { content: "The repo is a CrewCMD app." }, + }); + + const response = await responsePromise; + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual({ + result: { + delegated: true, + runId: "run_1", + result: { ok: true }, + }, + }); + expect(client.realtimeRelayToolResult).toHaveBeenCalledWith({ + relaySessionId: "relay_1", + callId: "call_1", + result: { result: "The repo is a CrewCMD app." }, + }); + expect(mockHoldClient).toHaveBeenCalledWith(client); + expect(mockReleaseClient).toHaveBeenCalledWith(client); + }); + it("rejects invalid relay actions before calling the gateway", async () => { mockRuntimeRows.push({ id: "rt_1", ownerUserId: "user_1" }); diff --git a/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts b/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts index 8293166..c184307 100644 --- a/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts +++ b/src/app/api/runtimes/[id]/talk/realtime/relay/route.ts @@ -3,11 +3,13 @@ import { and, eq } from "drizzle-orm"; import { db, withRetry } from "@/db"; import { companyRuntimes } from "@/db/schema"; import { buildRuntimeReadWhere, getAgentAccessContext } from "@/lib/agent-access"; -import { getGatewayClientForRuntime } from "@/lib/gateway-chat-pool"; +import { getGatewayClientForRuntime, holdClient, releaseClient } from "@/lib/gateway-chat-pool"; +import type { GatewayClient } from "@/lib/gateway-client"; export const dynamic = "force-dynamic"; +export const maxDuration = 120; -type RelayAction = "audio" | "cancelOutput" | "mark" | "toolResult" | "stop"; +type RelayAction = "audio" | "cancelOutput" | "mark" | "toolCall" | "toolResult" | "stop"; export async function POST( request: Request, @@ -71,6 +73,17 @@ export async function POST( return NextResponse.json({ result }); } + if (action === "toolCall") { + const result = await runRealtimeToolCall(client, { + relaySessionId, + sessionKey: readRequiredString(body.sessionKey, "sessionKey"), + callId: readRequiredString(body.callId, "callId"), + name: readRequiredString(body.name, "name"), + args: body.args, + }); + return NextResponse.json({ result }); + } + const result = await client.realtimeRelayStop(relaySessionId); return NextResponse.json({ result }); } catch (err) { @@ -82,6 +95,94 @@ export async function POST( class ValidationError extends Error {} +async function runRealtimeToolCall( + client: GatewayClient, + params: { + relaySessionId: string; + sessionKey: string; + callId: string; + name: string; + args: unknown; + }, +) { + if (params.name !== "openclaw_agent_consult") { + const result = await client.realtimeRelayToolResult({ + relaySessionId: params.relaySessionId, + callId: params.callId, + result: { + error: `Unsupported realtime tool call: ${params.name}`, + name: params.name, + }, + }); + return { delegated: false, result }; + } + + holdClient(client); + try { + try { + const toolCall = await client.realtimeClientToolCall(params); + const runId = firstString(toolCall.runId, toolCall.idempotencyKey); + if (!runId) throw new Error("OpenClaw realtime tool call did not return a run id"); + + const text = await waitForChatFinal(client, runId); + const result = await client.realtimeRelayToolResult({ + relaySessionId: params.relaySessionId, + callId: params.callId, + result: { result: text }, + }); + return { delegated: true, runId, result }; + } catch (error) { + const message = error instanceof Error ? error.message : "OpenClaw realtime tool call failed"; + await client.realtimeRelayToolResult({ + relaySessionId: params.relaySessionId, + callId: params.callId, + result: { error: message, name: params.name }, + }).catch(() => {}); + throw error; + } + } finally { + releaseClient(client); + } +} + +function waitForChatFinal(client: GatewayClient, runId: string, timeoutMs = 110_000) { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + cleanup(); + reject(new Error("Timed out waiting for OpenClaw realtime tool result")); + }, timeoutMs); + + const cleanup = () => { + clearTimeout(timer); + client.off("*", onEvent); + }; + + const onEvent = (payload: unknown) => { + const event = asRecord(payload); + if (!event) return; + const runIds = extractEventRunIds(event); + if (!runIds.includes(runId)) return; + + const state = firstString(event.state, event.status)?.toLowerCase(); + if (state === "final" || state === "complete" || state === "completed") { + const text = extractText(event.message ?? event); + cleanup(); + resolve(text || "OpenClaw completed without returning text."); + return; + } + + if (state === "aborted" || state === "error" || state === "failed") { + const message = firstString(event.errorMessage, event.error, event.message) ?? + "OpenClaw realtime tool call failed"; + cleanup(); + reject(new Error(message)); + } + }; + + client.on("*", onEvent); + }); +} + function readRequiredString(value: unknown, name: string) { if (typeof value === "string" && value.trim().length > 0) return value.trim(); throw new ValidationError(`${name} is required`); @@ -96,6 +197,65 @@ function readOptionalString(value: unknown) { } function readRelayAction(value: unknown): RelayAction { - if (value === "audio" || value === "cancelOutput" || value === "mark" || value === "toolResult" || value === "stop") return value; - throw new ValidationError("action must be audio, cancelOutput, mark, toolResult, or stop"); + if ( + value === "audio" || + value === "cancelOutput" || + value === "mark" || + value === "toolCall" || + value === "toolResult" || + value === "stop" + ) return value; + throw new ValidationError("action must be audio, cancelOutput, mark, toolCall, toolResult, or stop"); +} + +function asRecord(value: unknown): Record | null { + return value && typeof value === "object" && !Array.isArray(value) + ? value as Record + : null; +} + +function firstString(...values: unknown[]) { + for (const value of values) { + if (typeof value === "string" && value.trim().length > 0) return value.trim(); + } + return undefined; +} + +function extractEventRunIds(payload: Record) { + return [ + payload.runId, + payload.run_id, + payload.id, + payload.requestId, + payload.request_id, + payload.parentRunId, + payload.parent_run_id, + ] + .map((value) => typeof value === "string" ? value : null) + .filter((value): value is string => Boolean(value)); +} + +function extractText(value: unknown, seen = new WeakSet()): string { + if (typeof value === "string") return value; + if (value === null || value === undefined) return ""; + if (typeof value !== "object") return ""; + if (seen.has(value)) return ""; + seen.add(value); + + if (Array.isArray(value)) { + return value.map((item) => extractText(item, seen)).filter(Boolean).join(""); + } + + const record = value as Record; + const direct = firstString(record.text, record.content, record.output, record.result); + if (direct) return direct; + + return [ + record.message, + record.delta, + record.data, + record.payload, + record.parts, + record.items, + ].map((item) => extractText(item, seen)).filter(Boolean).join(""); } From f20b097d8f4a16f419dab15e3b787495e528588b Mon Sep 17 00:00:00 2001 From: Roger Chappel Date: Thu, 21 May 2026 12:58:10 +1000 Subject: [PATCH 3/3] feat: run realtime relay tool calls from browser --- src/lib/realtime-voice-client.test.ts | 31 ++++++++++++++++++++++- src/lib/realtime-voice-client.ts | 22 ++++++++++++++++- src/lib/realtime-voice-gateway-relay.ts | 33 +++++++++++++++++++------ 3 files changed, 77 insertions(+), 9 deletions(-) diff --git a/src/lib/realtime-voice-client.test.ts b/src/lib/realtime-voice-client.test.ts index 05c50e2..1ecda75 100644 --- a/src/lib/realtime-voice-client.test.ts +++ b/src/lib/realtime-voice-client.test.ts @@ -3,6 +3,7 @@ import { cancelRealtimeRelayOutput, openRealtimeRelayEvents, sendRealtimeRelayAudio, + sendRealtimeRelayToolCall, startRealtimeVoiceSession, } from "./realtime-voice-client"; @@ -21,7 +22,7 @@ describe("realtime voice client helpers", () => { runtimeId: "rt 1", sessionKey: "main", provider: "openai", - })).resolves.toEqual({ transport: "gateway-relay", relaySessionId: "relay_1" }); + })).resolves.toEqual({ transport: "gateway-relay", relaySessionId: "relay_1", sessionKey: "main" }); expect(fetchMock).toHaveBeenCalledWith("/api/runtimes/rt%201/talk/realtime/session", { method: "POST", @@ -77,6 +78,34 @@ describe("realtime voice client helpers", () => { }); }); + it("sends provider tool calls through the runtime relay route", async () => { + const fetchMock = vi.spyOn(globalThis, "fetch").mockResolvedValue({ + ok: true, + json: () => Promise.resolve({ result: { delegated: true } }), + } as Response); + + await sendRealtimeRelayToolCall("rt_1", { + relaySessionId: "relay_1", + sessionKey: "main", + callId: "call_1", + name: "openclaw_agent_consult", + args: { prompt: "Inspect this repo" }, + }); + + expect(fetchMock).toHaveBeenCalledWith("/api/runtimes/rt_1/talk/realtime/relay", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + action: "toolCall", + relaySessionId: "relay_1", + sessionKey: "main", + callId: "call_1", + name: "openclaw_agent_consult", + args: { prompt: "Inspect this repo" }, + }), + }); + }); + it("opens relay events for the selected runtime and session", () => { const eventSourceMock = vi.fn(); vi.stubGlobal("EventSource", eventSourceMock); diff --git a/src/lib/realtime-voice-client.ts b/src/lib/realtime-voice-client.ts index d69989a..b348055 100644 --- a/src/lib/realtime-voice-client.ts +++ b/src/lib/realtime-voice-client.ts @@ -12,6 +12,7 @@ export interface RealtimeVoiceSessionRequest { export interface RealtimeVoiceSession { sessionId?: string; relaySessionId?: string; + sessionKey?: string; transport?: RealtimeVoiceTransport | string; provider?: string; model?: string; @@ -37,6 +38,14 @@ export interface RealtimeRelayAudioChunk { timestamp?: number; } +export interface RealtimeRelayToolCall { + relaySessionId: string; + sessionKey: string; + callId: string; + name: string; + args?: unknown; +} + export async function startRealtimeVoiceSession( request: RealtimeVoiceSessionRequest, ): Promise { @@ -57,7 +66,11 @@ export async function startRealtimeVoiceSession( } const data = await response.json(); - return data.session as RealtimeVoiceSession; + const session = data.session as RealtimeVoiceSession; + return { + ...session, + sessionKey: session.sessionKey ?? request.sessionKey ?? "main", + }; } export async function sendRealtimeRelayAudio(runtimeId: string, chunk: RealtimeRelayAudioChunk): Promise { @@ -101,6 +114,13 @@ export async function sendRealtimeRelayToolResult( }); } +export async function sendRealtimeRelayToolCall(runtimeId: string, toolCall: RealtimeRelayToolCall): Promise { + await postRealtimeRelay(runtimeId, { + action: "toolCall", + ...toolCall, + }); +} + export async function stopRealtimeRelay(runtimeId: string, relaySessionId: string): Promise { await postRealtimeRelay(runtimeId, { action: "stop", diff --git a/src/lib/realtime-voice-gateway-relay.ts b/src/lib/realtime-voice-gateway-relay.ts index 56f798c..c708756 100644 --- a/src/lib/realtime-voice-gateway-relay.ts +++ b/src/lib/realtime-voice-gateway-relay.ts @@ -3,6 +3,7 @@ import { openRealtimeRelayEvents, sendRealtimeRelayAudio, sendRealtimeRelayMark, + sendRealtimeRelayToolCall, sendRealtimeRelayToolResult, stopRealtimeRelay, type RealtimeVoiceSession, @@ -192,7 +193,7 @@ export class RealtimeGatewayRelaySession { } return; case "toolCall": - this.submitUnavailableToolResult(event); + this.handleToolCall(event); return; case "error": this.callbacks.onStatus?.("error", event.message ?? "Realtime relay failed"); @@ -240,12 +241,30 @@ export class RealtimeGatewayRelaySession { }, delayMs); } - private submitUnavailableToolResult(event: Extract): void { - if (!this.session.relaySessionId || !event.callId) return; - void sendRealtimeRelayToolResult(this.runtimeId, this.session.relaySessionId, event.callId, { - error: "Realtime browser tool calls are not wired into CrewCMD yet.", - name: event.name ?? null, - }).catch(() => {}); + private handleToolCall(event: Extract): void { + const relaySessionId = this.session.relaySessionId; + const sessionKey = typeof this.session.sessionKey === "string" && this.session.sessionKey.trim() + ? this.session.sessionKey.trim() + : "main"; + if (!relaySessionId || !event.callId || !event.name) return; + const callId = event.callId; + const name = event.name; + + this.callbacks.onStatus?.("processing", "Consulting OpenClaw"); + void sendRealtimeRelayToolCall(this.runtimeId, { + relaySessionId, + sessionKey, + callId, + name, + args: event.args ?? {}, + }).catch((error) => { + const message = error instanceof Error ? error.message : String(error); + this.callbacks.onError?.(message); + void sendRealtimeRelayToolResult(this.runtimeId, relaySessionId, callId, { + error: message, + name, + }).catch(() => {}); + }); } private cancelOutputForBargeIn(): void {