feat(core): Support Gemini 2.5 and 3.x Live Models in ADK JS #409
AmaadMartin wants to merge 8 commits into
…name connection helpers
| /** | ||
| * A generic, async-safe queue that implements AsyncIterable. | ||
| */ | ||
| export class AsyncQueue<T> implements AsyncIterable<T> { |
What about /adk-js/core/src/agents/live_request_queue.ts? Doesn't that already cover this?
The AsyncQueue<T> implemented here is a general utility to bridge a push-based stream (callbacks, like the WebSocket onmessage event) to a pull-based JS AsyncIterable<T> (so the caller can consume it as an async generator).
In contrast, live_request_queue.ts is a specialized request queuing mechanism designed to schedule/serialize outgoing request calls made by the agent runner. They work in opposite directions (one handles incoming streaming messages, the other manages outgoing worker calls).
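For readers following the thread, the push-to-pull bridge described above can be sketched roughly as follows. This is a minimal illustration and not the PR's actual AsyncQueue: the class name SketchAsyncQueue and the close() method are assumptions, and only push() and error() mirror calls that appear in the diff.

```typescript
// Hypothetical sketch of a push-to-pull bridge; not the PR's AsyncQueue.
type Waiter<T> = {
  resolve: (result: IteratorResult<T>) => void;
  reject: (reason: unknown) => void;
};

class SketchAsyncQueue<T> implements AsyncIterable<T> {
  private items: T[] = []; // values pushed before anyone asked for them
  private waiters: Waiter<T>[] = []; // consumers waiting for a value
  private closed = false;
  private failure: {error: unknown} | undefined;

  // Producer side: called from a callback such as a WebSocket onmessage.
  push(item: T): void {
    if (this.closed || this.failure) return;
    const waiter = this.waiters.shift();
    if (waiter) waiter.resolve({value: item, done: false});
    else this.items.push(item);
  }

  // Producer side: forward a failure to the consumer.
  error(err: unknown): void {
    if (this.closed || this.failure) return;
    this.failure = {error: err};
    for (const w of this.waiters.splice(0)) w.reject(err);
  }

  // Producer side (assumed method): signal that no more values will arrive.
  close(): void {
    if (this.closed) return;
    this.closed = true;
    for (const w of this.waiters.splice(0)) {
      w.resolve({value: undefined, done: true});
    }
  }

  // Consumer side: buffered values are drained before error or completion.
  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: async (): Promise<IteratorResult<T>> => {
        if (this.items.length > 0) {
          return {value: this.items.shift() as T, done: false};
        }
        if (this.failure) throw this.failure.error;
        if (this.closed) return {value: undefined, done: true};
        return new Promise<IteratorResult<T>>((resolve, reject) => {
          this.waiters.push({resolve, reject});
        });
      },
    };
  }
}
```

In this sketch a caller would wire onmessage to push(), onerror to error(), and onclose to close(), then consume the queue with for await.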
| // TODO - b/425992518: GenAI SDK inconsistent API, missing methods. | ||
| onmessage: () => {}, | ||
| onmessage: (message) => { | ||
| console.log('E2E Debug: onmessage', JSON.stringify(message)); |
Done. Removed the debug logs.
| messageQueue.push(message); | ||
| }, | ||
| onerror: (error) => { | ||
| console.error('E2E Debug: onerror', error); |
Done. Removed the debug logs and forwarded the error directly to the queue.
| messageQueue.error(error); | ||
| }, | ||
| onclose: () => { | ||
| console.log('E2E Debug: onclose'); |
Done. Removed the debug logs.
| logger.debug('Got LLM Live message:', message); | ||
| | ||
| if (message.usageMetadata) { | ||
| yield { | ||
| usageMetadata: message.usageMetadata, | ||
| ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), | ||
| }; | ||
| } | ||
| | ||
| if (message.serverContent) { | ||
| const serverContent = message.serverContent; | ||
| const content = serverContent.modelTurn; | ||
| | ||
| if (serverContent.groundingMetadata) { | ||
| pendingGroundingMetadata = serverContent.groundingMetadata; | ||
| } | ||
| | ||
| // Standalone groundingMetadata event (when content is empty) | ||
| if ( | ||
| !(content && content.parts) && | ||
| serverContent.groundingMetadata && | ||
| !serverContent.turnComplete | ||
| ) { | ||
| yield { | ||
| groundingMetadata: serverContent.groundingMetadata, | ||
| ...(serverContent.interrupted !== undefined | ||
| ? {interrupted: serverContent.interrupted} | ||
| : {}), | ||
| ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), | ||
| }; | ||
| } | ||
| | ||
| if (content && content.parts) { | ||
| const llmResponse: LlmResponse = { | ||
| content: content as Content, | ||
| ...(serverContent.interrupted !== undefined | ||
| ? {interrupted: serverContent.interrupted} | ||
| : {}), | ||
| ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), | ||
| }; | ||
| | ||
| if (!serverContent.turnComplete && serverContent.groundingMetadata) { | ||
| llmResponse.groundingMetadata = serverContent.groundingMetadata; | ||
| } | ||
| | ||
| const hasInlineData = content.parts.some((p) => p.inlineData); | ||
| for (const part of content.parts) { | ||
| if (part.text) { | ||
| const currentIsThought = !!part.thought; | ||
| if (text && currentIsThought !== isThought) { | ||
| yield this.buildFullTextResponse(text, isThought); | ||
| text = ''; | ||
| isThought = false; | ||
| } | ||
| text += part.text; | ||
| isThought = currentIsThought; | ||
| llmResponse.partial = true; | ||
| } | ||
| } | ||
| | ||
| // don't yield the merged text event when receiving audio data | ||
| if (text && !content.parts.some((p) => p.text) && !hasInlineData) { | ||
| yield this.buildFullTextResponse(text, isThought); | ||
| text = ''; | ||
| isThought = false; | ||
| } | ||
| | ||
| yield llmResponse; | ||
| } | ||
| | ||
| if (serverContent.inputTranscription) { | ||
| if (serverContent.inputTranscription.text) { | ||
| this._inputTranscriptionText += | ||
| serverContent.inputTranscription.text; | ||
| yield { | ||
| inputTranscription: { | ||
| text: serverContent.inputTranscription.text, | ||
| finished: false, | ||
| }, | ||
| partial: true, | ||
| ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), | ||
| }; | ||
| } | ||
| if (serverContent.inputTranscription.finished) { | ||
| yield { | ||
| inputTranscription: { | ||
| text: this._inputTranscriptionText, | ||
| finished: true, | ||
| }, | ||
| partial: false, | ||
| ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), | ||
| }; | ||
| this._inputTranscriptionText = ''; | ||
| } | ||
| } | ||
| | ||
| if (serverContent.outputTranscription) { | ||
| if (serverContent.outputTranscription.text) { | ||
| this._outputTranscriptionText += | ||
| serverContent.outputTranscription.text; | ||
| yield { | ||
| outputTranscription: { | ||
| text: serverContent.outputTranscription.text, | ||
| finished: false, | ||
| }, | ||
| partial: true, | ||
| ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), | ||
| }; | ||
| } | ||
| if (serverContent.outputTranscription.finished) { | ||
| yield { | ||
| outputTranscription: { | ||
| text: this._outputTranscriptionText, | ||
| finished: true, | ||
| }, | ||
| partial: false, | ||
| ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), | ||
| }; | ||
| this._outputTranscriptionText = ''; | ||
| } | ||
| } | ||
| | ||
| if ( | ||
| serverContent.interrupted || | ||
| serverContent.turnComplete || | ||
| serverContent.generationComplete | ||
| ) { | ||
| if (this._inputTranscriptionText) { | ||
| yield { | ||
| inputTranscription: { | ||
| text: this._inputTranscriptionText, | ||
| finished: true, | ||
| }, | ||
| partial: false, | ||
| ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), | ||
| }; | ||
| this._inputTranscriptionText = ''; | ||
| } | ||
| if (this._outputTranscriptionText) { | ||
| yield { | ||
| outputTranscription: { | ||
| text: this._outputTranscriptionText, | ||
| finished: true, | ||
| }, | ||
| partial: false, | ||
| ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), | ||
| }; | ||
| this._outputTranscriptionText = ''; | ||
| } | ||
| } | ||
| | ||
| if (serverContent.turnComplete) { | ||
| let gMetadataToYield = pendingGroundingMetadata; | ||
| if (text) { | ||
| yield this.buildFullTextResponse(text, isThought, gMetadataToYield); | ||
| text = ''; | ||
| isThought = false; | ||
| gMetadataToYield = undefined; | ||
| } | ||
| if (toolCallParts.length > 0) { | ||
| logger.debug('Returning aggregated toolCallParts'); | ||
| yield { | ||
| content: {role: 'model', parts: toolCallParts}, | ||
| ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), | ||
| }; | ||
| toolCallParts = []; | ||
| } | ||
| const finalResponse: LlmResponse = { | ||
| turnComplete: true, | ||
| ...(this.modelVersion ? {modelVersion: this.modelVersion} : {}), | ||
| }; | ||
| if (serverContent.interrupted !== undefined) { | ||
| finalResponse.interrupted = serverContent.interrupted; | ||
| } | ||
| const finalGrounding = | ||
| serverContent.groundingMetadata || gMetadataToYield; | ||
| if (finalGrounding !== undefined && finalGrounding !== null) { | ||
| finalResponse.groundingMetadata = finalGrounding; | ||
| } | ||
| yield finalResponse; | ||
| break; | ||
| } | ||
| | ||
| if (serverContent.interrupted) { | ||
| if (text) { | ||
| yield this.buildFullTextResponse(text, isThought); | ||
| text = ''; | ||
| isThought = false; | ||
| } else { | ||
| yield { | ||
| interrupted: serverContent.interrupted, |
Please move all of that message transformation logic to separate util functions, with tests. Thanks.
In the same manner as core/src/utils/streaming_utils.ts.
Done! Extracted the entire message transformation and accumulation logic into a new utility class LiveResponseAggregator at core/src/utils/live_connection_utils.ts.
Also added comprehensive unit tests for all branches and states in core/test/utils/live_connection_utils_test.ts to ensure 100% coverage, making GeminiLlmConnection a clean wrapper that delegates the transformation state.
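As a rough illustration of why this logic is easier to test once extracted, the transcription branch of the diff above could be isolated along these lines. This is only a sketch under assumptions: TranscriptionAccumulator and its flattened event shape are hypothetical names for illustration and are not taken from LiveResponseAggregator.

```typescript
// Hypothetical, simplified stand-in for one piece of the aggregation state.
interface TranscriptionChunk {
  text?: string;
  finished?: boolean;
}

// Flattened event shape (assumption); the diff nests text/finished under
// inputTranscription or outputTranscription instead.
interface TranscriptionEvent {
  text: string;
  finished: boolean;
  partial: boolean;
}

class TranscriptionAccumulator {
  private buffer = '';

  // Returns the events to emit for one incoming chunk: a partial event for
  // new text, then a final event with the accumulated text once finished.
  add(chunk: TranscriptionChunk): TranscriptionEvent[] {
    const events: TranscriptionEvent[] = [];
    if (chunk.text) {
      this.buffer += chunk.text;
      events.push({text: chunk.text, finished: false, partial: true});
    }
    if (chunk.finished) {
      events.push({text: this.buffer, finished: true, partial: false});
      this.buffer = '';
    }
    return events;
  }

  // Called on interrupted / turnComplete / generationComplete: emit whatever
  // text is still buffered as a final event, then reset.
  flush(): TranscriptionEvent[] {
    if (!this.buffer) return [];
    const text = this.buffer;
    this.buffer = '';
    return [{text, finished: true, partial: false}];
  }
}
```

Because the accumulator holds no reference to the WebSocket session, each branch (partial text, finished, flush on interruption) can be exercised with plain objects in a unit test.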
| const isGemini3x = isGemini3xFlashLive(this.modelVersion); | ||
| if (isGemini3x && content.parts.length === 1 && content.parts[0].text) { | ||
| logger.debug('Using sendRealtimeInput for Gemini 3.x text input'); | ||
| this.geminiSession.sendRealtimeInput({text: content.parts[0].text}); |
Does this work only for the Lite model? Not Flash, not Pro?
Yes, the Live API (bidirectional realtime WebSocket interface) is only supported on low-latency Flash/Lite live variants (like gemini-3.1-flash-live-preview). There are no Pro models supported by the Live API gateway currently. This helper targets model names matching the gemini-3.x-flash-live naming structure.
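To make the naming check concrete, a name-based matcher of the kind described could look roughly like this. The PR's actual isGemini3xFlashLive implementation is not shown in this thread, so the function names, the regular expression, and the handling of a path-style model name below are all assumptions.

```typescript
// Hypothetical sketch; the real isGemini3xFlashLive may differ.
// Matches names such as "gemini-3.1-flash-live-preview", optionally prefixed
// by a resource path ending in "/" (assumption).
const GEMINI_3X_FLASH_LIVE = /(?:^|\/)gemini-3(?:\.\d+)?-flash-live(?:-|$)/;

function looksLikeGemini3xFlashLive(modelVersion?: string): boolean {
  if (!modelVersion) return false;
  return GEMINI_3X_FLASH_LIVE.test(modelVersion);
}

// Mirrors the condition in the diff: the realtime text path is taken only
// for a matching model and a single part that carries text.
function shouldUseRealtimeTextInput(
  modelVersion: string | undefined,
  parts: Array<{text?: string}>,
): boolean {
  return (
    looksLikeGemini3xFlashLive(modelVersion) &&
    parts.length === 1 &&
    Boolean(parts[0].text)
  );
}
```

A pattern anchored on the gemini-3 prefix keeps 2.5 names, such as gemini-2.5-flash-preview-native-audio, on the existing code path.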
…sponseAggregator utility and add tests
@AmaadMartin thanks a lot for this. May I ask whether this is only the first part of addressing #362 or the complete implementation? What seems to be missing is:
Please, correct me if I am wrong, but after this merges, the only way to use the new code is to call
Correct me if I am wrong, but for deployments with data-residency requirements (e.g. EU regions), routing live audio through the 'global' location is not acceptable as an unconditional override. Could this fall back to 'global' only when no location is configured, or be opt-in?
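The fallback behaviour being asked for here could be sketched as below. This is only an illustration of the request, not code from the PR: resolveLiveLocation, LiveLocationOptions, and the forceGlobal flag are hypothetical names.

```typescript
// Hypothetical sketch of "fall back to 'global' only when unset, or opt-in".
interface LiveLocationOptions {
  // Location the user configured, e.g. a regional Vertex AI location.
  configuredLocation?: string;
  // Explicit opt-in to route through 'global' regardless of configuration.
  forceGlobal?: boolean;
}

function resolveLiveLocation(options: LiveLocationOptions): string {
  if (options.forceGlobal) return 'global';
  const configured = options.configuredLocation?.trim();
  // Respect an explicitly configured region; use 'global' only as a default.
  return configured ? configured : 'global';
}
```

With this shape, a deployment pinned to a region keeps its location unless it explicitly opts in to the global endpoint.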
Thanks for the feedback! We have updated the PR to address these points directly:
…rty to E2E requests
Please ensure you have read the contribution guide before creating a pull request.
Link to Issue or Description of Change
Closes: #362
Related: #362
2. Or, if no issue exists, describe the change:
If applicable, please follow the issue templates to provide as much detail as possible.
Problem: The JS ADK lacked native support for Gemini 2.5 and 3.x Live Models (specifically the bidirectional real-time WebSocket API), resulting in feature parity gaps with the Python ADK.
Solution:
AsyncQueue utility class to cleanly serialize and bridge incoming WebSocket events to async iterators.
GeminiLlmConnection.receive() to cleanly map and translate Gemini Live WebSocket events (usageMetadata, serverContent containing model text/thought/audio/transcriptions, toolCall, sessionResumptionUpdate, goAway).
google_llm.ts's liveApiClient initializer to override the location to 'global' when utilizing Vertex AI, ensuring successful connections without immediately hanging or dropping sockets.
tests/e2e/live_model_test.ts connecting to real Vertex AI Live endpoints.
Testing Plan
Please describe the tests that you ran to verify your changes. This is required for all PRs that are not small documentation or typo fixes.
Unit Tests:
Manual End-to-End (E2E) Tests:
Please provide instructions on how to manually test your changes, including any necessary setup or configuration.
gemini-2.5-flash-preview-native-audio.
Checklist