diff --git a/backend/app/agent/compaction_note.py b/backend/app/agent/compaction_note.py new file mode 100644 index 00000000..474287a3 --- /dev/null +++ b/backend/app/agent/compaction_note.py @@ -0,0 +1,160 @@ +"""Continuity note for the window between trim and compaction completion. + +``trigger_compaction_for_dropped`` advances the trim watermark +synchronously, so the dropped rows vanish from LLM context on the very +next message, but the compaction LLM call that extracts their facts into +MEMORY.md runs asynchronously and may still be in flight (or may have +failed). In that window the agent has amnesia for the dropped range, at +the worst possible moment: immediately after the trim, when the dropped +content is most likely still topical (issue #1432). + +This module covers the gap without touching the watermark semantics: +while the user has a *recent* ``'pending'`` compaction event, a terse +deterministic summary of the covered rows (the same +``summarize_dropped_messages`` shape the trim turn itself saw) is +rebuilt from the durable message rows and injected as a dynamic +system-prompt section. Once the event flips to ``'completed'``, the note +disappears and MEMORY.md / HISTORY.md carry the facts. + +The summary is recomputed per turn rather than persisted: the inputs are +durable (event seq range + message rows are never deleted), the +summarizer is deterministic and cheap (no LLM), and recomputing avoids a +schema migration. The common case (no pending events) costs one indexed +SELECT per turn. + +The note is bounded in time: only events triggered in the last +``_NOTE_WINDOW_MINUTES`` qualify, so an event that stays ``'pending'`` +forever (crashed compaction, retries exhausted) does not pin a stale +note into every prompt indefinitely. Permanent failures are the retry +sweep's domain (issue #1431), not this note's. +""" + +from __future__ import annotations + +import datetime +import logging + +from sqlalchemy import Select, select + +from backend.app.agent.messages import AgentMessage, AssistantMessage, UserMessage +from backend.app.agent.trimming import summarize_dropped_messages +from backend.app.database import AsyncSessionLocal +from backend.app.enums import MessageDirection +from backend.app.models import ChatSession, CompactionEvent, Message + +logger = logging.getLogger(__name__) + +# Only events this recent produce a note. The gap being covered is +# seconds-to-minutes (the async LLM call); anything older is either +# completed (no note needed) or stuck (the retry sweep's problem). +_NOTE_WINDOW_MINUTES = 60 + +# Hard cap on rows loaded per note rebuild, summed across the user's +# qualifying events. The summarizer reads first lines only and its +# output is capped at 500 chars, so deep ranges add nothing. +_MAX_NOTE_ROWS = 400 + + +def _recent_pending_events_select( + user_id: str, + cutoff_utc: datetime.datetime, +) -> Select[tuple[CompactionEvent]]: + """Pending events recent enough to still be 'in flight' for this user.""" + return ( + select(CompactionEvent) + .where( + CompactionEvent.user_id == user_id, + CompactionEvent.status == "pending", + CompactionEvent.triggered_at >= cutoff_utc, + CompactionEvent.min_message_seq.is_not(None), + CompactionEvent.max_message_seq.is_not(None), + ) + .order_by(CompactionEvent.min_message_seq.asc()) + ) + + +def _rows_to_agent_messages(rows: list[Message]) -> list[AgentMessage]: + """Minimal Message-row conversion for the deterministic summarizer. + + Deliberately simpler than ``context._stored_messages_to_agent_messages`` + (which this module cannot import without a cycle through + ``system_prompt``): no tool expansion, no approval-prompt filtering. + The summarizer only reads first lines of user/assistant content, so + the simplified shape produces the same topics line. + """ + out: list[AgentMessage] = [] + for msg in rows: + if msg.direction == MessageDirection.INBOUND: + content = msg.processed_context or msg.body or "" + if content.strip(): + out.append(UserMessage(content=content)) + else: + content = msg.llm_reply_text or msg.body or "" + if content.strip(): + out.append(AssistantMessage(content=content)) + return out + + +async def build_pending_compaction_note(user_id: str) -> str: + """Summary of rows covered by the user's in-flight compaction events. + + Returns ``""`` when the user has no recent pending events (the common + case, one indexed SELECT). Failures are swallowed: a broken note must + never block the message turn it decorates. + """ + try: + cutoff = datetime.datetime.now(datetime.UTC) - datetime.timedelta( + minutes=_NOTE_WINDOW_MINUTES + ) + db = AsyncSessionLocal() + try: + events = list( + (await db.execute(_recent_pending_events_select(user_id, cutoff))).scalars().all() + ) + if not events: + return "" + + cs = ( + await db.execute(select(ChatSession).filter_by(user_id=user_id)) + ).scalar_one_or_none() + if cs is None: + return "" + + rows: list[Message] = [] + budget = _MAX_NOTE_ROWS + for event in events: + if budget <= 0: + break + event_rows = list( + ( + await db.execute( + select(Message) + .where( + Message.session_id == cs.id, + Message.seq >= event.min_message_seq, + Message.seq <= event.max_message_seq, + ) + .order_by(Message.seq) + .limit(budget) + ) + ) + .scalars() + .all() + ) + rows.extend(event_rows) + budget -= len(event_rows) + finally: + await db.close() + + agent_messages = _rows_to_agent_messages(rows) + if not agent_messages: + return "" + summary = summarize_dropped_messages(agent_messages) + return ( + "These messages were just archived from your context; their durable " + "facts are being written to your memory right now and will appear " + "in the Your Memory section shortly.\n" + summary + ) + except Exception: + logger.exception("Failed to build pending-compaction note for user %s", user_id) + return "" diff --git a/backend/app/agent/system_prompt.py b/backend/app/agent/system_prompt.py index 11b5306b..00077045 100644 --- a/backend/app/agent/system_prompt.py +++ b/backend/app/agent/system_prompt.py @@ -11,6 +11,7 @@ import logging import zoneinfo +from backend.app.agent.compaction_note import build_pending_compaction_note from backend.app.agent.markdown_registry import truncate_for_injection from backend.app.agent.memory_db import build_memory_context from backend.app.agent.prompts import load_prompt @@ -390,6 +391,23 @@ async def _build_agent_prompt_builder( memory = await build_memory_section(user.id, query=message_context) builder.add_section("Your Memory", memory, dynamic=True) + # Continuity cover for the trim-to-compaction window (issue #1432): + # rows dropped by trim vanish from history on the very next message + # (the watermark advances synchronously), but the compaction LLM call + # that moves their facts into MEMORY.md is async and may still be in + # flight. While a recent compaction event is pending, inject a terse + # deterministic summary of the covered rows so the agent's context + # contains either the compacted facts or this note, never neither. + # Dynamic: it appears for a few turns at most and rides the uncached + # current-turn slot, so it never busts the history cache. + pending_note = await build_pending_compaction_note(user.id) + if pending_note: + builder.add_section( + "Recently Archived Conversation (memory update in progress)", + pending_note, + dynamic=True, + ) + if current_session_id: cross = await build_cross_session_context(user.id, current_session_id) if cross: diff --git a/tests/test_compaction_note.py b/tests/test_compaction_note.py new file mode 100644 index 00000000..7c102f63 --- /dev/null +++ b/tests/test_compaction_note.py @@ -0,0 +1,135 @@ +"""Tests for the trim-to-compaction continuity note (issue #1432). + +The trim watermark advances synchronously, but the compaction LLM call +that extracts the dropped rows' facts is async. While a recent +``'pending'`` compaction event exists, ``build_pending_compaction_note`` +rebuilds a deterministic summary of the covered rows so the agent's +context contains either the compacted facts or the note, never neither. +""" + +from __future__ import annotations + +import datetime + +import pytest + +from backend.app.agent.compaction_note import build_pending_compaction_note +from backend.app.agent.file_store import UserData +from backend.app.database import db_session_async +from backend.app.enums import MessageDirection +from backend.app.models import ChatSession, CompactionEvent, Message + + +async def _seed_session_with_messages(user_id: str, message_count: int) -> ChatSession: + """Insert a ChatSession with alternating inbound/outbound messages.""" + async with db_session_async() as db: + cs = ChatSession( + session_id=f"session-{user_id}", + user_id=user_id, + channel="webchat", + initial_system_prompt="", + ) + db.add(cs) + await db.flush() + for i in range(1, message_count + 1): + db.add( + Message( + session_id=cs.id, + seq=i, + direction=( + MessageDirection.INBOUND if i % 2 == 1 else MessageDirection.OUTBOUND + ), + body=f"deck quote topic {i}", + processed_context="", + tool_interactions_json="", + external_message_id="", + media_urls_json="[]", + ) + ) + await db.commit() + await db.refresh(cs) + db.expunge(cs) + return cs + + +async def _insert_event( + user_id: str, + min_seq: int, + max_seq: int, + status: str = "pending", + age_minutes: int = 1, +) -> int: + triggered_at = datetime.datetime.now(datetime.UTC) - datetime.timedelta(minutes=age_minutes) + async with db_session_async() as db: + event = CompactionEvent( + user_id=user_id, + triggered_at=triggered_at, + status=status, + min_message_seq=min_seq, + max_message_seq=max_seq, + trimmed_count=max_seq - min_seq + 1, + ) + db.add(event) + await db.commit() + assert event.id is not None + return event.id + + +@pytest.mark.asyncio() +async def test_no_note_without_events(test_user: UserData) -> None: + await _seed_session_with_messages(test_user.id, message_count=6) + assert await build_pending_compaction_note(test_user.id) == "" + + +@pytest.mark.asyncio() +async def test_note_while_event_pending(test_user: UserData) -> None: + """A recent pending event produces a summary of the covered rows.""" + await _seed_session_with_messages(test_user.id, message_count=6) + await _insert_event(test_user.id, min_seq=1, max_seq=4) + + note = await build_pending_compaction_note(test_user.id) + assert note != "" + # The summarizer surfaces user topics from the covered range. + assert "deck quote topic 1" in note + # And the framing tells the agent the facts are en route to memory. + assert "being written to your memory" in note + + +@pytest.mark.asyncio() +async def test_note_is_deterministic_across_turns(test_user: UserData) -> None: + """The note is byte-identical while the same event stays pending.""" + await _seed_session_with_messages(test_user.id, message_count=6) + await _insert_event(test_user.id, min_seq=1, max_seq=4) + + first = await build_pending_compaction_note(test_user.id) + second = await build_pending_compaction_note(test_user.id) + assert first == second != "" + + +@pytest.mark.asyncio() +async def test_no_note_after_completion(test_user: UserData) -> None: + """Completed events stop producing the note: MEMORY.md has the facts.""" + await _seed_session_with_messages(test_user.id, message_count=6) + await _insert_event(test_user.id, min_seq=1, max_seq=4, status="completed") + + assert await build_pending_compaction_note(test_user.id) == "" + + +@pytest.mark.asyncio() +async def test_no_note_for_stale_pending_event(test_user: UserData) -> None: + """A long-stuck pending event must not pin a stale note forever.""" + await _seed_session_with_messages(test_user.id, message_count=6) + await _insert_event(test_user.id, min_seq=1, max_seq=4, age_minutes=120) + + assert await build_pending_compaction_note(test_user.id) == "" + + +@pytest.mark.asyncio() +async def test_note_covers_multiple_pending_events(test_user: UserData) -> None: + await _seed_session_with_messages(test_user.id, message_count=10) + await _insert_event(test_user.id, min_seq=1, max_seq=3) + await _insert_event(test_user.id, min_seq=4, max_seq=6) + + note = await build_pending_compaction_note(test_user.id) + assert "deck quote topic 1" in note + assert "deck quote topic 5" in note