From fd6b2be4d348683883a23fed8543cf0857547160 Mon Sep 17 00:00:00 2001 From: Catalin Stavaru Date: Fri, 8 May 2026 18:40:42 +0300 Subject: [PATCH] feat(agent-tool): stream sub-agent events live (parity with adk-python #3991) Previously, when an LlmAgent invoked a sub-agent via AgentTool, all of the sub-agent's intermediate events (model output, tool calls, nested AgentTool invocations, tool responses) were swallowed by AgentTool.runAsync and only a single final function-response event was emitted to the parent runner. This made it impossible to surface real-time sub-agent activity to the user, and broke parity with the Python ADK once it shipped the streaming variant. This change mirrors adk-python PR #3991 (https://github.com/google/adk-python/pull/3991) for the JS SDK: functions.ts - Add handleFunctionCallsStreamingAsync(), an AsyncGenerator variant of handleFunctionCallsAsync(). It separates AgentTool calls from regular tool calls; for each AgentTool call it iterates AgentTool.runAsyncWithEvents() and yields every event live, while tracking the last content event to build the final tool-response. Regular tool calls continue to delegate to handleFunctionCallList. The terminal yield is always either the single response event or the merged parallel-function-response event - this contract lets the caller capture it with a one-step buffer. agent_tool.ts - Refactor AgentTool.runAsync(): split the runner/session setup into a private setupRunnerAndSession() and the result extraction into a public buildToolResultFromContent() helper. - Add AgentTool.runAsyncWithEvents(): an AsyncGenerator that yields the sub-agent's events as they are produced, without building the tool result. Callers (handleFunctionCallsStreamingAsync) are responsible for assembling the final response via buildToolResultFromContent(). - Pass invocationContext.runConfig down into runner.runAsync() so the sub-agent inherits the parent's run configuration, matching the Python implementation. 
llm_agent.ts - Switch the function-call handling site from handleFunctionCallsAsync to handleFunctionCallsStreamingAsync. Use a one-step buffer over the generator so every intermediate sub-agent event is yielded to the parent runner, and only the final function-response event is retained as functionResponseEvent for the subsequent processing. Refs: https://github.com/google/adk-python/pull/3991 --- core/src/agents/functions.ts | 121 +++++++++++++++++++++++++++++++++++ core/src/agents/llm_agent.ts | 23 ++++++- core/src/tools/agent_tool.ts | 114 +++++++++++++++++++++++++++------ 3 files changed, 235 insertions(+), 23 deletions(-) diff --git a/core/src/agents/functions.ts b/core/src/agents/functions.ts index 8697cdd4..b51bb0df 100644 --- a/core/src/agents/functions.ts +++ b/core/src/agents/functions.ts @@ -10,6 +10,7 @@ import {isEmpty} from 'lodash-es'; import {InvocationContext} from '../agents/invocation_context.js'; import {createEvent, Event, getFunctionCalls} from '../events/event.js'; import {mergeEventActions} from '../events/event_actions.js'; +import {AgentTool, isAgentTool} from '../tools/agent_tool.js'; import {BaseTool} from '../tools/base_tool.js'; import {ToolConfirmation} from '../tools/tool_confirmation.js'; import {randomUUID} from '../utils/env_aware_utils.js'; @@ -313,6 +314,126 @@ export async function handleFunctionCallsAsync({ }); } +/** + * Handles function calls and yields events from {@link AgentTool} sub-agents + * as they are produced, then yields the final merged function-response event + * last. + * + * Non-{@link AgentTool} calls are delegated to {@link handleFunctionCallList} + * (preserving plugin/before/after-callback semantics). {@link AgentTool} calls + * are streamed via {@link AgentTool.runAsyncWithEvents} so the parent runner + * can surface sub-agent activity in real time. The function-response event + * is yielded last (or not at all if the run is aborted); sub-agent events may + * also carry function responses, so identify it by position, not content.
+ */ +export async function* handleFunctionCallsStreamingAsync({ + invocationContext, + functionCallEvent, + toolsDict, + beforeToolCallbacks, + afterToolCallbacks, + toolConfirmationDict, +}: { + invocationContext: InvocationContext; + functionCallEvent: Event; + toolsDict: Record; + beforeToolCallbacks: SingleBeforeToolCallback[]; + afterToolCallbacks: SingleAfterToolCallback[]; + toolConfirmationDict?: Record; +}): AsyncGenerator { + const functionCalls = getFunctionCalls(functionCallEvent); + if (!functionCalls.length) { + return; + } + + const agentToolCalls: Array<{call: FunctionCall; tool: AgentTool}> = []; + const regularCalls: FunctionCall[] = []; + for (const functionCall of functionCalls) { + const tool = functionCall.name ? toolsDict[functionCall.name] : undefined; + if (tool && isAgentTool(tool)) { + agentToolCalls.push({call: functionCall, tool}); + } else { + regularCalls.push(functionCall); + } + } + + // Stream sub-agent events for each AgentTool call and build response events. + const agentToolResponseEvents: Event[] = []; + for (const {call, tool} of agentToolCalls) { + const toolContext = new Context({ + invocationContext, + functionCallId: call.id || undefined, + toolConfirmation: + toolConfirmationDict && call.id + ? toolConfirmationDict[call.id] + : undefined, + }); + + let lastContent: Content | undefined; + for await (const event of tool.runAsyncWithEvents({ + args: call.args ?? {}, + toolContext, + })) { + if (invocationContext.abortSignal?.aborted) { + return; + } + if (event.content) { + lastContent = event.content; + } + yield event; + } + + const toolResult = tool.buildToolResultFromContent(lastContent); + const responseValue = + typeof toolResult !== 'object' || toolResult == null + ? 
{result: toolResult} + : (toolResult as Record); + + agentToolResponseEvents.push( + createEvent({ + invocationId: invocationContext.invocationId, + author: invocationContext.agent.name, + content: createUserContent({ + functionResponse: { + id: call.id, + name: tool.name, + response: responseValue, + }, + }), + actions: toolContext.actions, + branch: invocationContext.branch, + }), + ); + } + + // Run any remaining (non-AgentTool) calls through the standard pipeline. + let regularResponseEvent: Event | null = null; + if (regularCalls.length) { + regularResponseEvent = await handleFunctionCallList({ + invocationContext, + functionCalls: regularCalls, + toolsDict, + beforeToolCallbacks, + afterToolCallbacks, + toolConfirmationDict, + }); + } + + const allResponseEvents: Event[] = []; + if (regularResponseEvent) { + allResponseEvents.push(regularResponseEvent); + } + allResponseEvents.push(...agentToolResponseEvents); + + if (!allResponseEvents.length) { + return; + } + + yield allResponseEvents.length === 1 + ? allResponseEvents[0] + : mergeParallelFunctionResponseEvents(allResponseEvents); +} + /** * The underlying implementation of handleFunctionCalls, but takes a list of * function calls instead of an event. diff --git a/core/src/agents/llm_agent.ts b/core/src/agents/llm_agent.ts index 30aba287..515355c3 100644 --- a/core/src/agents/llm_agent.ts +++ b/core/src/agents/llm_agent.ts @@ -49,7 +49,7 @@ import { generateAuthEvent, generateRequestConfirmationEvent, getLongRunningFunctionCalls, - handleFunctionCallsAsync, + handleFunctionCallsStreamingAsync, populateClientFunctionCallId, } from './functions.js'; @@ -907,13 +907,30 @@ export class LlmAgent extends BaseAgent { // Call functions // TODO - b/425992518: bloated funciton input, fix. // Tool callback passed to get rid of cyclic dependency. 
- const functionResponseEvent = await handleFunctionCallsAsync({ + // Use the streaming variant so events from AgentTool sub-agents are + // surfaced live (parity with adk-python PR #3991). The last event the + // generator yields is the (possibly merged) function-response event; + // hold the latest event in a one-step buffer so we yield every + // intermediate sub-agent event (including the sub-agent's own tool + // responses) and capture only the final one as the response. + let functionResponseEvent: Event | null = null; + let pendingEvent: Event | null = null; + for await (const event of handleFunctionCallsStreamingAsync({ invocationContext: invocationContext, functionCallEvent: mergedEvent, toolsDict: llmRequest.toolsDict, beforeToolCallbacks: this.canonicalBeforeToolCallbacks, afterToolCallbacks: this.canonicalAfterToolCallbacks, - }); + })) { + if (invocationContext.abortSignal?.aborted) { + return; + } + if (pendingEvent) { + yield pendingEvent; + } + pendingEvent = event; + } + functionResponseEvent = pendingEvent; if (!functionResponseEvent || invocationContext.abortSignal?.aborted) { return; diff --git a/core/src/tools/agent_tool.ts b/core/src/tools/agent_tool.ts index 16edb581..0fe178be 100644 --- a/core/src/tools/agent_tool.ts +++ b/core/src/tools/agent_tool.ts @@ -116,14 +116,20 @@ export class AgentTool extends BaseTool { return declaration; } - override async runAsync({ + /** + * Sets up the Runner and Session for sub-agent execution. + * + * Shared by {@link runAsync} and {@link runAsyncWithEvents}.
+ */ + private async setupRunnerAndSession({ args, toolContext, - }: RunAsyncToolRequest): Promise { - if (this.skipSummarization) { - toolContext.actions.skipSummarization = true; - } - + }: RunAsyncToolRequest): Promise<{ + runner: Runner; + content: Content; + sessionUserId: string; + sessionId: string; + }> { const hasInputSchema = isLlmAgent(this.agent) && this.agent.inputSchema; const content: Content = { role: 'user', @@ -158,15 +164,55 @@ export class AgentTool extends BaseTool { state: toolContext.state.toRecord(), }); + return { + runner, + content, + sessionUserId: session.userId, + sessionId: session.id, + }; + } + + /** + * Builds the tool result from the last content event of the sub-agent. + * + * Excludes thought parts and applies the output schema (if any). + */ + buildToolResultFromContent(lastContent: Content | undefined): unknown { + if (!lastContent?.parts?.length) { + return ''; + } + const hasOutputSchema = isLlmAgent(this.agent) && this.agent.outputSchema; + const mergedText = lastContent.parts + .filter((part) => !part.thought) + .map((part) => part.text) + .filter((text) => text) + .join('\n'); + // TODO - b/425992518: In case of output schema, the output should be + // validated. Consider similar logic to one we have in Python ADK. + return hasOutputSchema ? 
JSON.parse(mergedText) : mergedText; + } + + override async runAsync({ + args, + toolContext, + }: RunAsyncToolRequest): Promise { + if (this.skipSummarization) { + toolContext.actions.skipSummarization = true; + } + + const {runner, content, sessionUserId, sessionId} = + await this.setupRunnerAndSession({args, toolContext}); + if (toolContext.abortSignal?.aborted) { return ''; } let lastEvent: Event | undefined; for await (const event of runner.runAsync({ - userId: session.userId, - sessionId: session.id, + userId: sessionUserId, + sessionId, newMessage: content, + runConfig: toolContext.invocationContext.runConfig, abortSignal: toolContext.abortSignal, })) { if (toolContext.abortSignal?.aborted) { @@ -180,20 +226,48 @@ export class AgentTool extends BaseTool { lastEvent = event; } - if (!lastEvent?.content?.parts?.length) { - return ''; + return this.buildToolResultFromContent(lastEvent?.content); + } + + /** + * Runs the wrapped agent and yields the sub-agent's events as they are + * produced, providing real-time visibility into sub-agent progress. + * + * Counterpart to {@link runAsync}; the caller is responsible for tracking + * the last content event and building the final tool result via + * {@link buildToolResultFromContent}. + */ + async *runAsyncWithEvents({ + args, + toolContext, + }: RunAsyncToolRequest): AsyncGenerator { + if (this.skipSummarization) { + toolContext.actions.skipSummarization = true; } - const hasOutputSchema = isLlmAgent(this.agent) && this.agent.outputSchema; - // Exclude thoughts from the merged text. - const mergedText = lastEvent.content.parts - .filter((part) => !part.thought) - .map((part) => part.text) - .filter((text) => text) - .join('\n'); + const {runner, content, sessionUserId, sessionId} = + await this.setupRunnerAndSession({args, toolContext}); - // TODO - b/425992518: In case of output schema, the output should be - // validated. Consider similar logic to one we have in Python ADK. - return hasOutputSchema ? 
JSON.parse(mergedText) : mergedText; + if (toolContext.abortSignal?.aborted) { + return; + } + + for await (const event of runner.runAsync({ + userId: sessionUserId, + sessionId, + newMessage: content, + runConfig: toolContext.invocationContext.runConfig, + abortSignal: toolContext.abortSignal, + })) { + if (toolContext.abortSignal?.aborted) { + return; + } + + if (event.actions.stateDelta) { + toolContext.state.update(event.actions.stateDelta); + } + + yield event; + } } }