Skip to content
29 changes: 26 additions & 3 deletions core/src/agents/base_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ export abstract class BaseAgent {
* @returns An AsyncGenerator that yields the events generated by the agent.
*/
async *runLive(
parentContext: InvocationContext, // eslint-disable-line @typescript-eslint/no-unused-vars
parentContext: InvocationContext,
): AsyncGenerator<Event, void, void> {
const span = tracer.startSpan(`invoke_agent ${this.name}`);
const ctx = trace.setSpan(context.active(), span);
Expand All @@ -226,10 +226,33 @@ export abstract class BaseAgent {
ctx,
this,
async function* () {
// TODO(b/425992518): Implement live mode.
const context = this.createInvocationContext(parentContext);

const beforeAgentCallbackEvent =
await this.handleBeforeAgentCallback(context);
if (beforeAgentCallbackEvent) {
yield beforeAgentCallbackEvent;
}

if (context.endInvocation || parentContext.abortSignal?.aborted) {
return;
}

for await (const event of this.runLiveImpl(context)) {
yield event;
}

if (context.endInvocation || parentContext.abortSignal?.aborted) {
return;
}

const afterAgentCallbackEvent =
await this.handleAfterAgentCallback(context);
if (afterAgentCallbackEvent) {
yield afterAgentCallbackEvent;
}
},
);
throw new Error('Live mode is not implemented yet.');
} finally {
span.end();
}
Expand Down
11 changes: 0 additions & 11 deletions core/src/agents/invocation_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import {randomUUID} from '../utils/env_aware_utils.js';

import {ActiveStreamingTool} from './active_streaming_tool.js';
import {BaseAgent} from './base_agent.js';
import {LiveRequestQueue} from './live_request_queue.js';
import {RunConfig} from './run_config.js';
import {TranscriptionEntry} from './transcription_entry.js';

Expand All @@ -36,7 +35,6 @@ export interface InvocationContextParams {
endInvocation?: boolean;
transcriptionCache?: TranscriptionEntry[];
runConfig?: RunConfig;
liveRequestQueue?: LiveRequestQueue;
activeStreamingTools?: Record<string, ActiveStreamingTool>;
pluginManager: PluginManager;
abortSignal?: AbortSignal;
Expand Down Expand Up @@ -171,11 +169,6 @@ export class InvocationContext {
*/
private readonly invocationCostManager = new InvocationCostManager();

/**
* The queue to receive live requests.
*/
liveRequestQueue?: LiveRequestQueue;

/**
* The running streaming tools of this invocation.
*/
Expand All @@ -186,9 +179,6 @@ export class InvocationContext {
*/
pluginManager: PluginManager;

/**
* The abort signal for the invocation.
*/
readonly abortSignal?: AbortSignal;

/**
Expand All @@ -206,7 +196,6 @@ export class InvocationContext {
this.endInvocation = params.endInvocation || false;
this.transcriptionCache = params.transcriptionCache;
this.runConfig = params.runConfig;
this.liveRequestQueue = params.liveRequestQueue;
this.activeStreamingTools = params.activeStreamingTools;
this.pluginManager = params.pluginManager;
this.abortSignal = params.abortSignal;
Expand Down
14 changes: 14 additions & 0 deletions core/src/agents/llm_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,20 @@ export class LlmAgent extends BaseAgent {
// --------------------------------------------------------------------------
// #START LlmFlow Logic
// --------------------------------------------------------------------------
/**
* Runs the bidirectional (live) flow for this agent.
*
* Establishes a live connection to the model, drains the invocation's
* `liveRequestQueue` into the connection on a parallel task, and yields
* events derived from server messages until the queue closes, the model
* finishes, or an agent transfer occurs.
*
* If the live connection drops (network failure, server `goAway`) and a
* session resumption handle has been observed, the flow transparently
* reconnects using that handle up to {@link MAX_LIVE_RECONNECT_ATTEMPTS}
* times. Subsequent reconnects skip `sendHistory` because the server
* already holds the conversation state associated with the handle.
*/
// eslint-disable-next-line require-yield
private async *runLiveFlow(
_invocationContext: InvocationContext,
Expand Down
2 changes: 1 addition & 1 deletion core/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ export type {
export {VertexRagRetrievalTool} from './tools/vertex_rag_retrieval_tool.js';
export {LogLevel, getLogger, setLogLevel, setLogger} from './utils/logger.js';
export type {Logger} from './utils/logger.js';
export {isGemini2OrAbove} from './utils/model_name.js';
export {isGemini2OrAbove, isGemini3xFlashLive} from './utils/model_name.js';
export {zodObjectToSchema} from './utils/simple_zod_to_json.js';
export {GoogleLLMVariant} from './utils/variant_utils.js';
export {version} from './version.js';
Expand Down
12 changes: 12 additions & 0 deletions core/src/models/base_llm_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ export interface BaseLlmConnection {
*/
sendRealtime(blob: Blob): Promise<void>;

/**
* Optionally signals the start of user activity (e.g. user begins speaking)
* for models that support manual activity boundaries.
*/
sendActivityStart?(): Promise<void>;

/**
* Optionally signals the end of user activity (e.g. user finishes speaking)
* for models that support manual activity boundaries.
*/
sendActivityEnd?(): Promise<void>;

/**
* Receives the model response using the llm server connection.
*
Expand Down
100 changes: 79 additions & 21 deletions core/src/models/gemini_llm_connection.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
/**
* @license
* Copyright 2025 Google LLC
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/

import {Blob, Content, FunctionResponse, Session} from '@google/genai';
import {
Blob,
Content,
FunctionResponse,
LiveServerMessage,
Session,
} from '@google/genai';

import {LiveResponseAggregator} from '../utils/live_connection_utils.js';
import {logger} from '../utils/logger.js';
import {isGemini3xFlashLive} from '../utils/model_name.js';

import {BaseLlmConnection} from './base_llm_connection.js';
import {LlmResponse} from './llm_response.js';

/** The Gemini model connection. */
export class GeminiLlmConnection implements BaseLlmConnection {
constructor(private readonly geminiSession: Session) {}
constructor(
private readonly geminiSession: Session,
private readonly modelVersion?: string,
private readonly messageQueue?: AsyncIterable<LiveServerMessage>,
) {}

/**
* Sends the conversation history to the gemini model.
Expand All @@ -31,9 +43,12 @@ export class GeminiLlmConnection implements BaseLlmConnection {
);

if (contents.length > 0) {
const isGemini3x = isGemini3xFlashLive(this.modelVersion);
this.geminiSession.sendClientContent({
turns: contents,
turnComplete: contents[contents.length - 1].role === 'user',
turnComplete: isGemini3x
? true
: contents[contents.length - 1].role === 'user',
});
} else {
logger.info('no content is sent');
Expand Down Expand Up @@ -64,10 +79,16 @@ export class GeminiLlmConnection implements BaseLlmConnection {
});
} else {
logger.debug('Sending LLM new content', content);
this.geminiSession.sendClientContent({
turns: [content],
turnComplete: true,
});
const isGemini3x = isGemini3xFlashLive(this.modelVersion);
if (isGemini3x && content.parts.length === 1 && content.parts[0].text) {
logger.debug('Using sendRealtimeInput for Gemini 3.x text input');
this.geminiSession.sendRealtimeInput({text: content.parts[0].text});

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this works only fro lite model? not flash not pro?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the Live API (bidirectional realtime WebSocket interface) is only supported on low-latency Flash/Lite live variants (like gemini-3.1-flash-live-preview). There are no Pro models supported by the Live API gateway currently. This helper targets model names matching the gemini-3.x-flash-live naming structure.

} else {
this.geminiSession.sendClientContent({
turns: [content],
turnComplete: true,
});
}
}
}

Expand All @@ -78,7 +99,37 @@ export class GeminiLlmConnection implements BaseLlmConnection {
*/
async sendRealtime(blob: Blob): Promise<void> {
logger.debug('Sending LLM Blob:', blob);
this.geminiSession.sendRealtimeInput({media: blob});
const isGemini3x = isGemini3xFlashLive(this.modelVersion);
const isNativeAudio = this.modelVersion?.includes('native-audio');

if (isGemini3x || isNativeAudio) {
if (blob.mimeType?.startsWith('audio/')) {
this.geminiSession.sendRealtimeInput({audio: blob});
} else if (blob.mimeType?.startsWith('image/')) {
this.geminiSession.sendRealtimeInput({video: blob});
} else {
logger.warn(
'Blob not sent. Unknown or empty mime type for sendRealtimeInput:',
blob.mimeType,
);
}
} else {
this.geminiSession.sendRealtimeInput({media: blob});
}
}

/**
* Sends an activity start signal to the model.
*/
async sendActivityStart(): Promise<void> {
this.geminiSession.sendRealtimeInput({activityStart: {}});
}

/**
* Sends an activity end signal to the model.
*/
async sendActivityEnd(): Promise<void> {
this.geminiSession.sendRealtimeInput({activityEnd: {}});
}

/**
Expand All @@ -88,21 +139,28 @@ export class GeminiLlmConnection implements BaseLlmConnection {
* partial.
*
* @param text The text to be included in the response.
* @param isThought Whether the text is a thought.
* @param groundingMetadata The grounding metadata to include.
* @returns An LlmResponse containing the full text.
*/
private buildFullTextResponse(text: string): LlmResponse {
return {
content: {
role: 'model',
parts: [{text}],
},
};
}

// TODO(b/425992518): GenAI SDK inconsistent API, missing methods.
// eslint-disable-next-line require-yield
async *receive(): AsyncGenerator<LlmResponse, void, void> {
throw new Error('Not Implemented.');
if (!this.messageQueue) {
throw new Error('Message queue is not initialized.');
}

const aggregator = new LiveResponseAggregator(this.modelVersion);

for await (const message of this.messageQueue) {
logger.debug('Got LLM Live message:', message);

for (const response of aggregator.processMessage(message)) {
yield response;
}
}

for (const response of aggregator.close()) {
yield response;
}
}

/**
Expand Down
37 changes: 29 additions & 8 deletions core/src/models/google_llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import {
FileData,
GoogleGenAI,
HttpOptions,
LiveServerMessage,
} from '@google/genai';

import {getBooleanEnvVar, isBrowser} from '../utils/env_aware_utils.js';
import {logger} from '../utils/logger.js';
import {GoogleLLMVariant} from '../utils/variant_utils.js';

import {AsyncQueue} from '../utils/async_queue.js';
import {StreamingResponseAggregator} from '../utils/streaming_utils.js';
import {BaseLlm} from './base_llm.js';
import {BaseLlmConnection} from './base_llm_connection.js';
Expand Down Expand Up @@ -231,10 +233,19 @@ export class Gemini extends BaseLlm {

get liveApiClient(): GoogleGenAI {
if (!this._liveApiClient) {
this._liveApiClient = new GoogleGenAI({
apiKey: this.apiKey,
httpOptions: this.getLiveHttpOptions(),
});
if (this.vertexai) {
this._liveApiClient = new GoogleGenAI({
vertexai: this.vertexai,
project: this.project,
location: this.location || 'global',
httpOptions: this.getLiveHttpOptions(),
});
} else {
this._liveApiClient = new GoogleGenAI({
apiKey: this.apiKey,
httpOptions: this.getLiveHttpOptions(),
});
}
}
return this._liveApiClient;
}
Expand Down Expand Up @@ -272,15 +283,25 @@ export class Gemini extends BaseLlm {

llmRequest.liveConnectConfig.tools = llmRequest.config?.tools;

const modelVersion = llmRequest.model ?? this.model;
const messageQueue = new AsyncQueue<LiveServerMessage>();

const liveSession = await this.liveApiClient.live.connect({
model: llmRequest.model ?? this.model,
model: modelVersion,
config: llmRequest.liveConnectConfig,
callbacks: {
// TODO - b/425992518: GenAI SDK inconsistent API, missing methods.
onmessage: () => {},
onmessage: (message) => {
messageQueue.push(message);
},
onerror: (error) => {
messageQueue.error(error);
},
onclose: () => {
messageQueue.close();
},
},
});
return new GeminiLlmConnection(liveSession);
return new GeminiLlmConnection(liveSession, modelVersion, messageQueue);
}

private preprocessRequest(llmRequest: LlmRequest): void {
Expand Down
13 changes: 13 additions & 0 deletions core/src/models/llm_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
GenerateContentResponse,
GenerateContentResponseUsageMetadata,
GroundingMetadata,
LiveServerGoAway,
LiveServerSessionResumptionUpdate,
Transcription,
} from '@google/genai';
Expand Down Expand Up @@ -85,6 +86,12 @@ export interface LlmResponse {
*/
liveSessionResumptionUpdate?: LiveServerSessionResumptionUpdate;

/**
* Server-side signal that the live connection will be closed soon. The
* caller should reconnect using the latest session resumption handle.
*/
goAway?: LiveServerGoAway;

/**
* Audio transcription of user input.
*/
Expand All @@ -94,6 +101,12 @@ export interface LlmResponse {
* Audio transcription of model output.
*/
outputTranscription?: Transcription;

/** The model version used to generate the response. */
modelVersion?: string;

/** The session ID of the Live session. */
liveSessionId?: string;
}

/**
Expand Down
Loading
Loading