From f4a54c48afd1cd307fcce04ce55bf99ed98bcfac Mon Sep 17 00:00:00 2001 From: Amaad Martin Date: Tue, 2 Jun 2026 13:52:29 -0700 Subject: [PATCH 1/7] feat(core): Support Gemini 2.5 and 3.1 Live Models in ADK JS --- core/src/common.ts | 2 +- core/src/models/gemini_llm_connection.ts | 345 ++++++- core/src/models/google_llm.ts | 40 +- core/src/models/llm_response.ts | 10 + core/src/utils/async_queue.ts | 65 ++ core/src/utils/model_name.ts | 14 + .../test/models/gemini_llm_connection_test.ts | 972 ++++++++++++++++++ core/test/models/google_llm_test.ts | 69 ++ core/test/utils/async_queue_test.ts | 88 ++ core/test/utils/model_name_test.ts | 21 +- tests/e2e/live_model_test.ts | 79 ++ 11 files changed, 1680 insertions(+), 25 deletions(-) create mode 100644 core/src/utils/async_queue.ts create mode 100644 core/test/models/gemini_llm_connection_test.ts create mode 100644 core/test/utils/async_queue_test.ts create mode 100644 tests/e2e/live_model_test.ts diff --git a/core/src/common.ts b/core/src/common.ts index 264b33dd..bf6e0d01 100644 --- a/core/src/common.ts +++ b/core/src/common.ts @@ -239,7 +239,7 @@ export type { export {VertexRagRetrievalTool} from './tools/vertex_rag_retrieval_tool.js'; export {LogLevel, getLogger, setLogLevel, setLogger} from './utils/logger.js'; export type {Logger} from './utils/logger.js'; -export {isGemini2OrAbove} from './utils/model_name.js'; +export {isGemini2OrAbove, isGemini31FlashLive} from './utils/model_name.js'; export {zodObjectToSchema} from './utils/simple_zod_to_json.js'; export {GoogleLLMVariant} from './utils/variant_utils.js'; export {version} from './version.js'; diff --git a/core/src/models/gemini_llm_connection.ts b/core/src/models/gemini_llm_connection.ts index 3eb6da28..48963cdb 100644 --- a/core/src/models/gemini_llm_connection.ts +++ b/core/src/models/gemini_llm_connection.ts @@ -1,19 +1,35 @@ /** * @license - * Copyright 2025 Google LLC + * Copyright 2026 Google LLC * SPDX-License-Identifier: Apache-2.0 */ -import {Blob, Content, FunctionResponse, Session} from '@google/genai'; +import { + Blob, + Content, + FunctionResponse, + GroundingMetadata, + LiveServerMessage, + Part, + Session, +} from '@google/genai'; import {logger} from '../utils/logger.js'; +import {isGemini31FlashLive} from '../utils/model_name.js'; import {BaseLlmConnection} from './base_llm_connection.js'; import {LlmResponse} from './llm_response.js'; /** The Gemini model connection. */ export class GeminiLlmConnection implements BaseLlmConnection { - constructor(private readonly geminiSession: Session) {} + private _inputTranscriptionText = ''; + private _outputTranscriptionText = ''; + + constructor( + private readonly geminiSession: Session, + private readonly modelVersion?: string, + private readonly messageQueue?: AsyncIterable, + ) {} /** * Sends the conversation history to the gemini model. @@ -31,9 +47,12 @@ export class GeminiLlmConnection implements BaseLlmConnection { ); if (contents.length > 0) { + const isGemini31 = isGemini31FlashLive(this.modelVersion); this.geminiSession.sendClientContent({ turns: contents, - turnComplete: contents[contents.length - 1].role === 'user', + turnComplete: isGemini31 + ? true + : contents[contents.length - 1].role === 'user', }); } else { logger.info('no content is sent'); @@ -64,10 +83,16 @@ export class GeminiLlmConnection implements BaseLlmConnection { }); } else { logger.debug('Sending LLM new content', content); - this.geminiSession.sendClientContent({ - turns: [content], - turnComplete: true, - }); + const isGemini31 = isGemini31FlashLive(this.modelVersion); + if (isGemini31 && content.parts.length === 1 && content.parts[0].text) { + logger.debug('Using sendRealtimeInput for Gemini 3.1 text input'); + this.geminiSession.sendRealtimeInput({text: content.parts[0].text}); + } else { + this.geminiSession.sendClientContent({ + turns: [content], + turnComplete: true, + }); + } } } @@ -78,7 +103,23 @@ export class GeminiLlmConnection implements BaseLlmConnection { */ async sendRealtime(blob: Blob): Promise { logger.debug('Sending LLM Blob:', blob); - this.geminiSession.sendRealtimeInput({media: blob}); + const isGemini31 = isGemini31FlashLive(this.modelVersion); + const isNativeAudio = this.modelVersion?.includes('native-audio'); + + if (isGemini31 || isNativeAudio) { + if (blob.mimeType?.startsWith('audio/')) { + this.geminiSession.sendRealtimeInput({audio: blob}); + } else if (blob.mimeType?.startsWith('image/')) { + this.geminiSession.sendRealtimeInput({video: blob}); + } else { + logger.warn( + 'Blob not sent. Unknown or empty mime type for sendRealtimeInput:', + blob.mimeType, + ); + } + } else { + this.geminiSession.sendRealtimeInput({media: blob}); + } } /** @@ -88,21 +129,295 @@ export class GeminiLlmConnection implements BaseLlmConnection { * partial. * * @param text The text to be included in the response. + * @param isThought Whether the text is a thought. + * @param groundingMetadata The grounding metadata to include. * @returns An LlmResponse containing the full text. */ - private buildFullTextResponse(text: string): LlmResponse { - return { + private buildFullTextResponse( + text: string, + isThought = false, + groundingMetadata?: GroundingMetadata, + ): LlmResponse { + const part: Part = {text}; + if (isThought) { + part.thought = true; + } + const response: LlmResponse = { content: { role: 'model', - parts: [{text}], + parts: [part], }, + partial: false, }; + if (groundingMetadata !== undefined && groundingMetadata !== null) { + response.groundingMetadata = groundingMetadata; + } + if (this.modelVersion) { + response.modelVersion = this.modelVersion; + } + return response; } - // TODO(b/425992518): GenAI SDK inconsistent API, missing methods. - // eslint-disable-next-line require-yield async *receive(): AsyncGenerator { - throw new Error('Not Implemented.'); + if (!this.messageQueue) { + throw new Error('Message queue is not initialized.'); + } + + let text = ''; + let isThought = false; + let toolCallParts: Part[] = []; + let pendingGroundingMetadata: GroundingMetadata | undefined = undefined; + + for await (const message of this.messageQueue) { + logger.debug('Got LLM Live message:', message); + + if (message.usageMetadata) { + yield { + usageMetadata: message.usageMetadata, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + + if (message.serverContent) { + const serverContent = message.serverContent; + const content = serverContent.modelTurn; + + if (serverContent.groundingMetadata) { + pendingGroundingMetadata = serverContent.groundingMetadata; + } + + // Standalone groundingMetadata event (when content is empty) + if ( + !(content && content.parts) && + serverContent.groundingMetadata && + !serverContent.turnComplete + ) { + yield { + groundingMetadata: serverContent.groundingMetadata, + ...(serverContent.interrupted !== undefined + ? {interrupted: serverContent.interrupted} + : {}), + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + + if (content && content.parts) { + const llmResponse: LlmResponse = { + content: content as Content, + ...(serverContent.interrupted !== undefined + ? {interrupted: serverContent.interrupted} + : {}), + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + + if (!serverContent.turnComplete && serverContent.groundingMetadata) { + llmResponse.groundingMetadata = serverContent.groundingMetadata; + } + + const hasInlineData = content.parts.some((p) => p.inlineData); + for (const part of content.parts) { + if (part.text) { + const currentIsThought = !!part.thought; + if (text && currentIsThought !== isThought) { + yield this.buildFullTextResponse(text, isThought); + text = ''; + isThought = false; + } + text += part.text; + isThought = currentIsThought; + llmResponse.partial = true; + } + } + + // don't yield the merged text event when receiving audio data + if (text && !content.parts.some((p) => p.text) && !hasInlineData) { + yield this.buildFullTextResponse(text, isThought); + text = ''; + isThought = false; + } + + yield llmResponse; + } + + if (serverContent.inputTranscription) { + if (serverContent.inputTranscription.text) { + this._inputTranscriptionText += + serverContent.inputTranscription.text; + yield { + inputTranscription: { + text: serverContent.inputTranscription.text, + finished: false, + }, + partial: true, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + if (serverContent.inputTranscription.finished) { + yield { + inputTranscription: { + text: this._inputTranscriptionText, + finished: true, + }, + partial: false, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + this._inputTranscriptionText = ''; + } + } + + if (serverContent.outputTranscription) { + if (serverContent.outputTranscription.text) { + this._outputTranscriptionText += + serverContent.outputTranscription.text; + yield { + outputTranscription: { + text: serverContent.outputTranscription.text, + finished: false, + }, + partial: true, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + if (serverContent.outputTranscription.finished) { + yield { + outputTranscription: { + text: this._outputTranscriptionText, + finished: true, + }, + partial: false, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + this._outputTranscriptionText = ''; + } + } + + if ( + serverContent.interrupted || + serverContent.turnComplete || + serverContent.generationComplete + ) { + if (this._inputTranscriptionText) { + yield { + inputTranscription: { + text: this._inputTranscriptionText, + finished: true, + }, + partial: false, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + this._inputTranscriptionText = ''; + } + if (this._outputTranscriptionText) { + yield { + outputTranscription: { + text: this._outputTranscriptionText, + finished: true, + }, + partial: false, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + this._outputTranscriptionText = ''; + } + } + + if (serverContent.turnComplete) { + let gMetadataToYield = pendingGroundingMetadata; + if (text) { + yield this.buildFullTextResponse(text, isThought, gMetadataToYield); + text = ''; + isThought = false; + gMetadataToYield = undefined; + } + if (toolCallParts.length > 0) { + logger.debug('Returning aggregated toolCallParts'); + yield { + content: {role: 'model', parts: toolCallParts}, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + toolCallParts = []; + } + const finalResponse: LlmResponse = { + turnComplete: true, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + if (serverContent.interrupted !== undefined) { + finalResponse.interrupted = serverContent.interrupted; + } + const finalGrounding = + serverContent.groundingMetadata || gMetadataToYield; + if (finalGrounding !== undefined && finalGrounding !== null) { + finalResponse.groundingMetadata = finalGrounding; + } + yield finalResponse; + break; + } + + if (serverContent.interrupted) { + if (text) { + yield this.buildFullTextResponse(text, isThought); + text = ''; + isThought = false; + } else { + yield { + interrupted: serverContent.interrupted, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + } + } + + if (message.toolCall) { + logger.debug('Received tool call:', message.toolCall); + if (text) { + yield this.buildFullTextResponse(text, isThought); + text = ''; + isThought = false; + } + if (message.toolCall.functionCalls) { + toolCallParts.push( + ...message.toolCall.functionCalls.map((fc) => ({ + functionCall: fc, + })), + ); + } + + const isGemini31 = isGemini31FlashLive(this.modelVersion); + if (isGemini31 && toolCallParts.length > 0) { + logger.debug( + 'Yielding toolCallParts immediately for Gemini 3.1 live tool call', + ); + yield { + content: {role: 'model', parts: toolCallParts}, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + toolCallParts = []; + } + } + + if (message.sessionResumptionUpdate) { + logger.debug('Received session resumption message:', message); + yield { + liveSessionResumptionUpdate: message.sessionResumptionUpdate, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + + if (message.goAway) { + logger.debug('Received GoAway message:', message.goAway); + yield { + goAway: message.goAway, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + } + + if (toolCallParts.length > 0) { + logger.debug('Exited loop with pending toolCallParts'); + yield { + content: {role: 'model', parts: toolCallParts}, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } } /** diff --git a/core/src/models/google_llm.ts b/core/src/models/google_llm.ts index 55737629..3590bf02 100644 --- a/core/src/models/google_llm.ts +++ b/core/src/models/google_llm.ts @@ -10,12 +10,14 @@ import { FileData, GoogleGenAI, HttpOptions, + LiveServerMessage, } from '@google/genai'; import {getBooleanEnvVar, isBrowser} from '../utils/env_aware_utils.js'; import {logger} from '../utils/logger.js'; import {GoogleLLMVariant} from '../utils/variant_utils.js'; +import {AsyncQueue} from '../utils/async_queue.js'; import {StreamingResponseAggregator} from '../utils/streaming_utils.js'; import {BaseLlm} from './base_llm.js'; import {BaseLlmConnection} from './base_llm_connection.js'; @@ -231,10 +233,19 @@ export class Gemini extends BaseLlm { get liveApiClient(): GoogleGenAI { if (!this._liveApiClient) { - this._liveApiClient = new GoogleGenAI({ - apiKey: this.apiKey, - httpOptions: this.getLiveHttpOptions(), - }); + if (this.vertexai) { + this._liveApiClient = new GoogleGenAI({ + vertexai: this.vertexai, + project: this.project, + location: 'global', + httpOptions: this.getLiveHttpOptions(), + }); + } else { + this._liveApiClient = new GoogleGenAI({ + apiKey: this.apiKey, + httpOptions: this.getLiveHttpOptions(), + }); + } } return this._liveApiClient; } @@ -272,15 +283,28 @@ export class Gemini extends BaseLlm { llmRequest.liveConnectConfig.tools = llmRequest.config?.tools; + const modelVersion = llmRequest.model ?? this.model; + const messageQueue = new AsyncQueue(); + const liveSession = await this.liveApiClient.live.connect({ - model: llmRequest.model ?? this.model, + model: modelVersion, config: llmRequest.liveConnectConfig, callbacks: { - // TODO - b/425992518: GenAI SDK inconsistent API, missing methods. - onmessage: () => {}, + onmessage: (message) => { + console.log('E2E Debug: onmessage', JSON.stringify(message)); + messageQueue.push(message); + }, + onerror: (error) => { + console.error('E2E Debug: onerror', error); + messageQueue.error(error); + }, + onclose: () => { + console.log('E2E Debug: onclose'); + messageQueue.close(); + }, }, }); - return new GeminiLlmConnection(liveSession); + return new GeminiLlmConnection(liveSession, modelVersion, messageQueue); } private preprocessRequest(llmRequest: LlmRequest): void { diff --git a/core/src/models/llm_response.ts b/core/src/models/llm_response.ts index 4a869c7d..c5059842 100644 --- a/core/src/models/llm_response.ts +++ b/core/src/models/llm_response.ts @@ -11,6 +11,7 @@ import { GenerateContentResponse, GenerateContentResponseUsageMetadata, GroundingMetadata, + LiveServerGoAway, LiveServerSessionResumptionUpdate, Transcription, } from '@google/genai'; @@ -94,6 +95,15 @@ export interface LlmResponse { * Audio transcription of model output. */ outputTranscription?: Transcription; + + /** The model version used to generate the response. */ + modelVersion?: string; + + /** The session ID of the Live session. */ + liveSessionId?: string; + + /** The GoAway signal from the Live model. */ + goAway?: LiveServerGoAway; } /** diff --git a/core/src/utils/async_queue.ts b/core/src/utils/async_queue.ts new file mode 100644 index 00000000..0a2fb0f6 --- /dev/null +++ b/core/src/utils/async_queue.ts @@ -0,0 +1,65 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * A generic, async-safe queue that implements AsyncIterable. + */ +export class AsyncQueue implements AsyncIterable { + private queue: T[] = []; + private resolvers: Array<{ + resolve: (value: IteratorResult) => void; + reject: (reason?: unknown) => void; + }> = []; + private closed = false; + private errorVal?: unknown; + + push(value: T) { + if (this.closed) return; + if (this.resolvers.length > 0) { + const {resolve} = this.resolvers.shift()!; + resolve({value, done: false}); + } else { + this.queue.push(value); + } + } + + error(err: unknown) { + this.errorVal = err; + while (this.resolvers.length > 0) { + const {reject} = this.resolvers.shift()!; + reject(err); + } + } + + close() { + this.closed = true; + while (this.resolvers.length > 0) { + const {resolve} = this.resolvers.shift()!; + resolve({value: undefined as never, done: true}); + } + } + + [Symbol.asyncIterator](): AsyncIterator { + return { + next: () => { + if (this.errorVal) { + const err = this.errorVal; + this.errorVal = undefined; + return Promise.reject(err); + } + if (this.queue.length > 0) { + return Promise.resolve({value: this.queue.shift()!, done: false}); + } + if (this.closed) { + return Promise.resolve({value: undefined as never, done: true}); + } + return new Promise>((resolve, reject) => { + this.resolvers.push({resolve, reject}); + }); + }, + }; + } +} diff --git a/core/src/utils/model_name.ts b/core/src/utils/model_name.ts index 6b1ea728..79bd6007 100644 --- a/core/src/utils/model_name.ts +++ b/core/src/utils/model_name.ts @@ -94,6 +94,20 @@ export function isGemini2OrAbove(modelString: string): boolean { return parsedVersion.valid && parsedVersion.major >= 2; } +/** + * Check if the model is a Gemini 3.1 Flash Live model. + * + * @param modelString Either a simple model name or path-based model name + * @return true if it's a Gemini 3.1 Flash Live model, false otherwise. + */ +export function isGemini31FlashLive(modelString: string | undefined): boolean { + if (!modelString) { + return false; + } + const modelName = extractModelName(modelString); + return modelName.startsWith('gemini-3.1-flash-live'); +} + /** * Returns True when Gemini model-id validation should be bypassed. */ diff --git a/core/test/models/gemini_llm_connection_test.ts b/core/test/models/gemini_llm_connection_test.ts new file mode 100644 index 00000000..be59dc7d --- /dev/null +++ b/core/test/models/gemini_llm_connection_test.ts @@ -0,0 +1,972 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { + Blob, + Content, + GroundingMetadata, + LiveServerGoAway, + LiveServerMessage, +} from '@google/genai'; +import {beforeEach, describe, expect, it, vi} from 'vitest'; +import {GeminiLlmConnection} from '../../src/models/gemini_llm_connection.js'; +import {AsyncQueue} from '../../src/utils/async_queue.js'; + +describe('GeminiLlmConnection', () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let mockSession: any; + let messageQueue: AsyncQueue; + + beforeEach(() => { + mockSession = { + sendClientContent: vi.fn(), + sendToolResponse: vi.fn(), + sendRealtimeInput: vi.fn(), + close: vi.fn(), + }; + messageQueue = new AsyncQueue(); + }); + + describe('sendHistory', () => { + it('should send history with turnComplete based on role for non-Gemini 3.1', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + ); + const history: Content[] = [ + {role: 'user', parts: [{text: 'hello'}]}, + {role: 'model', parts: [{text: 'hi'}]}, + ]; + + await connection.sendHistory(history); + + expect(mockSession.sendClientContent).toHaveBeenCalledWith({ + turns: history, + turnComplete: false, // last is model + }); + }); + + it('should send history with turnComplete=true for Gemini 3.1', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-3.1-flash-live', + ); + const history: Content[] = [ + {role: 'user', parts: [{text: 'hello'}]}, + {role: 'model', parts: [{text: 'hi'}]}, + ]; + + await connection.sendHistory(history); + + expect(mockSession.sendClientContent).toHaveBeenCalledWith({ + turns: history, + turnComplete: true, + }); + }); + + it('should not send history if empty', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + ); + await connection.sendHistory([]); + expect(mockSession.sendClientContent).not.toHaveBeenCalled(); + }); + }); + + describe('sendContent', () => { + it('should send tool response if first part is functionResponse', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + ); + const content: Content = { + parts: [ + { + functionResponse: { + name: 'tool_a', + response: {result: 'ok'}, + id: '1', + }, + }, + ], + }; + + await connection.sendContent(content); + + expect(mockSession.sendToolResponse).toHaveBeenCalledWith({ + functionResponses: [content.parts![0].functionResponse], + }); + }); + + it('should use sendRealtimeInput for Gemini 3.1 single-part text', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-3.1-flash-live', + ); + const content: Content = { + parts: [{text: 'hello'}], + }; + + await connection.sendContent(content); + + expect(mockSession.sendRealtimeInput).toHaveBeenCalledWith({ + text: 'hello', + }); + }); + + it('should use sendClientContent for non-Gemini 3.1 single-part text', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + ); + const content: Content = { + parts: [{text: 'hello'}], + }; + + await connection.sendContent(content); + + expect(mockSession.sendClientContent).toHaveBeenCalledWith({ + turns: [content], + turnComplete: true, + }); + }); + + it('should throw error if content has no parts', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + ); + await expect(connection.sendContent({})).rejects.toThrow( + 'Content must have parts.', + ); + }); + }); + + describe('sendRealtime', () => { + it('should use sendRealtimeInput with media for non-Gemini 3.1/non-Native-Audio', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + ); + const blob: Blob = {mimeType: 'audio/pcm', data: 'base64data'}; + + await connection.sendRealtime(blob); + + expect(mockSession.sendRealtimeInput).toHaveBeenCalledWith({ + media: blob, + }); + }); + + it('should use sendRealtimeInput with audio for Gemini 3.1 audio', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-3.1-flash-live', + ); + const blob: Blob = {mimeType: 'audio/pcm', data: 'base64data'}; + + await connection.sendRealtime(blob); + + expect(mockSession.sendRealtimeInput).toHaveBeenCalledWith({ + audio: blob, + }); + }); + + it('should use sendRealtimeInput with video for Gemini 3.1 image', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-3.1-flash-live', + ); + const blob: Blob = {mimeType: 'image/jpeg', data: 'base64data'}; + + await connection.sendRealtime(blob); + + expect(mockSession.sendRealtimeInput).toHaveBeenCalledWith({ + video: blob, + }); + }); + + it('should use sendRealtimeInput with audio for Native Audio model audio', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash-preview-native-audio', + ); + const blob: Blob = {mimeType: 'audio/pcm', data: 'base64data'}; + + await connection.sendRealtime(blob); + + expect(mockSession.sendRealtimeInput).toHaveBeenCalledWith({ + audio: blob, + }); + }); + + it('should warn and not send if unknown mime type for Gemini 3.1', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-3.1-flash-live', + ); + const blob: Blob = {mimeType: 'text/plain', data: 'data'}; + + await connection.sendRealtime(blob); + + expect(mockSession.sendRealtimeInput).not.toHaveBeenCalled(); + }); + }); + + describe('close', () => { + it('should close the session', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + ); + await connection.close(); + expect(mockSession.close).toHaveBeenCalled(); + }); + }); + + describe('receive', () => { + it('should throw error if message queue is not provided', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + ); + const generator = connection.receive(); + await expect(generator.next()).rejects.toThrow( + 'Message queue is not initialized.', + ); + }); + + it('should yield usage metadata', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + const usageMetadata = { + promptTokenCount: 10, + candidatesTokenCount: 20, + totalTokenCount: 30, + }; + messageQueue.push({usageMetadata}); + messageQueue.close(); + + const res = await generator.next(); + expect(res.value).toEqual({ + usageMetadata, + modelVersion: 'gemini-2.5-flash', + }); + expect((await generator.next()).done).toBe(true); + }); + + it('should stream text and yield full response on turnComplete', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + // Chunk 1: partial text + messageQueue.push({ + serverContent: { + modelTurn: { + parts: [{text: 'Hello'}], + }, + }, + }); + + // Chunk 2: partial text and turnComplete with interrupted and groundingMetadata + messageQueue.push({ + serverContent: { + modelTurn: { + parts: [{text: ' world!'}], + }, + turnComplete: true, + interrupted: false, + groundingMetadata: {groundingChunks: []} as GroundingMetadata, + }, + }); + + const res1 = await generator.next(); + expect(res1.value).toEqual({ + content: {parts: [{text: 'Hello'}]}, + modelVersion: 'gemini-2.5-flash', + partial: true, + }); + + const res2 = await generator.next(); + expect(res2.value).toEqual({ + content: {parts: [{text: ' world!'}]}, + modelVersion: 'gemini-2.5-flash', + partial: true, + interrupted: false, + }); + + // After turnComplete, it should flush the accumulated text first, including groundingMetadata + const res3 = await generator.next(); + expect(res3.value).toEqual({ + content: { + role: 'model', + parts: [{text: 'Hello world!'}], + }, + partial: false, + modelVersion: 'gemini-2.5-flash', + groundingMetadata: {groundingChunks: []}, + }); + + // Then it yields the turnComplete status with interrupted and groundingMetadata + const res4 = await generator.next(); + expect(res4.value).toEqual({ + turnComplete: true, + modelVersion: 'gemini-2.5-flash', + interrupted: false, + groundingMetadata: {groundingChunks: []}, + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should flush text when transitioning between thought and non-thought', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + // Chunk 1: thought + messageQueue.push({ + serverContent: { + modelTurn: { + parts: [{text: 'Thinking...', thought: true}], + }, + }, + }); + + // Chunk 2: transition to text + messageQueue.push({ + serverContent: { + modelTurn: { + parts: [{text: 'Answer is 42.'}], + }, + }, + }); + + messageQueue.push({ + serverContent: { + turnComplete: true, + }, + }); + + const res1 = await generator.next(); // yields partial thought + expect(res1.value).toEqual({ + content: {parts: [{text: 'Thinking...', thought: true}]}, + modelVersion: 'gemini-2.5-flash', + partial: true, + }); + + const res2 = await generator.next(); // transitions, flushes 'Thinking...' as full thought + expect(res2.value).toEqual({ + content: { + role: 'model', + parts: [{text: 'Thinking...', thought: true}], + }, + partial: false, + modelVersion: 'gemini-2.5-flash', + }); + + const res3 = await generator.next(); // yields partial text 'Answer is 42.' + expect(res3.value).toEqual({ + content: {parts: [{text: 'Answer is 42.'}]}, + modelVersion: 'gemini-2.5-flash', + partial: true, + }); + + const res4 = await generator.next(); // turnComplete flushes 'Answer is 42.' as full text + expect(res4.value).toEqual({ + content: { + role: 'model', + parts: [{text: 'Answer is 42.'}], + }, + partial: false, + modelVersion: 'gemini-2.5-flash', + }); + + const res5 = await generator.next(); // yields turnComplete + expect(res5.value).toEqual({ + turnComplete: true, + modelVersion: 'gemini-2.5-flash', + }); + }); + + it('should handle input transcription partial and finished', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + serverContent: { + inputTranscription: {text: 'hello', finished: false}, + }, + }); + + messageQueue.push({ + serverContent: { + inputTranscription: {text: ' world', finished: true}, + }, + }); + + messageQueue.close(); + + const res1 = await generator.next(); + expect(res1.value).toEqual({ + inputTranscription: {text: 'hello', finished: false}, + partial: true, + modelVersion: 'gemini-2.5-flash', + }); + + const res2 = await generator.next(); + expect(res2.value).toEqual({ + inputTranscription: {text: ' world', finished: false}, + partial: true, + modelVersion: 'gemini-2.5-flash', + }); + + const res3 = await generator.next(); + expect(res3.value).toEqual({ + inputTranscription: {text: 'hello world', finished: true}, + partial: false, + modelVersion: 'gemini-2.5-flash', + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should flush pending transcription on interrupted', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + serverContent: { + inputTranscription: {text: 'hello', finished: false}, + }, + }); + + messageQueue.push({ + serverContent: { + interrupted: true, + }, + }); + messageQueue.close(); + + const res1 = await generator.next(); // partial transcription + expect(res1.value.inputTranscription).toEqual({ + text: 'hello', + finished: false, + }); + + const res2 = await generator.next(); // flush transcription on interrupted + expect(res2.value).toEqual({ + inputTranscription: {text: 'hello', finished: true}, + partial: false, + modelVersion: 'gemini-2.5-flash', + }); + + const res3 = await generator.next(); // interrupted status + expect(res3.value).toEqual({ + interrupted: true, + modelVersion: 'gemini-2.5-flash', + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should yield groundingMetadata on partial response if turnComplete is not true', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + serverContent: { + modelTurn: { + parts: [{text: 'Partial text'}], + }, + groundingMetadata: { + groundingChunks: [ + {web: {uri: 'https://google.com', title: 'Google'}}, + ], + } as GroundingMetadata, + }, + }); + messageQueue.close(); + + const res1 = await generator.next(); + expect(res1.value).toEqual({ + content: {parts: [{text: 'Partial text'}]}, + partial: true, + modelVersion: 'gemini-2.5-flash', + groundingMetadata: { + groundingChunks: [ + {web: {uri: 'https://google.com', title: 'Google'}}, + ], + }, + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should yield standalone groundingMetadata when content is empty and turnComplete is not true', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + serverContent: { + groundingMetadata: { + groundingChunks: [ + {web: {uri: 'https://google.com', title: 'Google'}}, + ], + } as GroundingMetadata, + turnComplete: false, + interrupted: false, + }, + }); + messageQueue.close(); + + const res1 = await generator.next(); + expect(res1.value).toEqual({ + groundingMetadata: { + groundingChunks: [ + {web: {uri: 'https://google.com', title: 'Google'}}, + ], + }, + interrupted: false, + modelVersion: 'gemini-2.5-flash', + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should flush accumulated text when receiving a non-text modelTurn part', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + // Push text part + messageQueue.push({ + serverContent: { + modelTurn: { + parts: [{text: 'Hello'}], + }, + }, + }); + + // Push non-text part (e.g. functionCall inside modelTurn parts) + messageQueue.push({ + serverContent: { + modelTurn: { + parts: [{functionCall: {name: 'tool_a', args: {x: 1}, id: '1'}}], + }, + }, + }); + messageQueue.close(); + + // First yield: the partial response for 'Hello' + const res1 = await generator.next(); + expect(res1.value).toEqual({ + content: {parts: [{text: 'Hello'}]}, + partial: true, + modelVersion: 'gemini-2.5-flash', + }); + + // Second yield: should flush 'Hello' as a full text response + const res2 = await generator.next(); + expect(res2.value).toEqual({ + content: { + role: 'model', + parts: [{text: 'Hello'}], + }, + partial: false, + modelVersion: 'gemini-2.5-flash', + }); + + // Third yield: the modelTurn response with the functionCall + const res3 = await generator.next(); + expect(res3.value).toEqual({ + content: { + parts: [{functionCall: {name: 'tool_a', args: {x: 1}, id: '1'}}], + }, + modelVersion: 'gemini-2.5-flash', + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should buffer tool calls and yield at turnComplete for non-Gemini 3.1', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + toolCall: { + functionCalls: [{name: 'tool_a', args: {x: 1}, id: '1'}], + }, + }); + + messageQueue.push({ + serverContent: { + turnComplete: true, + }, + }); + + // For non-Gemini 3.1, tool call is buffered. + // So we don't get anything on toolCall message (except if there was text, but there isn't). + // On turnComplete, it should yield the aggregated tool calls first, then turnComplete. + const res1 = await generator.next(); + expect(res1.value).toEqual({ + content: { + role: 'model', + parts: [{functionCall: {name: 'tool_a', args: {x: 1}, id: '1'}}], + }, + modelVersion: 'gemini-2.5-flash', + }); + + const res2 = await generator.next(); + expect(res2.value).toEqual({ + turnComplete: true, + modelVersion: 'gemini-2.5-flash', + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should yield tool calls immediately for Gemini 3.1', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-3.1-flash-live', + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + toolCall: { + functionCalls: [{name: 'tool_a', args: {x: 1}, id: '1'}], + }, + }); + + const res1 = await generator.next(); + expect(res1.value).toEqual({ + content: { + role: 'model', + parts: [{functionCall: {name: 'tool_a', args: {x: 1}, id: '1'}}], + }, + modelVersion: 'gemini-3.1-flash-live', + }); + + messageQueue.close(); + expect((await generator.next()).done).toBe(true); + }); + + it('should yield session resumption update', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + const resumptionUpdate = {resumed: true}; + messageQueue.push({sessionResumptionUpdate: resumptionUpdate}); + messageQueue.close(); + + const res = await generator.next(); + expect(res.value).toEqual({ + liveSessionResumptionUpdate: resumptionUpdate, + modelVersion: 'gemini-2.5-flash', + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should yield go away', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + const goAway = {goAway: true}; // mock + messageQueue.push({goAway: goAway as LiveServerGoAway}); + messageQueue.close(); + + const res = await generator.next(); + expect(res.value).toEqual({ + goAway, + modelVersion: 'gemini-2.5-flash', + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should yield pending tool calls on queue close', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + toolCall: { + functionCalls: [{name: 'tool_a', args: {x: 1}, id: '1'}], + }, + }); + messageQueue.close(); + + const res = await generator.next(); + expect(res.value).toEqual({ + content: { + role: 'model', + parts: [{functionCall: {name: 'tool_a', args: {x: 1}, id: '1'}}], + }, + modelVersion: 'gemini-2.5-flash', + }); + expect((await generator.next()).done).toBe(true); + }); + + it('should handle undefined modelVersion in isGemini31FlashLive check', async () => { + const connection = new GeminiLlmConnection( + mockSession, + undefined, + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + toolCall: { + functionCalls: [{name: 'tool_a', args: {x: 1}, id: '1'}], + }, + }); + messageQueue.close(); + + const res = await generator.next(); + expect(res.value).toEqual({ + content: { + role: 'model', + parts: [{functionCall: {name: 'tool_a', args: {x: 1}, id: '1'}}], + }, + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should yield accumulated text on interrupted', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + serverContent: { + modelTurn: { + parts: [{text: 'Hello'}], + }, + }, + }); + + messageQueue.push({ + serverContent: { + interrupted: true, + }, + }); + messageQueue.close(); + + const res1 = await generator.next(); + expect(res1.value).toEqual({ + content: {parts: [{text: 'Hello'}]}, + modelVersion: 'gemini-2.5-flash', + partial: true, + }); + + const res2 = await generator.next(); + expect(res2.value).toEqual({ + content: { + role: 'model', + parts: [{text: 'Hello'}], + }, + partial: false, + modelVersion: 'gemini-2.5-flash', + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should yield accumulated text on tool call', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + serverContent: { + modelTurn: { + parts: [{text: 'Hello'}], + }, + }, + }); + + messageQueue.push({ + toolCall: { + functionCalls: [{name: 'tool_a', args: {x: 1}, id: '1'}], + }, + }); + messageQueue.close(); + + const res1 = await generator.next(); + expect(res1.value).toEqual({ + content: {parts: [{text: 'Hello'}]}, + modelVersion: 'gemini-2.5-flash', + partial: true, + }); + + const res2 = await generator.next(); + expect(res2.value).toEqual({ + content: { + role: 'model', + parts: [{text: 'Hello'}], + }, + partial: false, + modelVersion: 'gemini-2.5-flash', + }); + + const res3 = await generator.next(); + expect(res3.value).toEqual({ + content: { + role: 'model', + parts: [{functionCall: {name: 'tool_a', args: {x: 1}, id: '1'}}], + }, + modelVersion: 'gemini-2.5-flash', + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should handle output transcription partial and finished', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + serverContent: { + outputTranscription: {text: 'hello', finished: false}, + }, + }); + + messageQueue.push({ + serverContent: { + outputTranscription: {text: ' world', finished: true}, + }, + }); + + messageQueue.close(); + + const res1 = await generator.next(); + expect(res1.value).toEqual({ + outputTranscription: {text: 'hello', finished: false}, + partial: true, + modelVersion: 'gemini-2.5-flash', + }); + + const res2 = await generator.next(); + expect(res2.value).toEqual({ + outputTranscription: {text: ' world', finished: false}, + partial: true, + modelVersion: 'gemini-2.5-flash', + }); + + const res3 = await generator.next(); + expect(res3.value).toEqual({ + outputTranscription: {text: 'hello world', finished: true}, + partial: false, + modelVersion: 'gemini-2.5-flash', + }); + + expect((await generator.next()).done).toBe(true); + }); + + it('should flush pending output transcription on interrupted', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + messageQueue, + ); + const generator = connection.receive(); + + messageQueue.push({ + serverContent: { + outputTranscription: {text: 'hello', finished: false}, + }, + }); + + messageQueue.push({ + serverContent: { + interrupted: true, + }, + }); + messageQueue.close(); + + const res1 = await generator.next(); + expect(res1.value.outputTranscription).toEqual({ + text: 'hello', + finished: false, + }); + + const res2 = await generator.next(); + expect(res2.value).toEqual({ + outputTranscription: {text: 'hello', finished: true}, + partial: false, + modelVersion: 'gemini-2.5-flash', + }); + + const res3 = await generator.next(); + expect(res3.value).toEqual({ + interrupted: true, + modelVersion: 'gemini-2.5-flash', + }); + + expect((await generator.next()).done).toBe(true); + }); + }); +}); diff --git a/core/test/models/google_llm_test.ts b/core/test/models/google_llm_test.ts index 986893c4..aec59641 100644 --- a/core/test/models/google_llm_test.ts +++ b/core/test/models/google_llm_test.ts @@ -29,6 +29,14 @@ vi.mock('@google/genai', async (importOriginal) => { generateContentStream: vi.fn(), generateContent: vi.fn(), }, + live: { + connect: vi.fn().mockResolvedValue({ + sendClientContent: vi.fn(), + sendToolResponse: vi.fn(), + sendRealtimeInput: vi.fn(), + close: vi.fn(), + }), + }, vertexai: options.vertexai || false, })), }; @@ -108,6 +116,29 @@ describe('GoogleLlm', () => { expect(liveOptions.apiVersion).toBeDefined(); }); + it('should initialize liveApiClient with global location for Vertex AI', () => { + const llm = new TestGemini({ + model: 'projects/p/locations/us-central1/models/gemini-2.5-flash', + vertexai: true, + project: 'p', + location: 'us-central1', + }); + + const spy = vi.mocked(GoogleGenAI); + spy.mockClear(); + + const client = llm.liveApiClient; + expect(client).toBeDefined(); + + expect(spy).toHaveBeenCalledWith( + expect.objectContaining({ + vertexai: true, + project: 'p', + location: 'global', + }), + ); + }); + describe('generateContentAsync streaming thoughtSignature propagation', () => { function makeStreamingChunk( parts: Record[], @@ -584,4 +615,42 @@ describe('GoogleLlm', () => { expect(responses.length).toBeGreaterThanOrEqual(2); }); }); + + describe('connect', () => { + it('should connect to live API and return a connection object', async () => { + const llm = new TestGemini({apiKey: 'test-key'}); + + const request: LlmRequest = { + model: 'gemini-2.5-flash', + liveConnectConfig: { + generationConfig: {responseModalities: ['audio']}, + }, + config: { + systemInstruction: 'You are a helpful assistant.', + tools: [{googleSearch: {}}], + }, + toolsDict: {}, + }; + + const connection = await llm.connect(request); + expect(connection).toBeDefined(); + expect(typeof connection.receive).toBe('function'); + expect(typeof connection.sendContent).toBe('function'); + expect(typeof connection.sendRealtime).toBe('function'); + + expect(llm.liveApiClient.live.connect).toHaveBeenCalledWith( + expect.objectContaining({ + model: 'gemini-2.5-flash', + config: expect.objectContaining({ + generationConfig: {responseModalities: ['audio']}, + systemInstruction: { + role: 'system', + parts: [{text: 'You are a helpful assistant.'}], + }, + tools: [{googleSearch: {}}], + }), + }), + ); + }); + }); }); diff --git a/core/test/utils/async_queue_test.ts b/core/test/utils/async_queue_test.ts new file mode 100644 index 00000000..a20a52c2 --- /dev/null +++ b/core/test/utils/async_queue_test.ts @@ -0,0 +1,88 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import {describe, expect, it} from 'vitest'; +import {AsyncQueue} from '../../src/utils/async_queue.js'; + +describe('AsyncQueue', () => { + it('should yield pushed values', async () => { + const queue = new AsyncQueue(); + queue.push(1); + queue.push(2); + queue.close(); + + const results: number[] = []; + for await (const val of queue) { + results.push(val); + } + + expect(results).toEqual([1, 2]); + }); + + it('should handle values pushed after iteration started', async () => { + const queue = new AsyncQueue(); + const results: number[] = []; + + const iteration = (async () => { + for await (const val of queue) { + results.push(val); + } + })(); + + queue.push(1); + queue.push(2); + queue.close(); + + await iteration; + + expect(results).toEqual([1, 2]); + }); + + it('should terminate iteration when closed empty', async () => { + const queue = new AsyncQueue(); + queue.close(); + + const results: number[] = []; + for await (const val of queue) { + results.push(val); + } + + expect(results).toEqual([]); + }); + + it('should propagate error to pending and subsequent next calls', async () => { + const queue = new AsyncQueue(); + const iterator = queue[Symbol.asyncIterator](); + + const pendingNext = iterator.next(); + queue.error(new Error('Test error')); + + await expect(pendingNext).rejects.toThrow('Test error'); + await expect(iterator.next()).rejects.toThrow('Test error'); + }); + + it('should ignore push after close', async () => { + const queue = new AsyncQueue(); + queue.close(); + queue.push(1); + + const results: number[] = []; + for await (const val of queue) { + results.push(val); + } + + expect(results).toEqual([]); + }); + + it('should resolve pending next() call when closed', async () => { + const queue = new AsyncQueue(); + const iterator = queue[Symbol.asyncIterator](); + const pending = iterator.next(); + queue.close(); + const res = await pending; + expect(res.done).toBe(true); + }); +}); diff --git a/core/test/utils/model_name_test.ts b/core/test/utils/model_name_test.ts index 21152384..4a03360c 100644 --- a/core/test/utils/model_name_test.ts +++ b/core/test/utils/model_name_test.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import {isGemini2OrAbove} from '@google/adk'; +import {isGemini2OrAbove, isGemini31FlashLive} from '@google/adk'; import {describe, expect, it} from 'vitest'; describe('isGemini2OrAbove', () => { @@ -54,3 +54,22 @@ describe('isGemini2OrAbove', () => { } }); }); + +describe('isGemini31FlashLive', () => { + it('should return true for valid Gemini 3.1 Flash Live models', () => { + expect(isGemini31FlashLive('gemini-3.1-flash-live')).toBe(true); + expect(isGemini31FlashLive('gemini-3.1-flash-live-preview')).toBe(true); + expect( + isGemini31FlashLive( + 'projects/my-project/locations/us-central1/publishers/google/models/gemini-3.1-flash-live-001', + ), + ).toBe(true); + }); + + it('should return false for other models', () => { + expect(isGemini31FlashLive('gemini-2.5-flash')).toBe(false); + expect(isGemini31FlashLive('gemini-3.0-flash')).toBe(false); + expect(isGemini31FlashLive(undefined)).toBe(false); + expect(isGemini31FlashLive('')).toBe(false); + }); +}); diff --git a/tests/e2e/live_model_test.ts b/tests/e2e/live_model_test.ts new file mode 100644 index 00000000..4b910d8d --- /dev/null +++ b/tests/e2e/live_model_test.ts @@ -0,0 +1,79 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import {Gemini, LlmRequest} from '@google/adk'; +import {describe, expect, it} from 'vitest'; + +describe('Live Gemini Live Connection E2E', () => { + const project = 'amaadmartin-claw-15058'; + const location = 'us-central1'; + + it('should connect and stream responses from Gemini Live using Vertex AI', async () => { + console.log( + `Connecting to Live API using project: ${project}, location: ${location}`, + ); + const llm = new Gemini({ + model: 'gemini-2.0-flash-exp', + vertexai: true, + project, + location, + }); + + const request: LlmRequest = { + model: 'gemini-2.0-flash-exp', + liveConnectConfig: { + responseModalities: ['text'], + }, + config: { + systemInstruction: + 'You are a helpful assistant. Answer concisely in one sentence.', + }, + toolsDict: {}, + }; + + const connection = await llm.connect(request); + expect(connection).toBeDefined(); + + const generator = connection.receive(); + + // Send a message to the live model + await connection.sendContent({ + parts: [{text: 'Hello Gemini Live! What is 2 + 2?'}], + }); + + // Consume events + let accumulatedText = ''; + let gotTurnComplete = false; + + for (let i = 0; i < 20; i++) { + const next = await generator.next(); + if (next.done) { + break; + } + const response = next.value; + console.log('Received response event:', JSON.stringify(response)); + + if (response.content?.parts) { + for (const part of response.content.parts) { + if (part.text) { + accumulatedText += part.text; + } + } + } + if (response.turnComplete) { + gotTurnComplete = true; + break; + } + } + + console.log('Accumulated text response:', accumulatedText); + expect(accumulatedText.length).toBeGreaterThan(0); + expect(accumulatedText).toContain('4'); + expect(gotTurnComplete).toBe(true); + + await connection.close(); + }, 30000); +}); From 954f286489e248bae86ffa7cbe4d843cc69c24ea Mon Sep 17 00:00:00 2001 From: Amaad Martin Date: Tue, 2 Jun 2026 14:02:31 -0700 Subject: [PATCH 2/7] feat(core): support general Gemini 3.x Live models and skip E2E in CI --- core/src/utils/model_name.ts | 2 +- core/test/utils/model_name_test.ts | 9 ++++++++- tests/e2e/live_model_test.ts | 8 +++++--- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/core/src/utils/model_name.ts b/core/src/utils/model_name.ts index 79bd6007..26f5a1a2 100644 --- a/core/src/utils/model_name.ts +++ b/core/src/utils/model_name.ts @@ -105,7 +105,7 @@ export function isGemini31FlashLive(modelString: string | undefined): boolean { return false; } const modelName = extractModelName(modelString); - return modelName.startsWith('gemini-3.1-flash-live'); + return modelName.startsWith('gemini-3.') && modelName.includes('-flash-live'); } /** diff --git a/core/test/utils/model_name_test.ts b/core/test/utils/model_name_test.ts index 4a03360c..e1485c93 100644 --- a/core/test/utils/model_name_test.ts +++ b/core/test/utils/model_name_test.ts @@ -56,14 +56,21 @@ describe('isGemini2OrAbove', () => { }); describe('isGemini31FlashLive', () => { - it('should return true for valid Gemini 3.1 Flash Live models', () => { + it('should return true for valid Gemini 3.x Flash Live models', () => { expect(isGemini31FlashLive('gemini-3.1-flash-live')).toBe(true); expect(isGemini31FlashLive('gemini-3.1-flash-live-preview')).toBe(true); + expect(isGemini31FlashLive('gemini-3.5-flash-live')).toBe(true); + expect(isGemini31FlashLive('gemini-3.5-flash-live-preview')).toBe(true); expect( isGemini31FlashLive( 'projects/my-project/locations/us-central1/publishers/google/models/gemini-3.1-flash-live-001', ), ).toBe(true); + expect( + isGemini31FlashLive( + 'projects/my-project/locations/us-central1/publishers/google/models/gemini-3.5-flash-live-001', + ), + ).toBe(true); }); it('should return false for other models', () => { diff --git a/tests/e2e/live_model_test.ts b/tests/e2e/live_model_test.ts index 4b910d8d..881df002 100644 --- a/tests/e2e/live_model_test.ts +++ b/tests/e2e/live_model_test.ts @@ -7,7 +7,9 @@ import {Gemini, LlmRequest} from '@google/adk'; import {describe, expect, it} from 'vitest'; -describe('Live Gemini Live Connection E2E', () => { +const isCI = process.env.CI === 'true'; + +describe.skipIf(isCI)('Live Gemini Live Connection E2E', () => { const project = 'amaadmartin-claw-15058'; const location = 'us-central1'; @@ -16,14 +18,14 @@ describe('Live Gemini Live Connection E2E', () => { `Connecting to Live API using project: ${project}, location: ${location}`, ); const llm = new Gemini({ - model: 'gemini-2.0-flash-exp', + model: 'gemini-2.5-flash', vertexai: true, project, location, }); const request: LlmRequest = { - model: 'gemini-2.0-flash-exp', + model: 'gemini-2.5-flash', liveConnectConfig: { responseModalities: ['text'], }, From 4edea3494f1157c8b36a7acdb2eac71e4fb5a6fb Mon Sep 17 00:00:00 2001 From: Amaad Martin Date: Tue, 2 Jun 2026 14:07:01 -0700 Subject: [PATCH 3/7] test(e2e): remove hardcoded personal GCP project ID --- tests/e2e/live_model_test.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/e2e/live_model_test.ts b/tests/e2e/live_model_test.ts index 881df002..f9bbab25 100644 --- a/tests/e2e/live_model_test.ts +++ b/tests/e2e/live_model_test.ts @@ -10,8 +10,11 @@ import {describe, expect, it} from 'vitest'; const isCI = process.env.CI === 'true'; describe.skipIf(isCI)('Live Gemini Live Connection E2E', () => { - const project = 'amaadmartin-claw-15058'; - const location = 'us-central1'; + const project = + process.env.GCP_PROJECT || + process.env.GOOGLE_CLOUD_PROJECT || + 'placeholder-project'; + const location = process.env.GCP_LOCATION || 'us-central1'; it('should connect and stream responses from Gemini Live using Vertex AI', async () => { console.log( From 1d7e7205318070d9375b444707ed5f5f24beef69 Mon Sep 17 00:00:00 2001 From: Amaad Martin Date: Thu, 4 Jun 2026 13:54:10 -0700 Subject: [PATCH 4/7] feat(core): generalize Gemini 3.1 checks to general 3.x series and rename connection helpers --- core/src/common.ts | 2 +- core/src/models/gemini_llm_connection.ts | 22 ++++++++--------- core/src/utils/model_name.ts | 6 ++--- .../test/models/gemini_llm_connection_test.ts | 24 +++++++++---------- core/test/utils/model_name_test.ts | 24 +++++++++---------- 5 files changed, 39 insertions(+), 39 deletions(-) diff --git a/core/src/common.ts b/core/src/common.ts index bf6e0d01..dfc607a4 100644 --- a/core/src/common.ts +++ b/core/src/common.ts @@ -239,7 +239,7 @@ export type { export {VertexRagRetrievalTool} from './tools/vertex_rag_retrieval_tool.js'; export {LogLevel, getLogger, setLogLevel, setLogger} from './utils/logger.js'; export type {Logger} from './utils/logger.js'; -export {isGemini2OrAbove, isGemini31FlashLive} from './utils/model_name.js'; +export {isGemini2OrAbove, isGemini3xFlashLive} from './utils/model_name.js'; export {zodObjectToSchema} from './utils/simple_zod_to_json.js'; export {GoogleLLMVariant} from './utils/variant_utils.js'; export {version} from './version.js'; diff --git a/core/src/models/gemini_llm_connection.ts b/core/src/models/gemini_llm_connection.ts index 48963cdb..e68cf61d 100644 --- a/core/src/models/gemini_llm_connection.ts +++ b/core/src/models/gemini_llm_connection.ts @@ -15,7 +15,7 @@ import { } from '@google/genai'; import {logger} from '../utils/logger.js'; -import {isGemini31FlashLive} from '../utils/model_name.js'; +import {isGemini3xFlashLive} from '../utils/model_name.js'; import {BaseLlmConnection} from './base_llm_connection.js'; import {LlmResponse} from './llm_response.js'; @@ -47,10 +47,10 @@ export class GeminiLlmConnection implements BaseLlmConnection { ); if (contents.length > 0) { - const isGemini31 = isGemini31FlashLive(this.modelVersion); + const isGemini3x = isGemini3xFlashLive(this.modelVersion); this.geminiSession.sendClientContent({ turns: contents, - turnComplete: isGemini31 + turnComplete: isGemini3x ? true : contents[contents.length - 1].role === 'user', }); @@ -83,9 +83,9 @@ export class GeminiLlmConnection implements BaseLlmConnection { }); } else { logger.debug('Sending LLM new content', content); - const isGemini31 = isGemini31FlashLive(this.modelVersion); - if (isGemini31 && content.parts.length === 1 && content.parts[0].text) { - logger.debug('Using sendRealtimeInput for Gemini 3.1 text input'); + const isGemini3x = isGemini3xFlashLive(this.modelVersion); + if (isGemini3x && content.parts.length === 1 && content.parts[0].text) { + logger.debug('Using sendRealtimeInput for Gemini 3.x text input'); this.geminiSession.sendRealtimeInput({text: content.parts[0].text}); } else { this.geminiSession.sendClientContent({ @@ -103,10 +103,10 @@ export class GeminiLlmConnection implements BaseLlmConnection { */ async sendRealtime(blob: Blob): Promise { logger.debug('Sending LLM Blob:', blob); - const isGemini31 = isGemini31FlashLive(this.modelVersion); + const isGemini3x = isGemini3xFlashLive(this.modelVersion); const isNativeAudio = this.modelVersion?.includes('native-audio'); - if (isGemini31 || isNativeAudio) { + if (isGemini3x || isNativeAudio) { if (blob.mimeType?.startsWith('audio/')) { this.geminiSession.sendRealtimeInput({audio: blob}); } else if (blob.mimeType?.startsWith('image/')) { @@ -381,10 +381,10 @@ export class GeminiLlmConnection implements BaseLlmConnection { ); } - const isGemini31 = isGemini31FlashLive(this.modelVersion); - if (isGemini31 && toolCallParts.length > 0) { + const isGemini3x = isGemini3xFlashLive(this.modelVersion); + if (isGemini3x && toolCallParts.length > 0) { logger.debug( - 'Yielding toolCallParts immediately for Gemini 3.1 live tool call', + 'Yielding toolCallParts immediately for Gemini 3.x live tool call', ); yield { content: {role: 'model', parts: toolCallParts}, diff --git a/core/src/utils/model_name.ts b/core/src/utils/model_name.ts index 26f5a1a2..6851eaba 100644 --- a/core/src/utils/model_name.ts +++ b/core/src/utils/model_name.ts @@ -95,12 +95,12 @@ export function isGemini2OrAbove(modelString: string): boolean { } /** - * Check if the model is a Gemini 3.1 Flash Live model. + * Check if the model is a Gemini 3.x Flash Live model. * * @param modelString Either a simple model name or path-based model name - * @return true if it's a Gemini 3.1 Flash Live model, false otherwise. + * @return true if it's a Gemini 3.x Flash Live model, false otherwise. */ -export function isGemini31FlashLive(modelString: string | undefined): boolean { +export function isGemini3xFlashLive(modelString: string | undefined): boolean { if (!modelString) { return false; } diff --git a/core/test/models/gemini_llm_connection_test.ts b/core/test/models/gemini_llm_connection_test.ts index be59dc7d..34c0dfa9 100644 --- a/core/test/models/gemini_llm_connection_test.ts +++ b/core/test/models/gemini_llm_connection_test.ts @@ -31,7 +31,7 @@ describe('GeminiLlmConnection', () => { }); describe('sendHistory', () => { - it('should send history with turnComplete based on role for non-Gemini 3.1', async () => { + it('should send history with turnComplete based on role for non-Gemini 3.x', async () => { const connection = new GeminiLlmConnection( mockSession, 'gemini-2.5-flash', @@ -49,7 +49,7 @@ describe('GeminiLlmConnection', () => { }); }); - it('should send history with turnComplete=true for Gemini 3.1', async () => { + it('should send history with turnComplete=true for Gemini 3.x', async () => { const connection = new GeminiLlmConnection( mockSession, 'gemini-3.1-flash-live', @@ -102,7 +102,7 @@ describe('GeminiLlmConnection', () => { }); }); - it('should use sendRealtimeInput for Gemini 3.1 single-part text', async () => { + it('should use sendRealtimeInput for Gemini 3.x single-part text', async () => { const connection = new GeminiLlmConnection( mockSession, 'gemini-3.1-flash-live', @@ -118,7 +118,7 @@ describe('GeminiLlmConnection', () => { }); }); - it('should use sendClientContent for non-Gemini 3.1 single-part text', async () => { + it('should use sendClientContent for non-Gemini 3.x single-part text', async () => { const connection = new GeminiLlmConnection( mockSession, 'gemini-2.5-flash', @@ -147,7 +147,7 @@ describe('GeminiLlmConnection', () => { }); describe('sendRealtime', () => { - it('should use sendRealtimeInput with media for non-Gemini 3.1/non-Native-Audio', async () => { + it('should use sendRealtimeInput with media for non-Gemini 3.x/non-Native-Audio', async () => { const connection = new GeminiLlmConnection( mockSession, 'gemini-2.5-flash', @@ -161,7 +161,7 @@ describe('GeminiLlmConnection', () => { }); }); - it('should use sendRealtimeInput with audio for Gemini 3.1 audio', async () => { + it('should use sendRealtimeInput with audio for Gemini 3.x audio', async () => { const connection = new GeminiLlmConnection( mockSession, 'gemini-3.1-flash-live', @@ -175,7 +175,7 @@ describe('GeminiLlmConnection', () => { }); }); - it('should use sendRealtimeInput with video for Gemini 3.1 image', async () => { + it('should use sendRealtimeInput with video for Gemini 3.x image', async () => { const connection = new GeminiLlmConnection( mockSession, 'gemini-3.1-flash-live', @@ -203,7 +203,7 @@ describe('GeminiLlmConnection', () => { }); }); - it('should warn and not send if unknown mime type for Gemini 3.1', async () => { + it('should warn and not send if unknown mime type for Gemini 3.x', async () => { const connection = new GeminiLlmConnection( mockSession, 'gemini-3.1-flash-live', @@ -623,7 +623,7 @@ describe('GeminiLlmConnection', () => { expect((await generator.next()).done).toBe(true); }); - it('should buffer tool calls and yield at turnComplete for non-Gemini 3.1', async () => { + it('should buffer tool calls and yield at turnComplete for non-Gemini 3.x', async () => { const connection = new GeminiLlmConnection( mockSession, 'gemini-2.5-flash', @@ -643,7 +643,7 @@ describe('GeminiLlmConnection', () => { }, }); - // For non-Gemini 3.1, tool call is buffered. + // For non-Gemini 3.x, tool call is buffered. // So we don't get anything on toolCall message (except if there was text, but there isn't). // On turnComplete, it should yield the aggregated tool calls first, then turnComplete. const res1 = await generator.next(); @@ -664,7 +664,7 @@ describe('GeminiLlmConnection', () => { expect((await generator.next()).done).toBe(true); }); - it('should yield tool calls immediately for Gemini 3.1', async () => { + it('should yield tool calls immediately for Gemini 3.x', async () => { const connection = new GeminiLlmConnection( mockSession, 'gemini-3.1-flash-live', @@ -759,7 +759,7 @@ describe('GeminiLlmConnection', () => { expect((await generator.next()).done).toBe(true); }); - it('should handle undefined modelVersion in isGemini31FlashLive check', async () => { + it('should handle undefined modelVersion in isGemini3xFlashLive check', async () => { const connection = new GeminiLlmConnection( mockSession, undefined, diff --git a/core/test/utils/model_name_test.ts b/core/test/utils/model_name_test.ts index e1485c93..caabe402 100644 --- a/core/test/utils/model_name_test.ts +++ b/core/test/utils/model_name_test.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import {isGemini2OrAbove, isGemini31FlashLive} from '@google/adk'; +import {isGemini2OrAbove, isGemini3xFlashLive} from '@google/adk'; import {describe, expect, it} from 'vitest'; describe('isGemini2OrAbove', () => { @@ -55,28 +55,28 @@ describe('isGemini2OrAbove', () => { }); }); -describe('isGemini31FlashLive', () => { +describe('isGemini3xFlashLive', () => { it('should return true for valid Gemini 3.x Flash Live models', () => { - expect(isGemini31FlashLive('gemini-3.1-flash-live')).toBe(true); - expect(isGemini31FlashLive('gemini-3.1-flash-live-preview')).toBe(true); - expect(isGemini31FlashLive('gemini-3.5-flash-live')).toBe(true); - expect(isGemini31FlashLive('gemini-3.5-flash-live-preview')).toBe(true); + expect(isGemini3xFlashLive('gemini-3.1-flash-live')).toBe(true); + expect(isGemini3xFlashLive('gemini-3.1-flash-live-preview')).toBe(true); + expect(isGemini3xFlashLive('gemini-3.5-flash-live')).toBe(true); + expect(isGemini3xFlashLive('gemini-3.5-flash-live-preview')).toBe(true); expect( - isGemini31FlashLive( + isGemini3xFlashLive( 'projects/my-project/locations/us-central1/publishers/google/models/gemini-3.1-flash-live-001', ), ).toBe(true); expect( - isGemini31FlashLive( + isGemini3xFlashLive( 'projects/my-project/locations/us-central1/publishers/google/models/gemini-3.5-flash-live-001', ), ).toBe(true); }); it('should return false for other models', () => { - expect(isGemini31FlashLive('gemini-2.5-flash')).toBe(false); - expect(isGemini31FlashLive('gemini-3.0-flash')).toBe(false); - expect(isGemini31FlashLive(undefined)).toBe(false); - expect(isGemini31FlashLive('')).toBe(false); + expect(isGemini3xFlashLive('gemini-2.5-flash')).toBe(false); + expect(isGemini3xFlashLive('gemini-3.0-flash')).toBe(false); + expect(isGemini3xFlashLive(undefined)).toBe(false); + expect(isGemini3xFlashLive('')).toBe(false); }); }); From f6d7a0f6650829f79c539d75b053003b9ba4dfe1 Mon Sep 17 00:00:00 2001 From: Amaad Martin Date: Tue, 9 Jun 2026 11:05:11 -0700 Subject: [PATCH 5/7] refactor(core): Extract live message transformation logic into LiveResponseAggregator utility and add tests --- core/src/models/gemini_llm_connection.ts | 283 +------------- core/src/models/google_llm.ts | 3 - core/src/utils/live_connection_utils.ts | 312 ++++++++++++++++ core/test/utils/live_connection_utils_test.ts | 350 ++++++++++++++++++ 4 files changed, 670 insertions(+), 278 deletions(-) create mode 100644 core/src/utils/live_connection_utils.ts create mode 100644 core/test/utils/live_connection_utils_test.ts diff --git a/core/src/models/gemini_llm_connection.ts b/core/src/models/gemini_llm_connection.ts index e68cf61d..c294dfaf 100644 --- a/core/src/models/gemini_llm_connection.ts +++ b/core/src/models/gemini_llm_connection.ts @@ -8,12 +8,11 @@ import { Blob, Content, FunctionResponse, - GroundingMetadata, LiveServerMessage, - Part, Session, } from '@google/genai'; +import {LiveResponseAggregator} from '../utils/live_connection_utils.js'; import {logger} from '../utils/logger.js'; import {isGemini3xFlashLive} from '../utils/model_name.js'; @@ -22,9 +21,6 @@ import {LlmResponse} from './llm_response.js'; /** The Gemini model connection. */ export class GeminiLlmConnection implements BaseLlmConnection { - private _inputTranscriptionText = ''; - private _outputTranscriptionText = ''; - constructor( private readonly geminiSession: Session, private readonly modelVersion?: string, @@ -133,290 +129,27 @@ export class GeminiLlmConnection implements BaseLlmConnection { * @param groundingMetadata The grounding metadata to include. * @returns An LlmResponse containing the full text. */ - private buildFullTextResponse( - text: string, - isThought = false, - groundingMetadata?: GroundingMetadata, - ): LlmResponse { - const part: Part = {text}; - if (isThought) { - part.thought = true; - } - const response: LlmResponse = { - content: { - role: 'model', - parts: [part], - }, - partial: false, - }; - if (groundingMetadata !== undefined && groundingMetadata !== null) { - response.groundingMetadata = groundingMetadata; - } - if (this.modelVersion) { - response.modelVersion = this.modelVersion; - } - return response; - } - async *receive(): AsyncGenerator { if (!this.messageQueue) { throw new Error('Message queue is not initialized.'); } - let text = ''; - let isThought = false; - let toolCallParts: Part[] = []; - let pendingGroundingMetadata: GroundingMetadata | undefined = undefined; + const aggregator = new LiveResponseAggregator(this.modelVersion); for await (const message of this.messageQueue) { logger.debug('Got LLM Live message:', message); - if (message.usageMetadata) { - yield { - usageMetadata: message.usageMetadata, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - } - - if (message.serverContent) { - const serverContent = message.serverContent; - const content = serverContent.modelTurn; - - if (serverContent.groundingMetadata) { - pendingGroundingMetadata = serverContent.groundingMetadata; - } - - // Standalone groundingMetadata event (when content is empty) - if ( - !(content && content.parts) && - serverContent.groundingMetadata && - !serverContent.turnComplete - ) { - yield { - groundingMetadata: serverContent.groundingMetadata, - ...(serverContent.interrupted !== undefined - ? {interrupted: serverContent.interrupted} - : {}), - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - } - - if (content && content.parts) { - const llmResponse: LlmResponse = { - content: content as Content, - ...(serverContent.interrupted !== undefined - ? {interrupted: serverContent.interrupted} - : {}), - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - - if (!serverContent.turnComplete && serverContent.groundingMetadata) { - llmResponse.groundingMetadata = serverContent.groundingMetadata; - } - - const hasInlineData = content.parts.some((p) => p.inlineData); - for (const part of content.parts) { - if (part.text) { - const currentIsThought = !!part.thought; - if (text && currentIsThought !== isThought) { - yield this.buildFullTextResponse(text, isThought); - text = ''; - isThought = false; - } - text += part.text; - isThought = currentIsThought; - llmResponse.partial = true; - } - } - - // don't yield the merged text event when receiving audio data - if (text && !content.parts.some((p) => p.text) && !hasInlineData) { - yield this.buildFullTextResponse(text, isThought); - text = ''; - isThought = false; - } - - yield llmResponse; - } - - if (serverContent.inputTranscription) { - if (serverContent.inputTranscription.text) { - this._inputTranscriptionText += - serverContent.inputTranscription.text; - yield { - inputTranscription: { - text: serverContent.inputTranscription.text, - finished: false, - }, - partial: true, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - } - if (serverContent.inputTranscription.finished) { - yield { - inputTranscription: { - text: this._inputTranscriptionText, - finished: true, - }, - partial: false, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - this._inputTranscriptionText = ''; - } - } - - if (serverContent.outputTranscription) { - if (serverContent.outputTranscription.text) { - this._outputTranscriptionText += - serverContent.outputTranscription.text; - yield { - outputTranscription: { - text: serverContent.outputTranscription.text, - finished: false, - }, - partial: true, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - } - if (serverContent.outputTranscription.finished) { - yield { - outputTranscription: { - text: this._outputTranscriptionText, - finished: true, - }, - partial: false, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - this._outputTranscriptionText = ''; - } - } - - if ( - serverContent.interrupted || - serverContent.turnComplete || - serverContent.generationComplete - ) { - if (this._inputTranscriptionText) { - yield { - inputTranscription: { - text: this._inputTranscriptionText, - finished: true, - }, - partial: false, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - this._inputTranscriptionText = ''; - } - if (this._outputTranscriptionText) { - yield { - outputTranscription: { - text: this._outputTranscriptionText, - finished: true, - }, - partial: false, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - this._outputTranscriptionText = ''; - } - } - - if (serverContent.turnComplete) { - let gMetadataToYield = pendingGroundingMetadata; - if (text) { - yield this.buildFullTextResponse(text, isThought, gMetadataToYield); - text = ''; - isThought = false; - gMetadataToYield = undefined; - } - if (toolCallParts.length > 0) { - logger.debug('Returning aggregated toolCallParts'); - yield { - content: {role: 'model', parts: toolCallParts}, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - toolCallParts = []; - } - const finalResponse: LlmResponse = { - turnComplete: true, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - if (serverContent.interrupted !== undefined) { - finalResponse.interrupted = serverContent.interrupted; - } - const finalGrounding = - serverContent.groundingMetadata || gMetadataToYield; - if (finalGrounding !== undefined && finalGrounding !== null) { - finalResponse.groundingMetadata = finalGrounding; - } - yield finalResponse; - break; - } - - if (serverContent.interrupted) { - if (text) { - yield this.buildFullTextResponse(text, isThought); - text = ''; - isThought = false; - } else { - yield { - interrupted: serverContent.interrupted, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - } - } - } - - if (message.toolCall) { - logger.debug('Received tool call:', message.toolCall); - if (text) { - yield this.buildFullTextResponse(text, isThought); - text = ''; - isThought = false; - } - if (message.toolCall.functionCalls) { - toolCallParts.push( - ...message.toolCall.functionCalls.map((fc) => ({ - functionCall: fc, - })), - ); - } - - const isGemini3x = isGemini3xFlashLive(this.modelVersion); - if (isGemini3x && toolCallParts.length > 0) { - logger.debug( - 'Yielding toolCallParts immediately for Gemini 3.x live tool call', - ); - yield { - content: {role: 'model', parts: toolCallParts}, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; - toolCallParts = []; - } - } - - if (message.sessionResumptionUpdate) { - logger.debug('Received session resumption message:', message); - yield { - liveSessionResumptionUpdate: message.sessionResumptionUpdate, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; + for (const response of aggregator.processMessage(message)) { + yield response; } - if (message.goAway) { - logger.debug('Received GoAway message:', message.goAway); - yield { - goAway: message.goAway, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; + if (aggregator.isDone) { + break; } } - if (toolCallParts.length > 0) { - logger.debug('Exited loop with pending toolCallParts'); - yield { - content: {role: 'model', parts: toolCallParts}, - ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), - }; + for (const response of aggregator.close()) { + yield response; } } diff --git a/core/src/models/google_llm.ts b/core/src/models/google_llm.ts index 3590bf02..2117fc25 100644 --- a/core/src/models/google_llm.ts +++ b/core/src/models/google_llm.ts @@ -291,15 +291,12 @@ export class Gemini extends BaseLlm { config: llmRequest.liveConnectConfig, callbacks: { onmessage: (message) => { - console.log('E2E Debug: onmessage', JSON.stringify(message)); messageQueue.push(message); }, onerror: (error) => { - console.error('E2E Debug: onerror', error); messageQueue.error(error); }, onclose: () => { - console.log('E2E Debug: onclose'); messageQueue.close(); }, }, diff --git a/core/src/utils/live_connection_utils.ts b/core/src/utils/live_connection_utils.ts new file mode 100644 index 00000000..07ce6cb4 --- /dev/null +++ b/core/src/utils/live_connection_utils.ts @@ -0,0 +1,312 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { + Content, + GroundingMetadata, + LiveServerMessage, + Part, +} from '@google/genai'; +import {LlmResponse} from '../models/llm_response.js'; +import {isGemini3xFlashLive} from './model_name.js'; + +/** + * Aggregator and mapper for Gemini Live WebSocket server messages. + * + * Translates incoming raw WebSocket server messages (push stream) into unified + * agent-consumable LlmResponse objects (pull stream), managing transcription buffers, + * grounding metadata, text segments, and tool calls. + */ +export class LiveResponseAggregator { + private text = ''; + private isThought = false; + private toolCallParts: Part[] = []; + private pendingGroundingMetadata: GroundingMetadata | undefined = undefined; + private inputTranscriptionText = ''; + private outputTranscriptionText = ''; + + public isDone = false; + + constructor(private readonly modelVersion?: string) {} + + *processMessage( + message: LiveServerMessage, + ): Generator { + if (message.usageMetadata) { + yield { + usageMetadata: message.usageMetadata, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + + if (message.serverContent) { + const serverContent = message.serverContent; + const content = serverContent.modelTurn; + + if (serverContent.groundingMetadata) { + this.pendingGroundingMetadata = serverContent.groundingMetadata; + } + + // Standalone groundingMetadata event (when content is empty) + if ( + !(content && content.parts) && + serverContent.groundingMetadata && + !serverContent.turnComplete + ) { + yield { + groundingMetadata: serverContent.groundingMetadata, + ...(serverContent.interrupted !== undefined + ? {interrupted: serverContent.interrupted} + : {}), + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + + if (content && content.parts) { + const llmResponse: LlmResponse = { + content: content as Content, + ...(serverContent.interrupted !== undefined + ? {interrupted: serverContent.interrupted} + : {}), + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + + if (!serverContent.turnComplete && serverContent.groundingMetadata) { + llmResponse.groundingMetadata = serverContent.groundingMetadata; + } + + const hasInlineData = content.parts.some((p) => p.inlineData); + for (const part of content.parts) { + if (part.text) { + const currentIsThought = !!part.thought; + if (this.text && currentIsThought !== this.isThought) { + yield this.buildFullTextResponse(this.text, this.isThought); + this.text = ''; + this.isThought = false; + } + this.text += part.text; + this.isThought = currentIsThought; + llmResponse.partial = true; + } + } + + // don't yield the merged text event when receiving audio data + if (this.text && !content.parts.some((p) => p.text) && !hasInlineData) { + yield this.buildFullTextResponse(this.text, this.isThought); + this.text = ''; + this.isThought = false; + } + + yield llmResponse; + } + + if (serverContent.inputTranscription) { + if (serverContent.inputTranscription.text) { + this.inputTranscriptionText += serverContent.inputTranscription.text; + yield { + inputTranscription: { + text: serverContent.inputTranscription.text, + finished: false, + }, + partial: true, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + if (serverContent.inputTranscription.finished) { + yield { + inputTranscription: { + text: this.inputTranscriptionText, + finished: true, + }, + partial: false, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + this.inputTranscriptionText = ''; + } + } + + if (serverContent.outputTranscription) { + if (serverContent.outputTranscription.text) { + this.outputTranscriptionText += + serverContent.outputTranscription.text; + yield { + outputTranscription: { + text: serverContent.outputTranscription.text, + finished: false, + }, + partial: true, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + if (serverContent.outputTranscription.finished) { + yield { + outputTranscription: { + text: this.outputTranscriptionText, + finished: true, + }, + partial: false, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + this.outputTranscriptionText = ''; + } + } + + if ( + serverContent.interrupted || + serverContent.turnComplete || + serverContent.generationComplete + ) { + if (this.inputTranscriptionText) { + yield { + inputTranscription: { + text: this.inputTranscriptionText, + finished: true, + }, + partial: false, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + this.inputTranscriptionText = ''; + } + if (this.outputTranscriptionText) { + yield { + outputTranscription: { + text: this.outputTranscriptionText, + finished: true, + }, + partial: false, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + this.outputTranscriptionText = ''; + } + } + + if (serverContent.turnComplete) { + let gMetadataToYield = this.pendingGroundingMetadata; + if (this.text) { + yield this.buildFullTextResponse( + this.text, + this.isThought, + gMetadataToYield, + ); + this.text = ''; + this.isThought = false; + gMetadataToYield = undefined; + } + if (this.toolCallParts.length > 0) { + yield { + content: {role: 'model', parts: this.toolCallParts}, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + this.toolCallParts = []; + } + const finalResponse: LlmResponse = { + turnComplete: true, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + if (serverContent.interrupted !== undefined) { + finalResponse.interrupted = serverContent.interrupted; + } + const finalGrounding = + serverContent.groundingMetadata || gMetadataToYield; + if (finalGrounding !== undefined && finalGrounding !== null) { + finalResponse.groundingMetadata = finalGrounding; + } + yield finalResponse; + this.isDone = true; + return; + } + + if (serverContent.interrupted) { + if (this.text) { + yield this.buildFullTextResponse(this.text, this.isThought); + this.text = ''; + this.isThought = false; + } else { + yield { + interrupted: serverContent.interrupted, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + } + } + + if (message.toolCall) { + if (this.text) { + yield this.buildFullTextResponse(this.text, this.isThought); + this.text = ''; + this.isThought = false; + } + if (message.toolCall.functionCalls) { + this.toolCallParts.push( + ...message.toolCall.functionCalls.map((fc) => ({ + functionCall: fc, + })), + ); + } + + const isGemini3x = isGemini3xFlashLive(this.modelVersion); + if (isGemini3x && this.toolCallParts.length > 0) { + yield { + content: {role: 'model', parts: this.toolCallParts}, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + this.toolCallParts = []; + } + } + + if (message.sessionResumptionUpdate) { + yield { + liveSessionResumptionUpdate: message.sessionResumptionUpdate, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + + if (message.goAway) { + yield { + goAway: message.goAway, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + } + } + + /** + * Flushes any remaining aggregated components when the connection is closed. + */ + *close(): Generator { + if (this.toolCallParts.length > 0) { + yield { + content: {role: 'model', parts: this.toolCallParts}, + ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), + }; + this.toolCallParts = []; + } + } + + private buildFullTextResponse( + text: string, + isThought: boolean, + groundingMetadata?: GroundingMetadata, + ): LlmResponse { + const part: Part = {text}; + if (isThought) { + part.thought = true; + } + const response: LlmResponse = { + content: { + role: 'model', + parts: [part], + }, + partial: false, + }; + if (groundingMetadata !== undefined && groundingMetadata !== null) { + response.groundingMetadata = groundingMetadata; + } + if (this.modelVersion) { + response.modelVersion = this.modelVersion; + } + return response; + } +} diff --git a/core/test/utils/live_connection_utils_test.ts b/core/test/utils/live_connection_utils_test.ts new file mode 100644 index 00000000..f18790fd --- /dev/null +++ b/core/test/utils/live_connection_utils_test.ts @@ -0,0 +1,350 @@ +/** + * @license + * Copyright 2026 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import {GroundingMetadata, LiveServerGoAway} from '@google/genai'; +import {describe, expect, it} from 'vitest'; +import {LiveResponseAggregator} from '../../src/utils/live_connection_utils.js'; + +describe('LiveResponseAggregator', () => { + it('should yield usage metadata', () => { + const aggregator = new LiveResponseAggregator('gemini-2.5-flash'); + const usageMetadata = { + promptTokenCount: 10, + candidatesTokenCount: 20, + totalTokenCount: 30, + }; + + const generator = aggregator.processMessage({usageMetadata}); + const results = Array.from(generator); + + expect(results).toEqual([ + { + usageMetadata, + modelVersion: 'gemini-2.5-flash', + }, + ]); + expect(aggregator.isDone).toBe(false); + }); + + it('should stream text and yield full response on turnComplete', () => { + const aggregator = new LiveResponseAggregator('gemini-2.5-flash'); + + // Message 1: partial text + const gen1 = aggregator.processMessage({ + serverContent: { + modelTurn: { + parts: [{text: 'Hello'}], + }, + }, + }); + const res1 = Array.from(gen1); + expect(res1).toEqual([ + { + content: {parts: [{text: 'Hello'}]}, + modelVersion: 'gemini-2.5-flash', + partial: true, + }, + ]); + + // Message 2: partial text and turnComplete + const gen2 = aggregator.processMessage({ + serverContent: { + modelTurn: { + parts: [{text: ' world!'}], + }, + turnComplete: true, + interrupted: false, + groundingMetadata: {groundingChunks: []} as GroundingMetadata, + }, + }); + const res2 = Array.from(gen2); + expect(res2).toEqual([ + { + content: {parts: [{text: ' world!'}]}, + modelVersion: 'gemini-2.5-flash', + partial: true, + interrupted: false, + }, + { + content: { + role: 'model', + parts: [{text: 'Hello world!'}], + }, + partial: false, + modelVersion: 'gemini-2.5-flash', + groundingMetadata: {groundingChunks: []}, + }, + { + turnComplete: true, + modelVersion: 'gemini-2.5-flash', + interrupted: false, + groundingMetadata: {groundingChunks: []}, + }, + ]); + expect(aggregator.isDone).toBe(true); + }); + + it('should flush text when transitioning between thought and non-thought', () => { + const aggregator = new LiveResponseAggregator('gemini-2.5-flash'); + + // Message 1: thought + const res1 = Array.from( + aggregator.processMessage({ + serverContent: { + modelTurn: { + parts: [{text: 'Thinking...', thought: true}], + }, + }, + }), + ); + expect(res1).toEqual([ + { + content: {parts: [{text: 'Thinking...', thought: true}]}, + modelVersion: 'gemini-2.5-flash', + partial: true, + }, + ]); + + // Message 2: transition to text + const res2 = Array.from( + aggregator.processMessage({ + serverContent: { + modelTurn: { + parts: [{text: 'Answer is 42.'}], + }, + }, + }), + ); + expect(res2).toEqual([ + { + content: { + role: 'model', + parts: [{text: 'Thinking...', thought: true}], + }, + partial: false, + modelVersion: 'gemini-2.5-flash', + }, + { + content: {parts: [{text: 'Answer is 42.'}]}, + modelVersion: 'gemini-2.5-flash', + partial: true, + }, + ]); + + // Message 3: turn complete + const res3 = Array.from( + aggregator.processMessage({ + serverContent: { + turnComplete: true, + }, + }), + ); + expect(res3).toEqual([ + { + content: { + role: 'model', + parts: [{text: 'Answer is 42.'}], + }, + partial: false, + modelVersion: 'gemini-2.5-flash', + }, + { + turnComplete: true, + modelVersion: 'gemini-2.5-flash', + }, + ]); + }); + + it('should handle input transcription partial and finished', () => { + const aggregator = new LiveResponseAggregator('gemini-2.5-flash'); + + const res1 = Array.from( + aggregator.processMessage({ + serverContent: { + inputTranscription: {text: 'hello', finished: false}, + }, + }), + ); + expect(res1).toEqual([ + { + inputTranscription: {text: 'hello', finished: false}, + partial: true, + modelVersion: 'gemini-2.5-flash', + }, + ]); + + const res2 = Array.from( + aggregator.processMessage({ + serverContent: { + inputTranscription: {text: ' world', finished: true}, + }, + }), + ); + expect(res2).toEqual([ + { + inputTranscription: {text: ' world', finished: false}, + partial: true, + modelVersion: 'gemini-2.5-flash', + }, + { + inputTranscription: {text: 'hello world', finished: true}, + partial: false, + modelVersion: 'gemini-2.5-flash', + }, + ]); + }); + + it('should flush pending transcription on interrupted', () => { + const aggregator = new LiveResponseAggregator('gemini-2.5-flash'); + + const res1 = Array.from( + aggregator.processMessage({ + serverContent: { + inputTranscription: {text: 'hello', finished: false}, + }, + }), + ); + expect(res1[0].inputTranscription).toEqual({ + text: 'hello', + finished: false, + }); + + const res2 = Array.from( + aggregator.processMessage({ + serverContent: { + interrupted: true, + }, + }), + ); + expect(res2).toEqual([ + { + inputTranscription: {text: 'hello', finished: true}, + partial: false, + modelVersion: 'gemini-2.5-flash', + }, + { + interrupted: true, + modelVersion: 'gemini-2.5-flash', + }, + ]); + }); + + it('should yield groundingMetadata on partial response', () => { + const aggregator = new LiveResponseAggregator('gemini-2.5-flash'); + + const res1 = Array.from( + aggregator.processMessage({ + serverContent: { + modelTurn: { + parts: [{text: 'Partial text'}], + }, + groundingMetadata: { + groundingChunks: [ + {web: {uri: 'https://google.com', title: 'Google'}}, + ], + } as GroundingMetadata, + }, + }), + ); + expect(res1).toEqual([ + { + content: {parts: [{text: 'Partial text'}]}, + partial: true, + modelVersion: 'gemini-2.5-flash', + groundingMetadata: { + groundingChunks: [ + {web: {uri: 'https://google.com', title: 'Google'}}, + ], + }, + }, + ]); + }); + + it('should buffer tool calls and yield at turnComplete for non-Gemini 3.x', () => { + const aggregator = new LiveResponseAggregator('gemini-2.5-flash'); + + const res1 = Array.from( + aggregator.processMessage({ + toolCall: { + functionCalls: [{name: 'tool_a', args: {x: 1}, id: '1'}], + }, + }), + ); + expect(res1).toEqual([]); // Buffered + + const res2 = Array.from( + aggregator.processMessage({ + serverContent: { + turnComplete: true, + }, + }), + ); + expect(res2).toEqual([ + { + content: { + role: 'model', + parts: [{functionCall: {name: 'tool_a', args: {x: 1}, id: '1'}}], + }, + modelVersion: 'gemini-2.5-flash', + }, + { + turnComplete: true, + modelVersion: 'gemini-2.5-flash', + }, + ]); + }); + + it('should yield tool calls immediately for Gemini 3.x', () => { + const aggregator = new LiveResponseAggregator('gemini-3.1-flash-live'); + + const res1 = Array.from( + aggregator.processMessage({ + toolCall: { + functionCalls: [{name: 'tool_a', args: {x: 1}, id: '1'}], + }, + }), + ); + expect(res1).toEqual([ + { + content: { + role: 'model', + parts: [{functionCall: {name: 'tool_a', args: {x: 1}, id: '1'}}], + }, + modelVersion: 'gemini-3.1-flash-live', + }, + ]); + }); + + it('should yield session resumption update', () => { + const aggregator = new LiveResponseAggregator('gemini-2.5-flash'); + const resumptionUpdate = {resumed: true}; + + const res = Array.from( + aggregator.processMessage({sessionResumptionUpdate: resumptionUpdate}), + ); + expect(res).toEqual([ + { + liveSessionResumptionUpdate: resumptionUpdate, + modelVersion: 'gemini-2.5-flash', + }, + ]); + }); + + it('should yield go away', () => { + const aggregator = new LiveResponseAggregator('gemini-2.5-flash'); + const goAway = {goAway: true}; + + const res = Array.from( + aggregator.processMessage({goAway: goAway as LiveServerGoAway}), + ); + expect(res).toEqual([ + { + goAway, + modelVersion: 'gemini-2.5-flash', + }, + ]); + }); +}); From a05b3899d183738b2e9d2d7dd36c9459071aaaf7 Mon Sep 17 00:00:00 2001 From: Amaad Martin Date: Wed, 10 Jun 2026 14:09:09 -0700 Subject: [PATCH 6/7] feat: live model and connection layer support for Gemini 2.5 and 3.1 with E2E tests --- core/src/agents/base_agent.ts | 29 ++- core/src/agents/invocation_context.ts | 11 -- core/src/agents/llm_agent.ts | 14 ++ core/src/models/base_llm_connection.ts | 12 ++ core/src/models/gemini_llm_connection.ts | 18 +- core/src/models/google_llm.ts | 2 +- core/src/models/llm_response.ts | 6 + core/src/utils/live_connection_utils.ts | 4 - .../test/models/gemini_llm_connection_test.ts | 31 ++++ core/test/models/google_llm_test.ts | 4 +- core/test/utils/live_connection_utils_test.ts | 2 - tests/e2e/live_model_test.ts | 169 +++++++++++++++++- 12 files changed, 270 insertions(+), 32 deletions(-) diff --git a/core/src/agents/base_agent.ts b/core/src/agents/base_agent.ts index 70ee2cf8..c76c5579 100644 --- a/core/src/agents/base_agent.ts +++ b/core/src/agents/base_agent.ts @@ -217,7 +217,7 @@ export abstract class BaseAgent { * @returns An AsyncGenerator that yields the events generated by the agent. */ async *runLive( - parentContext: InvocationContext, // eslint-disable-line @typescript-eslint/no-unused-vars + parentContext: InvocationContext, ): AsyncGenerator { const span = tracer.startSpan(`invoke_agent ${this.name}`); const ctx = trace.setSpan(context.active(), span); @@ -226,10 +226,33 @@ export abstract class BaseAgent { ctx, this, async function* () { - // TODO(b/425992518): Implement live mode. + const context = this.createInvocationContext(parentContext); + + const beforeAgentCallbackEvent = + await this.handleBeforeAgentCallback(context); + if (beforeAgentCallbackEvent) { + yield beforeAgentCallbackEvent; + } + + if (context.endInvocation || parentContext.abortSignal?.aborted) { + return; + } + + for await (const event of this.runLiveImpl(context)) { + yield event; + } + + if (context.endInvocation || parentContext.abortSignal?.aborted) { + return; + } + + const afterAgentCallbackEvent = + await this.handleAfterAgentCallback(context); + if (afterAgentCallbackEvent) { + yield afterAgentCallbackEvent; + } }, ); - throw new Error('Live mode is not implemented yet.'); } finally { span.end(); } diff --git a/core/src/agents/invocation_context.ts b/core/src/agents/invocation_context.ts index c80a8116..7d236140 100644 --- a/core/src/agents/invocation_context.ts +++ b/core/src/agents/invocation_context.ts @@ -16,7 +16,6 @@ import {randomUUID} from '../utils/env_aware_utils.js'; import {ActiveStreamingTool} from './active_streaming_tool.js'; import {BaseAgent} from './base_agent.js'; -import {LiveRequestQueue} from './live_request_queue.js'; import {RunConfig} from './run_config.js'; import {TranscriptionEntry} from './transcription_entry.js'; @@ -36,7 +35,6 @@ export interface InvocationContextParams { endInvocation?: boolean; transcriptionCache?: TranscriptionEntry[]; runConfig?: RunConfig; - liveRequestQueue?: LiveRequestQueue; activeStreamingTools?: Record; pluginManager: PluginManager; abortSignal?: AbortSignal; @@ -171,11 +169,6 @@ export class InvocationContext { */ private readonly invocationCostManager = new InvocationCostManager(); - /** - * The queue to receive live requests. - */ - liveRequestQueue?: LiveRequestQueue; - /** * The running streaming tools of this invocation. */ @@ -186,9 +179,6 @@ export class InvocationContext { */ pluginManager: PluginManager; - /** - * The abort signal for the invocation. - */ readonly abortSignal?: AbortSignal; /** @@ -206,7 +196,6 @@ export class InvocationContext { this.endInvocation = params.endInvocation || false; this.transcriptionCache = params.transcriptionCache; this.runConfig = params.runConfig; - this.liveRequestQueue = params.liveRequestQueue; this.activeStreamingTools = params.activeStreamingTools; this.pluginManager = params.pluginManager; this.abortSignal = params.abortSignal; diff --git a/core/src/agents/llm_agent.ts b/core/src/agents/llm_agent.ts index 30aba287..c5a42f8b 100644 --- a/core/src/agents/llm_agent.ts +++ b/core/src/agents/llm_agent.ts @@ -707,6 +707,20 @@ export class LlmAgent extends BaseAgent { // -------------------------------------------------------------------------- // #START LlmFlow Logic // -------------------------------------------------------------------------- + /** + * Runs the bidirectional (live) flow for this agent. + * + * Establishes a live connection to the model, drains the invocation's + * `liveRequestQueue` into the connection on a parallel task, and yields + * events derived from server messages until the queue closes, the model + * finishes, or an agent transfer occurs. + * + * If the live connection drops (network failure, server `goAway`) and a + * session resumption handle has been observed, the flow transparently + * reconnects using that handle up to {@link MAX_LIVE_RECONNECT_ATTEMPTS} + * times. Subsequent reconnects skip `sendHistory` because the server + * already holds the conversation state associated with the handle. + */ // eslint-disable-next-line require-yield private async *runLiveFlow( _invocationContext: InvocationContext, diff --git a/core/src/models/base_llm_connection.ts b/core/src/models/base_llm_connection.ts index f984ffda..8677eb1e 100644 --- a/core/src/models/base_llm_connection.ts +++ b/core/src/models/base_llm_connection.ts @@ -44,6 +44,18 @@ export interface BaseLlmConnection { */ sendRealtime(blob: Blob): Promise; + /** + * Optionally signals the start of user activity (e.g. user begins speaking) + * for models that support manual activity boundaries. + */ + sendActivityStart?(): Promise; + + /** + * Optionally signals the end of user activity (e.g. user finishes speaking) + * for models that support manual activity boundaries. + */ + sendActivityEnd?(): Promise; + /** * Receives the model response using the llm server connection. * diff --git a/core/src/models/gemini_llm_connection.ts b/core/src/models/gemini_llm_connection.ts index c294dfaf..4b49740e 100644 --- a/core/src/models/gemini_llm_connection.ts +++ b/core/src/models/gemini_llm_connection.ts @@ -118,6 +118,20 @@ export class GeminiLlmConnection implements BaseLlmConnection { } } + /** + * Sends an activity start signal to the model. + */ + async sendActivityStart(): Promise { + this.geminiSession.sendRealtimeInput({activityStart: {}}); + } + + /** + * Sends an activity end signal to the model. + */ + async sendActivityEnd(): Promise { + this.geminiSession.sendRealtimeInput({activityEnd: {}}); + } + /** * Builds a full text response. * @@ -142,10 +156,6 @@ export class GeminiLlmConnection implements BaseLlmConnection { for (const response of aggregator.processMessage(message)) { yield response; } - - if (aggregator.isDone) { - break; - } } for (const response of aggregator.close()) { diff --git a/core/src/models/google_llm.ts b/core/src/models/google_llm.ts index 2117fc25..603834cf 100644 --- a/core/src/models/google_llm.ts +++ b/core/src/models/google_llm.ts @@ -237,7 +237,7 @@ export class Gemini extends BaseLlm { this._liveApiClient = new GoogleGenAI({ vertexai: this.vertexai, project: this.project, - location: 'global', + location: this.location || 'global', httpOptions: this.getLiveHttpOptions(), }); } else { diff --git a/core/src/models/llm_response.ts b/core/src/models/llm_response.ts index c5059842..18980dc4 100644 --- a/core/src/models/llm_response.ts +++ b/core/src/models/llm_response.ts @@ -86,6 +86,12 @@ export interface LlmResponse { */ liveSessionResumptionUpdate?: LiveServerSessionResumptionUpdate; + /** + * Server-side signal that the live connection will be closed soon. The + * caller should reconnect using the latest session resumption handle. + */ + goAway?: LiveServerGoAway; + /** * Audio transcription of user input. */ diff --git a/core/src/utils/live_connection_utils.ts b/core/src/utils/live_connection_utils.ts index 07ce6cb4..08d8b068 100644 --- a/core/src/utils/live_connection_utils.ts +++ b/core/src/utils/live_connection_utils.ts @@ -28,8 +28,6 @@ export class LiveResponseAggregator { private inputTranscriptionText = ''; private outputTranscriptionText = ''; - public isDone = false; - constructor(private readonly modelVersion?: string) {} *processMessage( @@ -215,8 +213,6 @@ export class LiveResponseAggregator { finalResponse.groundingMetadata = finalGrounding; } yield finalResponse; - this.isDone = true; - return; } if (serverContent.interrupted) { diff --git a/core/test/models/gemini_llm_connection_test.ts b/core/test/models/gemini_llm_connection_test.ts index 34c0dfa9..34ab7902 100644 --- a/core/test/models/gemini_llm_connection_test.ts +++ b/core/test/models/gemini_llm_connection_test.ts @@ -216,6 +216,32 @@ describe('GeminiLlmConnection', () => { }); }); + describe('sendActivityStart', () => { + it('should send activityStart client message', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + ); + await connection.sendActivityStart(); + expect(mockSession.sendRealtimeInput).toHaveBeenCalledWith({ + activityStart: {}, + }); + }); + }); + + describe('sendActivityEnd', () => { + it('should send activityEnd client message', async () => { + const connection = new GeminiLlmConnection( + mockSession, + 'gemini-2.5-flash', + ); + await connection.sendActivityEnd(); + expect(mockSession.sendRealtimeInput).toHaveBeenCalledWith({ + activityEnd: {}, + }); + }); + }); + describe('close', () => { it('should close the session', async () => { const connection = new GeminiLlmConnection( @@ -328,6 +354,7 @@ describe('GeminiLlmConnection', () => { groundingMetadata: {groundingChunks: []}, }); + messageQueue.close(); expect((await generator.next()).done).toBe(true); }); @@ -402,6 +429,9 @@ describe('GeminiLlmConnection', () => { turnComplete: true, modelVersion: 'gemini-2.5-flash', }); + + messageQueue.close(); + expect((await generator.next()).done).toBe(true); }); it('should handle input transcription partial and finished', async () => { @@ -661,6 +691,7 @@ describe('GeminiLlmConnection', () => { modelVersion: 'gemini-2.5-flash', }); + messageQueue.close(); expect((await generator.next()).done).toBe(true); }); diff --git a/core/test/models/google_llm_test.ts b/core/test/models/google_llm_test.ts index aec59641..1d745c40 100644 --- a/core/test/models/google_llm_test.ts +++ b/core/test/models/google_llm_test.ts @@ -116,7 +116,7 @@ describe('GoogleLlm', () => { expect(liveOptions.apiVersion).toBeDefined(); }); - it('should initialize liveApiClient with global location for Vertex AI', () => { + it('should respect configured location for Vertex AI liveApiClient', () => { const llm = new TestGemini({ model: 'projects/p/locations/us-central1/models/gemini-2.5-flash', vertexai: true, @@ -134,7 +134,7 @@ describe('GoogleLlm', () => { expect.objectContaining({ vertexai: true, project: 'p', - location: 'global', + location: 'us-central1', }), ); }); diff --git a/core/test/utils/live_connection_utils_test.ts b/core/test/utils/live_connection_utils_test.ts index f18790fd..b4518b06 100644 --- a/core/test/utils/live_connection_utils_test.ts +++ b/core/test/utils/live_connection_utils_test.ts @@ -26,7 +26,6 @@ describe('LiveResponseAggregator', () => { modelVersion: 'gemini-2.5-flash', }, ]); - expect(aggregator.isDone).toBe(false); }); it('should stream text and yield full response on turnComplete', () => { @@ -84,7 +83,6 @@ describe('LiveResponseAggregator', () => { groundingMetadata: {groundingChunks: []}, }, ]); - expect(aggregator.isDone).toBe(true); }); it('should flush text when transitioning between thought and non-thought', () => { diff --git a/tests/e2e/live_model_test.ts b/tests/e2e/live_model_test.ts index f9bbab25..9b36f151 100644 --- a/tests/e2e/live_model_test.ts +++ b/tests/e2e/live_model_test.ts @@ -5,6 +5,7 @@ */ import {Gemini, LlmRequest} from '@google/adk'; +import {Modality} from '@google/genai'; import {describe, expect, it} from 'vitest'; const isCI = process.env.CI === 'true'; @@ -21,16 +22,17 @@ describe.skipIf(isCI)('Live Gemini Live Connection E2E', () => { `Connecting to Live API using project: ${project}, location: ${location}`, ); const llm = new Gemini({ - model: 'gemini-2.5-flash', + model: 'gemini-live-2.5-flash-native-audio', vertexai: true, project, location, }); const request: LlmRequest = { - model: 'gemini-2.5-flash', + model: 'gemini-live-2.5-flash-native-audio', liveConnectConfig: { - responseModalities: ['text'], + responseModalities: [Modality.AUDIO], + outputAudioTranscription: {}, }, config: { systemInstruction: @@ -53,7 +55,7 @@ describe.skipIf(isCI)('Live Gemini Live Connection E2E', () => { let accumulatedText = ''; let gotTurnComplete = false; - for (let i = 0; i < 20; i++) { + while (true) { const next = await generator.next(); if (next.done) { break; @@ -68,6 +70,163 @@ describe.skipIf(isCI)('Live Gemini Live Connection E2E', () => { } } } + if (response.outputTranscription?.text) { + accumulatedText += response.outputTranscription.text; + } + if (response.turnComplete) { + gotTurnComplete = true; + break; + } + } + + console.log('Accumulated text response:', accumulatedText); + expect(accumulatedText.length).toBeGreaterThan(0); + expect(accumulatedText.toLowerCase()).toMatch(/4|four/); + expect(gotTurnComplete).toBe(true); + + await connection.close(); + }, 30000); + + it('should support multi-turn conversations over the same live connection', async () => { + console.log( + `Connecting to Live API for multi-turn test using project: ${project}, location: ${location}`, + ); + const llm = new Gemini({ + model: 'gemini-live-2.5-flash-native-audio', + vertexai: true, + project, + location, + }); + + const request: LlmRequest = { + model: 'gemini-live-2.5-flash-native-audio', + liveConnectConfig: { + responseModalities: [Modality.AUDIO], + outputAudioTranscription: {}, + }, + config: { + systemInstruction: 'You are a helpful assistant.', + }, + toolsDict: {}, + }; + + const connection = await llm.connect(request); + expect(connection).toBeDefined(); + + const generator = connection.receive(); + + // Turn 1: Tell model our name + console.log('Sending Turn 1: My name is Alice'); + await connection.sendContent({ + parts: [{text: 'Hello Gemini! My name is Alice.'}], + }); + + // Consume until turnComplete + while (true) { + const next = await generator.next(); + if (next.done) { + break; + } + const response = next.value; + if (response.turnComplete) { + break; + } + } + + // Turn 2: Ask model what our name is + console.log('Sending Turn 2: What is my name?'); + await connection.sendContent({ + parts: [{text: 'Can you tell me what my name is?'}], + }); + + let accumulatedText = ''; + let gotTurnComplete = false; + while (true) { + const next = await generator.next(); + if (next.done) { + break; + } + const response = next.value; + if (response.content?.parts) { + for (const part of response.content.parts) { + if (part.text) { + accumulatedText += part.text; + } + } + } + if (response.outputTranscription?.text) { + accumulatedText += response.outputTranscription.text; + } + if (response.turnComplete) { + gotTurnComplete = true; + break; + } + } + + console.log('Multi-turn response:', accumulatedText); + expect(accumulatedText.toLowerCase()).toContain('alice'); + expect(gotTurnComplete).toBe(true); + + await connection.close(); + }, 45000); + + // Note: Gemini 3.1 Live is currently in private preview and requires explicit project allowlisting on Vertex AI (yields 1008 Policy Violation otherwise). + it.skip('should connect and stream responses from Gemini 3.1 Live using Vertex AI', async () => { + console.log( + `Connecting to Live API using project: ${project}, location: ${location}`, + ); + const llm = new Gemini({ + model: 'gemini-3.1-flash-live-preview-04-2026', + vertexai: true, + project, + location, + }); + + const request: LlmRequest = { + model: 'gemini-3.1-flash-live-preview-04-2026', + liveConnectConfig: { + responseModalities: [Modality.AUDIO], + outputAudioTranscription: {}, + }, + config: { + systemInstruction: + 'You are a helpful assistant. Answer concisely in one sentence.', + }, + toolsDict: {}, + }; + + const connection = await llm.connect(request); + expect(connection).toBeDefined(); + + const generator = connection.receive(); + + // Send a message to the live model + await connection.sendContent({ + parts: [{text: 'Hello Gemini 3.1! What is 2 + 2?'}], + }); + + // Consume events + let accumulatedText = ''; + let gotTurnComplete = false; + + while (true) { + const next = await generator.next(); + if (next.done) { + break; + } + const response = next.value; + console.log('Received response event:', JSON.stringify(response)); + + if (response.content?.parts) { + for (const part of response.content.parts) { + if (part.text) { + accumulatedText += part.text; + } + } + } + if (response.outputTranscription?.text) { + accumulatedText += response.outputTranscription.text; + } if (response.turnComplete) { gotTurnComplete = true; break; @@ -76,7 +235,7 @@ describe.skipIf(isCI)('Live Gemini Live Connection E2E', () => { console.log('Accumulated text response:', accumulatedText); expect(accumulatedText.length).toBeGreaterThan(0); - expect(accumulatedText).toContain('4'); + expect(accumulatedText.toLowerCase()).toMatch(/4|four/); expect(gotTurnComplete).toBe(true); await connection.close(); From 11106e644018f71fe6b6574c09fd2ef80c953df4 Mon Sep 17 00:00:00 2001 From: Amaad Martin Date: Wed, 10 Jun 2026 14:23:18 -0700 Subject: [PATCH 7/7] fix: resolve duplicate goAway property and add missing contents property to E2E requests --- core/src/models/llm_response.ts | 3 --- tests/e2e/live_model_test.ts | 3 +++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/models/llm_response.ts b/core/src/models/llm_response.ts index 18980dc4..715fb1b1 100644 --- a/core/src/models/llm_response.ts +++ b/core/src/models/llm_response.ts @@ -107,9 +107,6 @@ export interface LlmResponse { /** The session ID of the Live session. */ liveSessionId?: string; - - /** The GoAway signal from the Live model. */ - goAway?: LiveServerGoAway; } /** diff --git a/tests/e2e/live_model_test.ts b/tests/e2e/live_model_test.ts index 9b36f151..9dbd30b1 100644 --- a/tests/e2e/live_model_test.ts +++ b/tests/e2e/live_model_test.ts @@ -30,6 +30,7 @@ describe.skipIf(isCI)('Live Gemini Live Connection E2E', () => { const request: LlmRequest = { model: 'gemini-live-2.5-flash-native-audio', + contents: [], liveConnectConfig: { responseModalities: [Modality.AUDIO], outputAudioTranscription: {}, @@ -100,6 +101,7 @@ describe.skipIf(isCI)('Live Gemini Live Connection E2E', () => { const request: LlmRequest = { model: 'gemini-live-2.5-flash-native-audio', + contents: [], liveConnectConfig: { responseModalities: [Modality.AUDIO], outputAudioTranscription: {}, @@ -184,6 +186,7 @@ describe.skipIf(isCI)('Live Gemini Live Connection E2E', () => { const request: LlmRequest = { model: 'gemini-3.1-flash-live-preview-04-2026', + contents: [], liveConnectConfig: { responseModalities: [Modality.AUDIO], outputAudioTranscription: {},