Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions backend/app/agent/compaction_note.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""Continuity note for the window between trim and compaction completion.

``trigger_compaction_for_dropped`` advances the trim watermark
synchronously, so the dropped rows vanish from LLM context on the very
next message, but the compaction LLM call that extracts their facts into
MEMORY.md runs asynchronously and may still be in flight (or may have
failed). In that window the agent has amnesia for the dropped range, at
the worst possible moment: immediately after the trim, when the dropped
content is most likely still topical (issue #1432).

This module covers the gap without touching the watermark semantics:
while the user has a *recent* ``'pending'`` compaction event, a terse
deterministic summary of the covered rows (the same
``summarize_dropped_messages`` shape the trim turn itself saw) is
rebuilt from the durable message rows and injected as a dynamic
system-prompt section. Once the event flips to ``'completed'``, the note
disappears and MEMORY.md / HISTORY.md carry the facts.

The summary is recomputed per turn rather than persisted: the inputs are
durable (event seq range + message rows are never deleted), the
summarizer is deterministic and cheap (no LLM), and recomputing avoids a
schema migration. The common case (no pending events) costs one indexed
SELECT per turn.

The note is bounded in time: only events triggered in the last
``_NOTE_WINDOW_MINUTES`` qualify, so an event that stays ``'pending'``
forever (crashed compaction, retries exhausted) does not pin a stale
note into every prompt indefinitely. Permanent failures are the retry
sweep's domain (issue #1431), not this note's.
"""

from __future__ import annotations

import datetime
import logging

from sqlalchemy import Select, select

from backend.app.agent.messages import AgentMessage, AssistantMessage, UserMessage
from backend.app.agent.trimming import summarize_dropped_messages
from backend.app.database import AsyncSessionLocal
from backend.app.enums import MessageDirection
from backend.app.models import ChatSession, CompactionEvent, Message

logger = logging.getLogger(__name__)

# Look-back window, in minutes: a pending event produces a note only if
# it was triggered within this window. The gap being covered is
# seconds-to-minutes (the async LLM call); anything older is either
# completed (no note needed) or stuck (the retry sweep's problem).
_NOTE_WINDOW_MINUTES = 60

# Hard cap on rows loaded per note rebuild, summed across the user's
# qualifying events (it seeds the shared row budget in
# ``build_pending_compaction_note``). The summarizer reads first lines
# only and its output is capped at 500 chars, so deep ranges add nothing.
_MAX_NOTE_ROWS = 400


def _recent_pending_events_select(
    user_id: str,
    cutoff_utc: datetime.datetime,
) -> Select[tuple[CompactionEvent]]:
    """Build the SELECT for a user's still-in-flight compaction events.

    An event qualifies when it belongs to ``user_id``, its status is
    ``'pending'``, it was triggered at or after ``cutoff_utc``, and both
    ends of its message-seq range are set. Results are ordered by the
    start of the covered range, ascending.
    """
    criteria = (
        CompactionEvent.user_id == user_id,
        CompactionEvent.status == "pending",
        CompactionEvent.triggered_at >= cutoff_utc,
        # Events without a recorded seq range cannot be mapped back to rows.
        CompactionEvent.min_message_seq.is_not(None),
        CompactionEvent.max_message_seq.is_not(None),
    )
    stmt = select(CompactionEvent).where(*criteria)
    return stmt.order_by(CompactionEvent.min_message_seq.asc())


def _rows_to_agent_messages(rows: list[Message]) -> list[AgentMessage]:
    """Convert stored ``Message`` rows into the shape the summarizer takes.

    This is intentionally a reduced version of
    ``context._stored_messages_to_agent_messages`` (importing that one from
    here would create a cycle through ``system_prompt``): tool
    interactions are not expanded and approval prompts are not filtered.
    The deterministic summarizer only looks at the first lines of
    user/assistant content, so the reduced shape yields the same topics
    line.

    Inbound rows become ``UserMessage`` (preferring ``processed_context``
    over ``body``); every other row becomes ``AssistantMessage``
    (preferring ``llm_reply_text`` over ``body``). Rows whose chosen text
    is empty or whitespace-only are dropped.
    """
    converted: list[AgentMessage] = []
    for row in rows:
        is_inbound = row.direction == MessageDirection.INBOUND
        if is_inbound:
            text = row.processed_context or row.body or ""
        else:
            text = row.llm_reply_text or row.body or ""
        if not text.strip():
            # Nothing for the summarizer to read in this row.
            continue
        if is_inbound:
            converted.append(UserMessage(content=text))
        else:
            converted.append(AssistantMessage(content=text))
    return converted


async def build_pending_compaction_note(user_id: str) -> str:
    """Build the continuity note for a user's in-flight compaction events.

    Looks up the user's recent ``'pending'`` compaction events, reloads
    the message rows their seq ranges cover (at most ``_MAX_NOTE_ROWS``
    rows in total across all events), and runs them through the
    deterministic ``summarize_dropped_messages`` summarizer.

    Returns:
        A fixed framing sentence followed by the summary, or ``""`` when
        there are no qualifying events (a single SELECT in that case), the
        user has no chat session, or no covered row has usable text.

    Never raises: any exception is logged and ``""`` is returned, because
    a broken note must not block the message turn it decorates.
    """
    try:
        window = datetime.timedelta(minutes=_NOTE_WINDOW_MINUTES)
        oldest_allowed = datetime.datetime.now(datetime.UTC) - window

        covered_rows: list[Message] = []
        session = AsyncSessionLocal()
        try:
            pending_result = await session.execute(
                _recent_pending_events_select(user_id, oldest_allowed)
            )
            pending_events = list(pending_result.scalars().all())
            if not pending_events:
                return ""

            # NOTE(review): scalar_one_or_none() raises when more than one
            # ChatSession row matches; that would be swallowed by the outer
            # handler and yield "". Confirm one session per user is
            # guaranteed.
            chat_result = await session.execute(select(ChatSession).filter_by(user_id=user_id))
            chat = chat_result.scalar_one_or_none()
            if chat is None:
                return ""

            # One shared row budget across all events, spent in range order.
            remaining = _MAX_NOTE_ROWS
            for pending_event in pending_events:
                if remaining <= 0:
                    break
                range_query = (
                    select(Message)
                    .where(
                        Message.session_id == chat.id,
                        Message.seq >= pending_event.min_message_seq,
                        Message.seq <= pending_event.max_message_seq,
                    )
                    .order_by(Message.seq)
                    .limit(remaining)
                )
                fetched = list((await session.execute(range_query)).scalars().all())
                covered_rows.extend(fetched)
                remaining -= len(fetched)
        finally:
            await session.close()

        summarizer_input = _rows_to_agent_messages(covered_rows)
        if not summarizer_input:
            return ""

        preamble = (
            "These messages were just archived from your context; their durable "
            "facts are being written to your memory right now and will appear "
            "in the Your Memory section shortly.\n"
        )
        return preamble + summarize_dropped_messages(summarizer_input)
    except Exception:
        logger.exception("Failed to build pending-compaction note for user %s", user_id)
        return ""
18 changes: 18 additions & 0 deletions backend/app/agent/system_prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import zoneinfo

from backend.app.agent.compaction_note import build_pending_compaction_note
from backend.app.agent.markdown_registry import truncate_for_injection
from backend.app.agent.memory_db import build_memory_context
from backend.app.agent.prompts import load_prompt
Expand Down Expand Up @@ -390,6 +391,23 @@ async def _build_agent_prompt_builder(
memory = await build_memory_section(user.id, query=message_context)
builder.add_section("Your Memory", memory, dynamic=True)

# Continuity cover for the trim-to-compaction window (issue #1432):
# rows dropped by trim vanish from history on the very next message
# (the watermark advances synchronously), but the compaction LLM call
# that moves their facts into MEMORY.md is async and may still be in
# flight. While a recent compaction event is pending, inject a terse
# deterministic summary of the covered rows so the agent's context
# contains either the compacted facts or this note, never neither.
# Dynamic: it appears for a few turns at most and rides the uncached
# current-turn slot, so it never busts the history cache.
pending_note = await build_pending_compaction_note(user.id)
if pending_note:
builder.add_section(
"Recently Archived Conversation (memory update in progress)",
pending_note,
dynamic=True,
)

if current_session_id:
cross = await build_cross_session_context(user.id, current_session_id)
if cross:
Expand Down
135 changes: 135 additions & 0 deletions tests/test_compaction_note.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Tests for the trim-to-compaction continuity note (issue #1432).

The trim watermark advances synchronously, but the compaction LLM call
that extracts the dropped rows' facts is async. While a recent
``'pending'`` compaction event exists, ``build_pending_compaction_note``
rebuilds a deterministic summary of the covered rows so the agent's
context contains either the compacted facts or the note, never neither.
"""

from __future__ import annotations

import datetime

import pytest

from backend.app.agent.compaction_note import build_pending_compaction_note
from backend.app.agent.file_store import UserData
from backend.app.database import db_session_async
from backend.app.enums import MessageDirection
from backend.app.models import ChatSession, CompactionEvent, Message


async def _seed_session_with_messages(user_id: str, message_count: int) -> ChatSession:
    """Create a ChatSession plus ``message_count`` alternating messages.

    Messages get seq ``1..message_count``; odd seqs are inbound, even
    seqs outbound, and each body is ``"deck quote topic <seq>"``. The
    returned session is refreshed and expunged so it can be used after
    the database session is gone.
    """
    async with db_session_async() as db:
        chat = ChatSession(
            session_id=f"session-{user_id}",
            user_id=user_id,
            channel="webchat",
            initial_system_prompt="",
        )
        db.add(chat)
        # Flush so the session's primary key exists for the message rows.
        await db.flush()

        for seq in range(1, message_count + 1):
            if seq % 2 == 1:
                direction = MessageDirection.INBOUND
            else:
                direction = MessageDirection.OUTBOUND
            message = Message(
                session_id=chat.id,
                seq=seq,
                direction=direction,
                body=f"deck quote topic {seq}",
                processed_context="",
                tool_interactions_json="",
                external_message_id="",
                media_urls_json="[]",
            )
            db.add(message)

        await db.commit()
        await db.refresh(chat)
        db.expunge(chat)
        return chat


async def _insert_event(
    user_id: str,
    min_seq: int,
    max_seq: int,
    status: str = "pending",
    age_minutes: int = 1,
) -> int:
    """Insert a CompactionEvent covering ``min_seq..max_seq`` and return its id.

    ``triggered_at`` is set to ``age_minutes`` before the current UTC
    time, and ``trimmed_count`` to the inclusive size of the seq range.
    """
    age = datetime.timedelta(minutes=age_minutes)
    triggered_at = datetime.datetime.now(datetime.UTC) - age
    covered_count = max_seq - min_seq + 1

    async with db_session_async() as db:
        row = CompactionEvent(
            user_id=user_id,
            triggered_at=triggered_at,
            status=status,
            min_message_seq=min_seq,
            max_message_seq=max_seq,
            trimmed_count=covered_count,
        )
        db.add(row)
        await db.commit()
        event_id = row.id
        assert event_id is not None
        return event_id


@pytest.mark.asyncio()
async def test_no_note_without_events(test_user: UserData) -> None:
    """Messages alone, with no compaction event at all, produce no note."""
    await _seed_session_with_messages(test_user.id, message_count=6)

    note = await build_pending_compaction_note(test_user.id)
    assert note == ""


@pytest.mark.asyncio()
async def test_note_while_event_pending(test_user: UserData) -> None:
    """While a recent event is pending, the note summarizes its rows."""
    user_id = test_user.id
    await _seed_session_with_messages(user_id, message_count=6)
    await _insert_event(user_id, min_seq=1, max_seq=4)

    note = await build_pending_compaction_note(user_id)

    assert note != ""
    expected_fragments = (
        # A user topic from the covered range, surfaced by the summarizer.
        "deck quote topic 1",
        # The framing that tells the agent the facts are en route to memory.
        "being written to your memory",
    )
    for fragment in expected_fragments:
        assert fragment in note


@pytest.mark.asyncio()
async def test_note_is_deterministic_across_turns(test_user: UserData) -> None:
    """Rebuilding the note for an unchanged pending event gives the same bytes."""
    user_id = test_user.id
    await _seed_session_with_messages(user_id, message_count=6)
    await _insert_event(user_id, min_seq=1, max_seq=4)

    earlier = await build_pending_compaction_note(user_id)
    later = await build_pending_compaction_note(user_id)

    assert later != ""
    assert earlier == later


@pytest.mark.asyncio()
async def test_no_note_after_completion(test_user: UserData) -> None:
    """A completed event yields no note; MEMORY.md carries the facts by then."""
    user_id = test_user.id
    await _seed_session_with_messages(user_id, message_count=6)
    await _insert_event(user_id, min_seq=1, max_seq=4, status="completed")

    note = await build_pending_compaction_note(user_id)
    assert note == ""


@pytest.mark.asyncio()
async def test_no_note_for_stale_pending_event(test_user: UserData) -> None:
    """An event still pending after 120 minutes no longer produces a note."""
    user_id = test_user.id
    await _seed_session_with_messages(user_id, message_count=6)
    await _insert_event(user_id, min_seq=1, max_seq=4, age_minutes=120)

    note = await build_pending_compaction_note(user_id)
    assert note == ""


@pytest.mark.asyncio()
async def test_note_covers_multiple_pending_events(test_user: UserData) -> None:
    """Two pending events with adjacent ranges both contribute to the note."""
    user_id = test_user.id
    await _seed_session_with_messages(user_id, message_count=10)
    for first_seq, last_seq in ((1, 3), (4, 6)):
        await _insert_event(user_id, min_seq=first_seq, max_seq=last_seq)

    note = await build_pending_compaction_note(user_id)

    # One inbound topic from each event's range.
    assert "deck quote topic 1" in note
    assert "deck quote topic 5" in note
Loading