From edec6594f04e5ea8bf1bb9cd26b3854b662ba7c7 Mon Sep 17 00:00:00 2001 From: njbrake Date: Wed, 10 Jun 2026 08:45:58 +0000 Subject: [PATCH] fix: cover the trim-to-compaction amnesia window with a continuity note trigger_compaction_for_dropped advances the trim watermark synchronously, so dropped rows vanish from LLM context on the very next message, but the compaction LLM call that extracts their facts into MEMORY.md is async and may still be in flight (or may have failed). In that window the agent has amnesia for the dropped range, immediately after the trim, when the dropped content is most likely still topical. The deterministic trim summary the trim turn saw was never persisted, so it existed for one turn only. While the user has a recent (under 60 minutes) 'pending' compaction event, a terse deterministic summary of the covered rows is rebuilt from the durable message rows (same summarize_dropped_messages shape) and injected as a dynamic system-prompt section. Once the event flips to 'completed', the note disappears and MEMORY.md carries the facts: the context now contains either the compacted facts or the note, never neither. Design choices: the summary is recomputed per turn instead of persisted (inputs are durable, the summarizer is cheap and deterministic, no migration needed); the note rides the dynamic half of the prompt so it never busts the history cache; the 60-minute window keeps a permanently stuck event from pinning a stale note forever (that case belongs to the retry sweep, issue #1431). Watermark semantics are untouched. Fixes #1432 Co-Authored-By: Claude Opus 4.8 --- backend/app/agent/compaction_note.py | 160 +++++++++++++++++++++++++++ backend/app/agent/system_prompt.py | 18 +++ tests/test_compaction_note.py | 135 ++++++++++++++++++++++ 3 files changed, 313 insertions(+) create mode 100644 backend/app/agent/compaction_note.py create mode 100644 tests/test_compaction_note.py diff --git a/backend/app/agent/compaction_note.py b/backend/app/agent/compaction_note.py new file mode 100644 index 00000000..474287a3 --- /dev/null +++ b/backend/app/agent/compaction_note.py @@ -0,0 +1,160 @@ +"""Continuity note for the window between trim and compaction completion. + +``trigger_compaction_for_dropped`` advances the trim watermark +synchronously, so the dropped rows vanish from LLM context on the very +next message, but the compaction LLM call that extracts their facts into +MEMORY.md runs asynchronously and may still be in flight (or may have +failed). In that window the agent has amnesia for the dropped range, at +the worst possible moment: immediately after the trim, when the dropped +content is most likely still topical (issue #1432). + +This module covers the gap without touching the watermark semantics: +while the user has a *recent* ``'pending'`` compaction event, a terse +deterministic summary of the covered rows (the same +``summarize_dropped_messages`` shape the trim turn itself saw) is +rebuilt from the durable message rows and injected as a dynamic +system-prompt section. Once the event flips to ``'completed'``, the note +disappears and MEMORY.md / HISTORY.md carry the facts. + +The summary is recomputed per turn rather than persisted: the inputs are +durable (event seq range + message rows are never deleted), the +summarizer is deterministic and cheap (no LLM), and recomputing avoids a +schema migration. The common case (no pending events) costs one indexed +SELECT per turn. + +The note is bounded in time: only events triggered in the last +``_NOTE_WINDOW_MINUTES`` qualify, so an event that stays ``'pending'`` +forever (crashed compaction, retries exhausted) does not pin a stale +note into every prompt indefinitely. Permanent failures are the retry +sweep's domain (issue #1431), not this note's. +""" + +from __future__ import annotations + +import datetime +import logging + +from sqlalchemy import Select, select + +from backend.app.agent.messages import AgentMessage, AssistantMessage, UserMessage +from backend.app.agent.trimming import summarize_dropped_messages +from backend.app.database import AsyncSessionLocal +from backend.app.enums import MessageDirection +from backend.app.models import ChatSession, CompactionEvent, Message + +logger = logging.getLogger(__name__) + +# Only events this recent produce a note. The gap being covered is +# seconds-to-minutes (the async LLM call); anything older is either +# completed (no note needed) or stuck (the retry sweep's problem). +_NOTE_WINDOW_MINUTES = 60 + +# Hard cap on rows loaded per note rebuild, summed across the user's +# qualifying events. The summarizer reads first lines only and its +# output is capped at 500 chars, so deep ranges add nothing. +_MAX_NOTE_ROWS = 400 + + +def _recent_pending_events_select( + user_id: str, + cutoff_utc: datetime.datetime, +) -> Select[tuple[CompactionEvent]]: + """Pending events recent enough to still be 'in flight' for this user.""" + return ( + select(CompactionEvent) + .where( + CompactionEvent.user_id == user_id, + CompactionEvent.status == "pending", + CompactionEvent.triggered_at >= cutoff_utc, + CompactionEvent.min_message_seq.is_not(None), + CompactionEvent.max_message_seq.is_not(None), + ) + .order_by(CompactionEvent.min_message_seq.asc()) + ) + + +def _rows_to_agent_messages(rows: list[Message]) -> list[AgentMessage]: + """Minimal Message-row conversion for the deterministic summarizer. + + Deliberately simpler than ``context._stored_messages_to_agent_messages`` + (which this module cannot import without a cycle through + ``system_prompt``): no tool expansion, no approval-prompt filtering. + The summarizer only reads first lines of user/assistant content, so + the simplified shape produces the same topics line. + """ + out: list[AgentMessage] = [] + for msg in rows: + if msg.direction == MessageDirection.INBOUND: + content = msg.processed_context or msg.body or "" + if content.strip(): + out.append(UserMessage(content=content)) + else: + content = msg.llm_reply_text or msg.body or "" + if content.strip(): + out.append(AssistantMessage(content=content)) + return out + + +async def build_pending_compaction_note(user_id: str) -> str: + """Summary of rows covered by the user's in-flight compaction events. + + Returns ``""`` when the user has no recent pending events (the common + case, one indexed SELECT). Failures are swallowed: a broken note must + never block the message turn it decorates. + """ + try: + cutoff = datetime.datetime.now(datetime.UTC) - datetime.timedelta( + minutes=_NOTE_WINDOW_MINUTES + ) + db = AsyncSessionLocal() + try: + events = list( + (await db.execute(_recent_pending_events_select(user_id, cutoff))).scalars().all() + ) + if not events: + return "" + + cs = ( + await db.execute(select(ChatSession).filter_by(user_id=user_id)) + ).scalar_one_or_none() + if cs is None: + return "" + + rows: list[Message] = [] + budget = _MAX_NOTE_ROWS + for event in events: + if budget <= 0: + break + event_rows = list( + ( + await db.execute( + select(Message) + .where( + Message.session_id == cs.id, + Message.seq >= event.min_message_seq, + Message.seq <= event.max_message_seq, + ) + .order_by(Message.seq) + .limit(budget) + ) + ) + .scalars() + .all() + ) + rows.extend(event_rows) + budget -= len(event_rows) + finally: + await db.close() + + agent_messages = _rows_to_agent_messages(rows) + if not agent_messages: + return "" + summary = summarize_dropped_messages(agent_messages) + return ( + "These messages were just archived from your context; their durable " + "facts are being written to your memory right now and will appear " + "in the Your Memory section shortly.\n" + summary + ) + except Exception: + logger.exception("Failed to build pending-compaction note for user %s", user_id) + return "" diff --git a/backend/app/agent/system_prompt.py b/backend/app/agent/system_prompt.py index 11b5306b..00077045 100644 --- a/backend/app/agent/system_prompt.py +++ b/backend/app/agent/system_prompt.py @@ -11,6 +11,7 @@ import logging import zoneinfo +from backend.app.agent.compaction_note import build_pending_compaction_note from backend.app.agent.markdown_registry import truncate_for_injection from backend.app.agent.memory_db import build_memory_context from backend.app.agent.prompts import load_prompt @@ -390,6 +391,23 @@ async def _build_agent_prompt_builder( memory = await build_memory_section(user.id, query=message_context) builder.add_section("Your Memory", memory, dynamic=True) + # Continuity cover for the trim-to-compaction window (issue #1432): + # rows dropped by trim vanish from history on the very next message + # (the watermark advances synchronously), but the compaction LLM call + # that moves their facts into MEMORY.md is async and may still be in + # flight. While a recent compaction event is pending, inject a terse + # deterministic summary of the covered rows so the agent's context + # contains either the compacted facts or this note, never neither. + # Dynamic: it appears for a few turns at most and rides the uncached + # current-turn slot, so it never busts the history cache. + pending_note = await build_pending_compaction_note(user.id) + if pending_note: + builder.add_section( + "Recently Archived Conversation (memory update in progress)", + pending_note, + dynamic=True, + ) + if current_session_id: cross = await build_cross_session_context(user.id, current_session_id) if cross: diff --git a/tests/test_compaction_note.py b/tests/test_compaction_note.py new file mode 100644 index 00000000..7c102f63 --- /dev/null +++ b/tests/test_compaction_note.py @@ -0,0 +1,135 @@ +"""Tests for the trim-to-compaction continuity note (issue #1432). + +The trim watermark advances synchronously, but the compaction LLM call +that extracts the dropped rows' facts is async. While a recent +``'pending'`` compaction event exists, ``build_pending_compaction_note`` +rebuilds a deterministic summary of the covered rows so the agent's +context contains either the compacted facts or the note, never neither. +""" + +from __future__ import annotations + +import datetime + +import pytest + +from backend.app.agent.compaction_note import build_pending_compaction_note +from backend.app.agent.file_store import UserData +from backend.app.database import db_session_async +from backend.app.enums import MessageDirection +from backend.app.models import ChatSession, CompactionEvent, Message + + +async def _seed_session_with_messages(user_id: str, message_count: int) -> ChatSession: + """Insert a ChatSession with alternating inbound/outbound messages.""" + async with db_session_async() as db: + cs = ChatSession( + session_id=f"session-{user_id}", + user_id=user_id, + channel="webchat", + initial_system_prompt="", + ) + db.add(cs) + await db.flush() + for i in range(1, message_count + 1): + db.add( + Message( + session_id=cs.id, + seq=i, + direction=( + MessageDirection.INBOUND if i % 2 == 1 else MessageDirection.OUTBOUND + ), + body=f"deck quote topic {i}", + processed_context="", + tool_interactions_json="", + external_message_id="", + media_urls_json="[]", + ) + ) + await db.commit() + await db.refresh(cs) + db.expunge(cs) + return cs + + +async def _insert_event( + user_id: str, + min_seq: int, + max_seq: int, + status: str = "pending", + age_minutes: int = 1, +) -> int: + triggered_at = datetime.datetime.now(datetime.UTC) - datetime.timedelta(minutes=age_minutes) + async with db_session_async() as db: + event = CompactionEvent( + user_id=user_id, + triggered_at=triggered_at, + status=status, + min_message_seq=min_seq, + max_message_seq=max_seq, + trimmed_count=max_seq - min_seq + 1, + ) + db.add(event) + await db.commit() + assert event.id is not None + return event.id + + +@pytest.mark.asyncio() +async def test_no_note_without_events(test_user: UserData) -> None: + await _seed_session_with_messages(test_user.id, message_count=6) + assert await build_pending_compaction_note(test_user.id) == "" + + +@pytest.mark.asyncio() +async def test_note_while_event_pending(test_user: UserData) -> None: + """A recent pending event produces a summary of the covered rows.""" + await _seed_session_with_messages(test_user.id, message_count=6) + await _insert_event(test_user.id, min_seq=1, max_seq=4) + + note = await build_pending_compaction_note(test_user.id) + assert note != "" + # The summarizer surfaces user topics from the covered range. + assert "deck quote topic 1" in note + # And the framing tells the agent the facts are en route to memory. + assert "being written to your memory" in note + + +@pytest.mark.asyncio() +async def test_note_is_deterministic_across_turns(test_user: UserData) -> None: + """The note is byte-identical while the same event stays pending.""" + await _seed_session_with_messages(test_user.id, message_count=6) + await _insert_event(test_user.id, min_seq=1, max_seq=4) + + first = await build_pending_compaction_note(test_user.id) + second = await build_pending_compaction_note(test_user.id) + assert first == second != "" + + +@pytest.mark.asyncio() +async def test_no_note_after_completion(test_user: UserData) -> None: + """Completed events stop producing the note: MEMORY.md has the facts.""" + await _seed_session_with_messages(test_user.id, message_count=6) + await _insert_event(test_user.id, min_seq=1, max_seq=4, status="completed") + + assert await build_pending_compaction_note(test_user.id) == "" + + +@pytest.mark.asyncio() +async def test_no_note_for_stale_pending_event(test_user: UserData) -> None: + """A long-stuck pending event must not pin a stale note forever.""" + await _seed_session_with_messages(test_user.id, message_count=6) + await _insert_event(test_user.id, min_seq=1, max_seq=4, age_minutes=120) + + assert await build_pending_compaction_note(test_user.id) == "" + + +@pytest.mark.asyncio() +async def test_note_covers_multiple_pending_events(test_user: UserData) -> None: + await _seed_session_with_messages(test_user.id, message_count=10) + await _insert_event(test_user.id, min_seq=1, max_seq=3) + await _insert_event(test_user.id, min_seq=4, max_seq=6) + + note = await build_pending_compaction_note(test_user.id) + assert "deck quote topic 1" in note + assert "deck quote topic 5" in note