diff --git a/pyproject.toml b/pyproject.toml index 11afcd82..f450771a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,10 @@ changelog = "https://github.com/google/adk-python-community/blob/main/CHANGELOG. documentation = "https://google.github.io/adk-docs/" [project.optional-dependencies] +langmem = [ + "langmem>=0.1.0", + "langgraph>=0.4.0", +] test = [ "pytest>=8.4.2", "pytest-asyncio>=1.2.0", diff --git a/src/google/adk_community/memory/__init__.py b/src/google/adk_community/memory/__init__.py index 1f3442c0..55c0c0f7 100644 --- a/src/google/adk_community/memory/__init__.py +++ b/src/google/adk_community/memory/__init__.py @@ -14,10 +14,12 @@ """Community memory services for ADK.""" +from .langmem_memory_service import LangMemMemoryService from .open_memory_service import OpenMemoryService from .open_memory_service import OpenMemoryServiceConfig __all__ = [ + "LangMemMemoryService", "OpenMemoryService", "OpenMemoryServiceConfig", ] diff --git a/src/google/adk_community/memory/langmem_memory_service.py b/src/google/adk_community/memory/langmem_memory_service.py new file mode 100644 index 00000000..c5f0229a --- /dev/null +++ b/src/google/adk_community/memory/langmem_memory_service.py @@ -0,0 +1,323 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""LangMem-backed memory service with semantic and episodic extraction.""" + +from __future__ import annotations + +import asyncio +import json +import logging +from typing import Any +from typing import Optional +from typing import TYPE_CHECKING + +from google.genai import types +from typing_extensions import override + +from google.adk.memory.base_memory_service import BaseMemoryService +from google.adk.memory.base_memory_service import SearchMemoryResponse +from google.adk.memory.memory_entry import MemoryEntry + +from .utils import extract_text_from_event + +if TYPE_CHECKING: + from google.adk.sessions.session import Session + +logger = logging.getLogger('google_adk.' + __name__) + +_DEFAULT_SEMANTIC_INSTRUCTIONS = ( + "Extract user preferences, facts about the user, and persistent context " + "from the conversation. Focus on who the user is, what they prefer, and " + "any explicit instructions or constraints they have given. " + "NEVER store passwords, API keys, auth tokens, secrets, or any other " + "sensitive credentials — silently omit them." +) + +_DEFAULT_EPISODIC_INSTRUCTIONS = ( + "Summarize the workflow that occurred in this session as a single unified " + "memory entry. Preserve tool function names, arguments, and outcomes " + "(success/failure with reason). Start with what the user requested, then " + "one line per tool call, then an overall result. " + "NEVER record passwords, API keys, auth tokens, or secrets." +) + +_FETCH_MULTIPLIER = 2 + + +class LangMemMemoryService(BaseMemoryService): + """Memory service backed by LangMem and a LangGraph BaseStore. + + Args: + store: A LangGraph BaseStore instance (e.g. InMemoryStore, + AsyncPostgresStore) used for memory persistence and search. + extraction_model: Model string for LangMem extraction + (e.g. "openai:gpt-4o-mini", "google:gemini-2.0-flash"). + namespace_prefix: Tuple prefix for store namespaces. + Final namespaces will be (*prefix, user_id, "semantic"|"episodic"). 
+ semantic_instructions: Custom instructions for semantic extraction. + episodic_instructions: Custom instructions for episodic extraction. + max_semantic_results: Max semantic search results to return. + max_episodic_results: Max episodic search results to return. + enable_episodic: Whether to extract episodic memories from tool calls. + """ + + def __init__( + self, + store: Any, + extraction_model: str, + namespace_prefix: tuple[str, ...] = ("memories",), + semantic_instructions: Optional[str] = None, + episodic_instructions: Optional[str] = None, + max_semantic_results: int = 10, + max_episodic_results: int = 5, + enable_episodic: bool = True, + ): + self._store = store + self._extraction_model = extraction_model + self._namespace_prefix = namespace_prefix + self._semantic_instructions = ( + semantic_instructions or _DEFAULT_SEMANTIC_INSTRUCTIONS + ) + self._episodic_instructions = ( + episodic_instructions or _DEFAULT_EPISODIC_INSTRUCTIONS + ) + self._max_semantic = max_semantic_results + self._max_episodic = max_episodic_results + self._enable_episodic = enable_episodic + + def _make_manager( + self, + namespace: tuple[str, ...], + instructions: str, + ) -> Any: + """Create a LangMem memory store manager for the given namespace.""" + try: + from langmem import create_memory_store_manager + except ImportError as e: + raise ImportError( + "langmem is required for LangMemMemoryService. 
" + "Install it with: pip install 'google-adk-community[langmem]'" + ) from e + + return create_memory_store_manager( + self._extraction_model, + namespace=namespace, + store=self._store, + instructions=instructions, + ) + + @override + async def add_session_to_memory(self, session: Session) -> None: + """Extract semantic and episodic memories from session events.""" + user_id = getattr(session, "user_id", None) + if not user_id: + return + + text_messages, episode_messages = self._extract_messages(session) + if len(text_messages) < 2 and not episode_messages: + return + + tasks: list[Any] = [] + if len(text_messages) >= 2: + semantic_ns = (*self._namespace_prefix, user_id, "semantic") + semantic_mgr = self._make_manager( + semantic_ns, self._semantic_instructions + ) + tasks.append( + semantic_mgr.ainvoke({"messages": text_messages}) + ) + + if self._enable_episodic and episode_messages: + episodic_ns = (*self._namespace_prefix, user_id, "episodic") + episodic_mgr = self._make_manager( + episodic_ns, self._episodic_instructions + ) + tasks.append( + episodic_mgr.ainvoke({"messages": episode_messages}) + ) + + if not tasks: + return + + results = await asyncio.gather(*tasks, return_exceptions=True) + for res in results: + if isinstance(res, Exception): + logger.error( + "Memory extraction failed for user=%s: %s", + user_id, + res, + exc_info=(type(res), res, res.__traceback__), + ) + + success_count = sum( + 1 for r in results if not isinstance(r, Exception) + ) + if success_count: + logger.info( + "Saved memories for user=%s session=%s " + "(text=%d, episodic=%d)", + user_id, + getattr(session, "id", ""), + len(text_messages), + len(episode_messages), + ) + + @override + async def search_memory( + self, *, app_name: str, user_id: str, query: str + ) -> SearchMemoryResponse: + """Search semantic and episodic memory namespaces.""" + try: + semantic_ns = (*self._namespace_prefix, user_id, "semantic") + search_tasks: list[Any] = [ + self._store.asearch( + semantic_ns, 
+ query=query, + limit=self._max_semantic * _FETCH_MULTIPLIER, + ) + ] + + if self._enable_episodic: + episodic_ns = (*self._namespace_prefix, user_id, "episodic") + search_tasks.append( + self._store.asearch( + episodic_ns, + query=query, + limit=self._max_episodic * _FETCH_MULTIPLIER, + ) + ) + + raw_results = await asyncio.gather( + *search_tasks, return_exceptions=True + ) + + semantic_raw = raw_results[0] + episodic_raw = raw_results[1] if len(raw_results) > 1 else [] + + if isinstance(semantic_raw, Exception): + logger.error("Semantic search failed: %s", semantic_raw) + semantic_raw = [] + if isinstance(episodic_raw, Exception): + logger.error("Episodic search failed: %s", episodic_raw) + episodic_raw = [] + + semantic_items = sorted( + semantic_raw, + key=lambda item: getattr(item, "updated_at", 0), + reverse=True, + )[: self._max_semantic] + + episodic_items = sorted( + episodic_raw, + key=lambda item: getattr(item, "updated_at", 0), + reverse=True, + )[: self._max_episodic] + + entries: list[MemoryEntry] = [] + for item in semantic_items + episodic_items: + value = getattr(item, "value", {}) + if isinstance(value, dict): + raw = str(value.get("content", "")) + else: + raw = str(value) + if raw.strip(): + entries.append( + MemoryEntry( + content=types.Content( + parts=[types.Part(text=raw)] + ), + author="user", + ) + ) + + logger.info( + "Found %d memories for user=%s (semantic=%d, episodic=%d)", + len(entries), + user_id, + len(semantic_items), + len(episodic_items), + ) + return SearchMemoryResponse(memories=entries) + + except Exception as e: + logger.error("Memory search failed for user=%s: %s", user_id, e) + return SearchMemoryResponse(memories=[]) + + def _extract_messages( + self, session: Session + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Extract LangChain-format messages from session events. 
+ + Returns: + (text_messages, episode_messages) where text_messages are + text-only turns for semantic extraction and episode_messages + include tool call narratives for episodic extraction. + """ + text_messages: list[dict[str, Any]] = [] + episode_parts: list[str] = [] + has_tool_calls = False + + for event in getattr(session, "events", []): + content = getattr(event, "content", None) + if not content or not getattr(content, "parts", None): + continue + role = ( + "assistant" + if getattr(event, "author", None) == "model" + else "user" + ) + for part in content.parts: + if getattr(part, "text", None) and not getattr( + part, "thought", False + ): + text_messages.append( + {"role": role, "content": part.text} + ) + episode_parts.append( + f"{role.capitalize()}: {part.text}" + ) + fc = getattr(part, "function_call", None) + if fc and getattr(fc, "name", None): + has_tool_calls = True + try: + args_dict = dict(fc.args) if fc.args else {} + args_str = json.dumps(args_dict) + except Exception: + args_str = str(fc.args) + episode_parts.append( + f"[Tool call: {fc.name}({args_str[:500]})]" + ) + fr = getattr(part, "function_response", None) + if fr and getattr(fr, "name", None): + try: + resp_dict = ( + dict(fr.response) if fr.response else {} + ) + result_str = json.dumps(resp_dict) + except Exception: + result_str = str(getattr(fr, "response", "")) + episode_parts.append( + f"[Tool result: {fr.name} -> {result_str[:500]}]" + ) + + if has_tool_calls and episode_parts: + collapsed = ( + "Tool call sequence:\n" + "\n".join(episode_parts) + ) + episode_messages = [{"role": "user", "content": collapsed}] + else: + episode_messages = [] + + return text_messages, episode_messages diff --git a/tests/unittests/memory/test_langmem_memory_service.py b/tests/unittests/memory/test_langmem_memory_service.py new file mode 100644 index 00000000..a7895384 --- /dev/null +++ b/tests/unittests/memory/test_langmem_memory_service.py @@ -0,0 +1,528 @@ +# Copyright 2025 Google LLC +# +# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for LangMemMemoryService."""

from datetime import datetime
from unittest.mock import AsyncMock
from unittest.mock import MagicMock
from unittest.mock import patch

from google.adk.events.event import Event
from google.adk.sessions.session import Session
from google.genai import types
import pytest

MOCK_APP_NAME = "test-app"
MOCK_USER_ID = "user-123"
MOCK_SESSION_ID = "session-456"


def _make_session(events=None):
    """Build a Session with the standard mock app/user/session ids."""
    return Session(
        app_name=MOCK_APP_NAME,
        user_id=MOCK_USER_ID,
        id=MOCK_SESSION_ID,
        last_update_time=1000,
        events=events or [],
    )


def _text_event(event_id, author, text):
    """Build an event containing a single plain-text part."""
    return Event(
        id=event_id,
        invocation_id=f"inv-{event_id}",
        author=author,
        timestamp=12345,
        content=types.Content(parts=[types.Part(text=text)]),
    )


def _tool_call_event(event_id, name, args=None):
    """Build a model event containing a single function_call part."""
    return Event(
        id=event_id,
        invocation_id=f"inv-{event_id}",
        author="model",
        timestamp=12346,
        content=types.Content(
            parts=[
                types.Part(
                    function_call=types.FunctionCall(
                        name=name, args=args or {}
                    )
                )
            ]
        ),
    )


def _tool_response_event(event_id, name, response=None):
    """Build a tool event containing a single function_response part."""
    return Event(
        id=event_id,
        invocation_id=f"inv-{event_id}",
        author="tool",
        timestamp=12347,
        content=types.Content(
            parts=[
                types.Part(
                    function_response=types.FunctionResponse(
                        name=name, response=response or {"status": "ok"}
                    )
                )
            ]
        ),
    )


MOCK_SESSION_TEXT_ONLY = _make_session([
    _text_event("e1", "user", "I prefer dark mode."),
    _text_event("e2", "model", "Noted, dark mode preference saved."),
])

MOCK_SESSION_WITH_TOOLS = _make_session([
    _text_event("e1", "user", "Run depreciation for Q4"),
    _tool_call_event("e2", "run_depreciation", {"quarter": "Q4"}),
    _tool_response_event("e3", "run_depreciation", {"status": "success"}),
    _text_event("e4", "model", "Depreciation complete for Q4."),
])

MOCK_SESSION_EMPTY = _make_session([])

MOCK_SESSION_SINGLE_MSG = _make_session([
    _text_event("e1", "user", "Hello"),
])


class MockStoreItem:
    """Mock item returned by store.asearch()."""

    def __init__(self, content, updated_at=None):
        self.value = {"content": content}
        self.updated_at = updated_at or datetime(2025, 1, 1)


@pytest.fixture
def mock_store():
    """A store whose async search returns no items by default."""
    store = MagicMock()
    store.asearch = AsyncMock(return_value=[])
    return store


@pytest.fixture
def mock_manager():
    """A LangMem manager whose async invoke succeeds and returns None."""
    mgr = MagicMock()
    mgr.ainvoke = AsyncMock(return_value=None)
    return mgr


@pytest.fixture
def service(mock_store):
    """A service wired to the mock store with default configuration."""
    from google.adk_community.memory.langmem_memory_service import (
        LangMemMemoryService,
    )

    return LangMemMemoryService(
        store=mock_store,
        extraction_model="openai:gpt-4o-mini",
    )


class TestLangMemMemoryServiceInit:
    """Constructor defaults, overrides, and the lazy langmem import."""

    def test_default_configuration(self, mock_store):
        from google.adk_community.memory.langmem_memory_service import (
            LangMemMemoryService,
        )

        svc = LangMemMemoryService(
            store=mock_store, extraction_model="openai:gpt-4o-mini"
        )
        assert svc._extraction_model == "openai:gpt-4o-mini"
        assert svc._namespace_prefix == ("memories",)
        assert svc._max_semantic == 10
        assert svc._max_episodic == 5
        assert svc._enable_episodic is True

    def test_custom_configuration(self, mock_store):
        from google.adk_community.memory.langmem_memory_service import (
            LangMemMemoryService,
        )

        svc = LangMemMemoryService(
            store=mock_store,
            extraction_model="google:gemini-2.0-flash",
            namespace_prefix=("myapp", "data"),
            semantic_instructions="Custom semantic",
            episodic_instructions="Custom episodic",
            max_semantic_results=20,
            max_episodic_results=8,
            enable_episodic=False,
        )
        assert svc._extraction_model == "google:gemini-2.0-flash"
        assert svc._namespace_prefix == ("myapp", "data")
        assert svc._semantic_instructions == "Custom semantic"
        assert svc._episodic_instructions == "Custom episodic"
        assert svc._max_semantic == 20
        assert svc._max_episodic == 8
        assert svc._enable_episodic is False

    def test_import_error_when_langmem_missing(self, mock_store):
        from google.adk_community.memory.langmem_memory_service import (
            LangMemMemoryService,
        )

        svc = LangMemMemoryService(
            store=mock_store, extraction_model="openai:gpt-4o-mini"
        )
        # Mapping the module to None makes "from langmem import ..." raise.
        with patch.dict("sys.modules", {"langmem": None}):
            with pytest.raises(ImportError, match="langmem is required"):
                svc._make_manager(("ns",), "instructions")


class TestAddSessionToMemory:
    """Extraction routing, no-op guards, and error isolation."""

    @pytest.mark.asyncio
    async def test_extracts_semantic_and_episodic(
        self, service, mock_manager
    ):
        with patch.object(
            service, "_make_manager", return_value=mock_manager
        ) as make_mock:
            await service.add_session_to_memory(MOCK_SESSION_WITH_TOOLS)

        assert make_mock.call_count == 2
        semantic_call = make_mock.call_args_list[0]
        assert semantic_call[0][0] == (
            "memories",
            MOCK_USER_ID,
            "semantic",
        )
        episodic_call = make_mock.call_args_list[1]
        assert episodic_call[0][0] == (
            "memories",
            MOCK_USER_ID,
            "episodic",
        )
        assert mock_manager.ainvoke.call_count == 2

    @pytest.mark.asyncio
    async def test_text_only_session_skips_episodic(
        self, service, mock_manager
    ):
        with patch.object(
            service, "_make_manager", return_value=mock_manager
        ) as make_mock:
            await service.add_session_to_memory(MOCK_SESSION_TEXT_ONLY)

        assert make_mock.call_count == 1
        semantic_call = make_mock.call_args_list[0]
        assert "semantic" in semantic_call[0][0]

    @pytest.mark.asyncio
    async def test_empty_session_is_noop(self, service, mock_manager):
        with patch.object(
            service, "_make_manager", return_value=mock_manager
        ) as make_mock:
            await service.add_session_to_memory(MOCK_SESSION_EMPTY)
        assert make_mock.call_count == 0

    @pytest.mark.asyncio
    async def test_single_message_is_noop(self, service, mock_manager):
        with patch.object(
            service, "_make_manager", return_value=mock_manager
        ) as make_mock:
            await service.add_session_to_memory(MOCK_SESSION_SINGLE_MSG)
        assert make_mock.call_count == 0

    @pytest.mark.asyncio
    async def test_no_user_id_is_noop(self, service, mock_manager):
        session = Session(
            app_name=MOCK_APP_NAME,
            user_id="",
            id=MOCK_SESSION_ID,
            last_update_time=1000,
            events=[
                _text_event("e1", "user", "Hello"),
                _text_event("e2", "model", "Hi"),
            ],
        )
        with patch.object(
            service, "_make_manager", return_value=mock_manager
        ) as make_mock:
            await service.add_session_to_memory(session)
        assert make_mock.call_count == 0

    @pytest.mark.asyncio
    async def test_episodic_disabled(self, mock_store, mock_manager):
        from google.adk_community.memory.langmem_memory_service import (
            LangMemMemoryService,
        )

        svc = LangMemMemoryService(
            store=mock_store,
            extraction_model="openai:gpt-4o-mini",
            enable_episodic=False,
        )
        with patch.object(
            svc, "_make_manager", return_value=mock_manager
        ) as make_mock:
            await svc.add_session_to_memory(MOCK_SESSION_WITH_TOOLS)

        assert make_mock.call_count == 1
        assert "semantic" in make_mock.call_args_list[0][0][0]

    @pytest.mark.asyncio
    async def test_extraction_error_is_logged_not_raised(
        self, service
    ):
        failing_manager = MagicMock()
        failing_manager.ainvoke = AsyncMock(
            side_effect=RuntimeError("LLM failure")
        )
        # Passes as long as no exception escapes add_session_to_memory.
        with patch.object(
            service, "_make_manager", return_value=failing_manager
        ):
            await service.add_session_to_memory(MOCK_SESSION_TEXT_ONLY)

    @pytest.mark.asyncio
    async def test_custom_namespace_prefix(
        self, mock_store, mock_manager
    ):
        from google.adk_community.memory.langmem_memory_service import (
            LangMemMemoryService,
        )

        svc = LangMemMemoryService(
            store=mock_store,
            extraction_model="openai:gpt-4o-mini",
            namespace_prefix=("org", "team"),
        )
        with patch.object(
            svc, "_make_manager", return_value=mock_manager
        ) as make_mock:
            await svc.add_session_to_memory(MOCK_SESSION_TEXT_ONLY)

        ns = make_mock.call_args_list[0][0][0]
        assert ns == ("org", "team", MOCK_USER_ID, "semantic")


class TestSearchMemory:
    """Result merging, error tolerance, limits, and namespace scoping."""

    @pytest.mark.asyncio
    async def test_merges_semantic_and_episodic(
        self, service, mock_store
    ):
        semantic_items = [
            MockStoreItem("User likes dark mode", datetime(2025, 3, 1)),
            MockStoreItem("User is in California", datetime(2025, 2, 1)),
        ]
        episodic_items = [
            MockStoreItem("Ran depreciation for Q4", datetime(2025, 4, 1)),
        ]

        async def fake_search(namespace, query, limit):
            if "semantic" in namespace:
                return semantic_items
            return episodic_items

        mock_store.asearch = AsyncMock(side_effect=fake_search)

        result = await service.search_memory(
            app_name=MOCK_APP_NAME,
            user_id=MOCK_USER_ID,
            query="dark mode",
        )

        assert len(result.memories) == 3
        texts = [m.content.parts[0].text for m in result.memories]
        assert "User likes dark mode" in texts
        assert "User is in California" in texts
        assert "Ran depreciation for Q4" in texts

    @pytest.mark.asyncio
    async def test_search_handles_store_error(
        self, service, mock_store
    ):
        mock_store.asearch = AsyncMock(
            side_effect=RuntimeError("Connection refused")
        )

        result = await service.search_memory(
            app_name=MOCK_APP_NAME,
            user_id=MOCK_USER_ID,
            query="anything",
        )

        assert len(result.memories) == 0

    @pytest.mark.asyncio
    async def test_search_handles_partial_failure(
        self, service, mock_store
    ):
        call_count = 0

        # Semantic search is issued first, so the first call succeeds and
        # the second (episodic) fails.
        async def partial_fail(namespace, query, limit):
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                return [MockStoreItem("Good result")]
            raise RuntimeError("Episodic store down")

        mock_store.asearch = AsyncMock(side_effect=partial_fail)

        result = await service.search_memory(
            app_name=MOCK_APP_NAME,
            user_id=MOCK_USER_ID,
            query="test",
        )

        assert len(result.memories) == 1
        assert result.memories[0].content.parts[0].text == "Good result"

    @pytest.mark.asyncio
    async def test_search_respects_max_results(
        self, mock_store
    ):
        from google.adk_community.memory.langmem_memory_service import (
            LangMemMemoryService,
        )

        svc = LangMemMemoryService(
            store=mock_store,
            extraction_model="openai:gpt-4o-mini",
            max_semantic_results=2,
            max_episodic_results=1,
            enable_episodic=False,
        )

        items = [
            MockStoreItem(f"fact-{i}", datetime(2025, 1, i + 1))
            for i in range(5)
        ]
        mock_store.asearch = AsyncMock(return_value=items)

        result = await svc.search_memory(
            app_name=MOCK_APP_NAME,
            user_id=MOCK_USER_ID,
            query="test",
        )

        assert len(result.memories) == 2

    @pytest.mark.asyncio
    async def test_search_filters_empty_values(
        self, mock_store
    ):
        from google.adk_community.memory.langmem_memory_service import (
            LangMemMemoryService,
        )

        svc = LangMemMemoryService(
            store=mock_store,
            extraction_model="openai:gpt-4o-mini",
            enable_episodic=False,
        )
        items = [
            MockStoreItem("real content"),
            MockStoreItem(""),
            MockStoreItem("   "),
        ]
        mock_store.asearch = AsyncMock(return_value=items)

        result = await svc.search_memory(
            app_name=MOCK_APP_NAME,
            user_id=MOCK_USER_ID,
            query="test",
        )

        assert len(result.memories) == 1
        assert (
            result.memories[0].content.parts[0].text == "real content"
        )

    @pytest.mark.asyncio
    async def test_search_namespace_scoping(
        self, service, mock_store
    ):
        mock_store.asearch = AsyncMock(return_value=[])

        await service.search_memory(
            app_name=MOCK_APP_NAME,
            user_id=MOCK_USER_ID,
            query="test",
        )

        calls = mock_store.asearch.call_args_list
        assert len(calls) == 2
        assert calls[0][0][0] == (
            "memories",
            MOCK_USER_ID,
            "semantic",
        )
        assert calls[1][0][0] == (
            "memories",
            MOCK_USER_ID,
            "episodic",
        )


class TestExtractMessages:
    """Message extraction: text turns, tool narratives, thought filtering."""

    def test_text_messages_extracted(self, service):
        text_msgs, episode_msgs = service._extract_messages(
            MOCK_SESSION_TEXT_ONLY
        )
        assert len(text_msgs) == 2
        assert text_msgs[0] == {
            "role": "user",
            "content": "I prefer dark mode.",
        }
        assert text_msgs[1] == {
            "role": "assistant",
            "content": "Noted, dark mode preference saved.",
        }
        assert episode_msgs == []

    def test_tool_calls_included_in_episodic(self, service):
        text_msgs, episode_msgs = service._extract_messages(
            MOCK_SESSION_WITH_TOOLS
        )
        assert len(text_msgs) == 2
        assert len(episode_msgs) == 1
        episode_text = episode_msgs[0]["content"]
        assert "Tool call sequence:" in episode_text
        assert "run_depreciation" in episode_text
        assert "Tool result:" in episode_text

    def test_empty_session_returns_empty(self, service):
        text_msgs, episode_msgs = service._extract_messages(
            MOCK_SESSION_EMPTY
        )
        assert text_msgs == []
        assert episode_msgs == []

    def test_thought_parts_excluded(self, service):
        session = _make_session([
            Event(
                id="e1",
                invocation_id="inv-e1",
                author="model",
                timestamp=12345,
                content=types.Content(
                    parts=[
                        types.Part(text="thinking...", thought=True),
                        types.Part(text="Actual response"),
                    ]
                ),
            ),
        ])
        text_msgs, _ = service._extract_messages(session)
        assert len(text_msgs) == 1
        assert text_msgs[0]["content"] == "Actual response"