Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/app/api/runtimes/[id]/talk/realtime/events/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ export async function GET(
};
cleanup = () => {
if (!cleanup) return;
client.off("talk.event", onRelay);
client.off("talk.realtime.relay", onRelay);
request.signal.removeEventListener("abort", cleanup);
releaseClient(client);
cleanup = null;
};

client.on("talk.event", onRelay);
client.on("talk.realtime.relay", onRelay);
request.signal.addEventListener("abort", cleanup, { once: true });
send("realtime_ready", { relaySessionId });
Expand Down
79 changes: 79 additions & 0 deletions src/lib/gateway-client-realtime.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { describe, expect, it, vi } from "vitest";
import { GatewayClient, type DeviceIdentity } from "./gateway-client";

const device: DeviceIdentity = {
deviceId: "device_1",
publicKeyRawBase64Url: "pub",
privateKeyPem: "private",
source: "configured",
};

describe("GatewayClient realtime Talk compatibility", () => {
it("creates gateway relay sessions through the unified Talk session API", async () => {
const client = new GatewayClient("ws://localhost:18789", null, device);
const rpc = vi.spyOn(client, "rpc").mockResolvedValueOnce({
transport: "gateway-relay",
relaySessionId: "relay_1",
});

await expect(client.realtimeTalkSession({
sessionKey: "main",
provider: "openai",
agentId: "NEO",
})).resolves.toMatchObject({
transport: "gateway-relay",
relaySessionId: "relay_1",
});

expect(rpc).toHaveBeenCalledWith("talk.session.create", {
sessionKey: "main",
provider: "openai",
mode: "realtime",
transport: "gateway-relay",
brain: "agent-consult",
});
});

it("maps relay audio, tool results, and stop onto unified session methods", async () => {
const client = new GatewayClient("ws://localhost:18789", null, device);
const rpc = vi.spyOn(client, "rpc").mockResolvedValue({ ok: true });

await client.realtimeRelayAudio({
relaySessionId: "relay_1",
audioBase64: "AAAA",
timestamp: 123,
});
await client.realtimeRelayToolResult({
relaySessionId: "relay_1",
callId: "call_1",
result: { ok: true },
});
await client.realtimeRelayStop("relay_1");

expect(rpc).toHaveBeenNthCalledWith(1, "talk.session.appendAudio", {
sessionId: "relay_1",
audioBase64: "AAAA",
timestamp: 123,
});
expect(rpc).toHaveBeenNthCalledWith(2, "talk.session.submitToolResult", {
sessionId: "relay_1",
callId: "call_1",
result: { ok: true },
});
expect(rpc).toHaveBeenNthCalledWith(3, "talk.session.close", {
sessionId: "relay_1",
});
});

it("keeps relay mark acknowledgements local for the current OpenClaw API", async () => {
const client = new GatewayClient("ws://localhost:18789", null, device);
const rpc = vi.spyOn(client, "rpc");

await expect(client.realtimeRelayMark({
relaySessionId: "relay_1",
markName: "done",
})).resolves.toEqual({ ok: true });

expect(rpc).not.toHaveBeenCalled();
});
});
68 changes: 63 additions & 5 deletions src/lib/gateway-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ export interface GatewayRealtimeTalkSessionParams extends Record<string, unknown
model?: string;
voice?: string;
agentId?: string;
mode?: string;
transport?: string;
brain?: string;
}

export interface GatewayRealtimeTalkSessionResult {
Expand Down Expand Up @@ -198,6 +201,10 @@ export interface GatewayRealtimeRelayToolResultParams extends Record<string, unk
relaySessionId: string;
callId: string;
result: unknown;
options?: {
suppressResponse?: boolean;
willContinue?: boolean;
};
}

export interface GatewayCronJob {
Expand Down Expand Up @@ -292,6 +299,20 @@ function signPayload(privateKeyPem: string, payload: string): string {
return base64UrlEncode(sig);
}

function withoutUndefined<T extends Record<string, unknown>>(record: T): Record<string, unknown> {
return Object.fromEntries(Object.entries(record).filter(([, value]) => value !== undefined));
}

function isLikelyMissingGatewayMethod(err: unknown): boolean {
const message = err instanceof Error ? err.message.toLowerCase() : String(err).toLowerCase();
return (
message.includes("unknown method")
|| message.includes("method not found")
|| message.includes("unsupported method")
|| message.includes("no handler")
);
}

/**
* Build v3 device auth payload string for signing.
*/
Expand Down Expand Up @@ -714,23 +735,60 @@ export class GatewayClient {
async realtimeTalkSession(
params: GatewayRealtimeTalkSessionParams = {},
): Promise<GatewayRealtimeTalkSessionResult> {
return this.rpc<GatewayRealtimeTalkSessionResult>("talk.realtime.session", params);
try {
const sessionParams = withoutUndefined(params);
delete sessionParams.agentId;
return await this.rpc<GatewayRealtimeTalkSessionResult>("talk.session.create", {
...sessionParams,
mode: params.mode ?? "realtime",
transport: params.transport ?? "gateway-relay",
brain: params.brain ?? "agent-consult",
});
} catch (err) {
if (!isLikelyMissingGatewayMethod(err)) throw err;
return this.rpc<GatewayRealtimeTalkSessionResult>("talk.realtime.session", params);
}
}

async realtimeRelayAudio(params: GatewayRealtimeRelayAudioParams): Promise<{ ok?: boolean }> {
return this.rpc<{ ok?: boolean }>("talk.realtime.relayAudio", params);
try {
return await this.rpc<{ ok?: boolean }>("talk.session.appendAudio", withoutUndefined({
sessionId: params.relaySessionId,
audioBase64: params.audioBase64,
timestamp: params.timestamp,
}));
} catch (err) {
if (!isLikelyMissingGatewayMethod(err)) throw err;
return this.rpc<{ ok?: boolean }>("talk.realtime.relayAudio", params);
}
}

async realtimeRelayMark(params: GatewayRealtimeRelayMarkParams): Promise<{ ok?: boolean }> {
return this.rpc<{ ok?: boolean }>("talk.realtime.relayMark", params);
void params;
return { ok: true };
}

async realtimeRelayToolResult(params: GatewayRealtimeRelayToolResultParams): Promise<{ ok?: boolean }> {
return this.rpc<{ ok?: boolean }>("talk.realtime.relayToolResult", params);
try {
return await this.rpc<{ ok?: boolean }>("talk.session.submitToolResult", withoutUndefined({
sessionId: params.relaySessionId,
callId: params.callId,
result: params.result,
options: params.options,
}));
} catch (err) {
if (!isLikelyMissingGatewayMethod(err)) throw err;
return this.rpc<{ ok?: boolean }>("talk.realtime.relayToolResult", params);
}
}

async realtimeRelayStop(relaySessionId: string): Promise<{ ok?: boolean }> {
return this.rpc<{ ok?: boolean }>("talk.realtime.relayStop", { relaySessionId });
try {
return await this.rpc<{ ok?: boolean }>("talk.session.close", { sessionId: relaySessionId });
} catch (err) {
if (!isLikelyMissingGatewayMethod(err)) throw err;
return this.rpc<{ ok?: boolean }>("talk.realtime.relayStop", { relaySessionId });
}
}

async cronList(): Promise<GatewayCronListResult> {
Expand Down
3 changes: 2 additions & 1 deletion src/lib/runtime-capabilities.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ describe("deriveRuntimeCapabilitySnapshot realtime voice", () => {
configuredProviders: [],
transports: ["webrtc-sdp", "json-pcm-websocket", "gateway-relay"],
});
expect(snapshot.realtimeVoice?.gatewayMethods).toContain("talk.realtime.session");
expect(snapshot.realtimeVoice?.gatewayMethods).toContain("talk.session.create");
expect(snapshot.realtimeVoice?.gatewayMethods).toContain("talk.event");
});

it("reads explicit realtime talk provider config when present", () => {
Expand Down
10 changes: 5 additions & 5 deletions src/lib/runtime-capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,11 @@ function deriveRealtimeVoiceSummary(params: {
configuredProviders,
transports: ["webrtc-sdp", "json-pcm-websocket", "gateway-relay"],
gatewayMethods: [
"talk.realtime.session",
"talk.realtime.relayAudio",
"talk.realtime.relayMark",
"talk.realtime.relayToolResult",
"talk.realtime.relayStop",
"talk.session.create",
"talk.session.appendAudio",
"talk.session.submitToolResult",
"talk.session.close",
"talk.event",
],
notes: [
"Capability is config-derived only; route-level probing still determines whether the selected runtime accepts realtime talk sessions.",
Expand Down
Loading