From 6f04ce1d48d8a3d6acba4ebf3ce57b52cd8b4e11 Mon Sep 17 00:00:00 2001 From: Mark Nutter Date: Sun, 1 Mar 2026 22:26:42 -0600 Subject: [PATCH 1/2] feat: add scratchpad support for resumable analyses (#58) Implements short-lived working memory for in-progress RLM analyses, based on the "Everything is Context" paper's scratchpad concept. - Add `scratchpad` SQLite table to the existing memory DB with TTL support - New `rlm/scratchpad.py` module: save, get, list, clear, promote - CLI: `rlm scratchpad save|list|get|clear|promote` subcommands - Auto-expire entries after configurable TTL (default 24h) - `promote` graduates scratchpad entries to long-term memory via existing memory pipeline, merging tags and removing from scratchpad - 5 new MCP tools: rlm_scratchpad_save, _list, _get, _clear, _promote Co-Authored-By: Claude Sonnet 4.6 --- mcp/server.py | 156 ++++++++++++++++++++++++++++ rlm/cli.py | 115 ++++++++++++++++++++- rlm/db.py | 92 +++++++++++++++++ rlm/scratchpad.py | 252 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 614 insertions(+), 1 deletion(-) create mode 100644 rlm/scratchpad.py diff --git a/mcp/server.py b/mcp/server.py index 08284b3..da71b94 100644 --- a/mcp/server.py +++ b/mcp/server.py @@ -103,6 +103,50 @@ def handle_tool_call(name: str, arguments: dict) -> str: entry_id = arguments.get("entry_id", "") return run_rlm("forget", entry_id) + elif name == "rlm_scratchpad_save": + content = arguments.get("content", "") + label = arguments.get("label", "") + tags = arguments.get("tags", "") + ttl = arguments.get("ttl_hours", 24) + session = arguments.get("analysis_session", "") + cmd = ["scratchpad", "save", content] + if label: + cmd += ["--label", label] + if tags: + cmd += ["--tags", tags] + if ttl != 24: + cmd += ["--ttl", str(ttl)] + if session: + cmd += ["--session", session] + return run_rlm(*cmd) + + elif name == "rlm_scratchpad_list": + cmd = ["scratchpad", "list"] + if arguments.get("include_expired"): + cmd += ["--all"] + return run_rlm(*cmd) + + elif name == "rlm_scratchpad_get": + entry_id = arguments.get("entry_id", "") + return run_rlm("scratchpad", "get", entry_id) + + elif name == "rlm_scratchpad_clear": + cmd = ["scratchpad", "clear"] + if arguments.get("expired_only"): + cmd += ["--expired"] + return run_rlm(*cmd) + + elif name == "rlm_scratchpad_promote": + entry_id = arguments.get("entry_id", "") + tags = arguments.get("tags", "") + summary = arguments.get("summary", "") + cmd = ["scratchpad", "promote", entry_id] + if tags: + cmd += ["--tags", tags] + if summary: + cmd += ["--summary", summary] + return run_rlm(*cmd) + else: return f"Unknown tool: {name}" @@ -259,6 +303,118 @@ def handle_tool_call(name: str, arguments: dict) -> str: "required": ["entry_id"], }, }, + { + "name": "rlm_scratchpad_save", + "description": ( + "Save intermediate analysis state to the scratchpad — short-lived working memory " + "for in-progress RLM analyses. Use this to record partial results, hypotheses, " + "or findings mid-analysis so they survive context limits. " + "Entries auto-expire after a configurable TTL (default 24h). " + "Use rlm_scratchpad_promote to graduate important findings to long-term memory." + ), + "inputSchema": { + "type": "object", + "properties": { + "content": { + "type": "string", + "description": "The intermediate analysis content to save", + }, + "label": { + "type": "string", + "description": "Short label/title for this scratchpad entry", + }, + "tags": { + "type": "string", + "description": "Comma-separated tags", + }, + "ttl_hours": { + "type": "number", + "description": "Hours until this entry expires (default: 24)", + "default": 24, + }, + "analysis_session": { + "type": "string", + "description": "Optional RLM analysis session ID to associate with", + }, + }, + "required": ["content"], + }, + }, + { + "name": "rlm_scratchpad_list", + "description": ( + "List active scratchpad entries (short-lived working memory). " + "Shows entry IDs, labels, sizes, tags, and time until expiry." + ), + "inputSchema": { + "type": "object", + "properties": { + "include_expired": { + "type": "boolean", + "description": "Include expired entries (default: false)", + "default": False, + }, + }, + }, + }, + { + "name": "rlm_scratchpad_get", + "description": "Get the full content of a specific scratchpad entry by ID.", + "inputSchema": { + "type": "object", + "properties": { + "entry_id": { + "type": "string", + "description": "The scratchpad entry ID (e.g. 'scratch-abc123')", + }, + }, + "required": ["entry_id"], + }, + }, + { + "name": "rlm_scratchpad_clear", + "description": ( + "Clear scratchpad entries. By default removes all entries. " + "Use expired_only=true to only clean up entries past their TTL." + ), + "inputSchema": { + "type": "object", + "properties": { + "expired_only": { + "type": "boolean", + "description": "Only remove expired entries (default: false, removes all)", + "default": False, + }, + }, + }, + }, + { + "name": "rlm_scratchpad_promote", + "description": ( + "Promote a scratchpad entry to long-term memory. " + "Use this when intermediate analysis results are worth keeping permanently. " + "The entry is moved from the scratchpad into the persistent memory store " + "and removed from the scratchpad." + ), + "inputSchema": { + "type": "object", + "properties": { + "entry_id": { + "type": "string", + "description": "The scratchpad entry ID to promote", + }, + "tags": { + "type": "string", + "description": "Additional comma-separated tags for the long-term memory entry", + }, + "summary": { + "type": "string", + "description": "Summary for the long-term memory entry (defaults to entry label)", + }, + }, + "required": ["entry_id"], + }, + }, ] diff --git a/rlm/cli.py b/rlm/cli.py index 8d66331..3a70c35 100644 --- a/rlm/cli.py +++ b/rlm/cli.py @@ -8,7 +8,7 @@ import os import sys -from rlm import scanner, chunker, extractor, state, memory, export, url_fetcher, archive +from rlm import scanner, chunker, extractor, state, memory, export, url_fetcher, archive, scratchpad as scratchpad_mod MAX_OUTPUT = 4000 @@ -610,6 +610,84 @@ def fmt_bytes(b): _print("\n".join(lines)) +def cmd_scratchpad(args): + """Scratchpad subcommands: save, list, get, clear, promote.""" + action = args.scratchpad_action + + if action == "save": + if args.stdin: + import sys as _sys + content = _sys.stdin.read() + elif args.content: + content = args.content + else: + _print("Error: provide content as argument, or use --stdin") + import sys; sys.exit(1) + + if not content.strip(): + _print("Error: empty content") + import sys; sys.exit(1) + + tags = [t.strip() for t in args.tags.split(",")] if args.tags else [] + result = scratchpad_mod.save( + content=content, + label=args.label or "", + tags=tags, + ttl_hours=args.ttl, + analysis_session=args.session, + ) + lines = [ + f"Scratchpad entry saved: {result['id']}", + f"Label: {result['label'] or '(none)'}", + f"TTL: {result['ttl_hours']}h (expires in {result['ttl_hours']:.1f}h)", + f"Size: {result['char_count']:,} chars", + ] + if result["tags"]: + lines.append(f"Tags: {', '.join(result['tags'])}") + if result["analysis_session"]: + lines.append(f"Session: {result['analysis_session']}") + _print("\n".join(lines)) + + elif action == "list": + entries = scratchpad_mod.list_entries(include_expired=args.all) + _print(scratchpad_mod.format_entry_list(entries)) + + elif action == "get": + entry = scratchpad_mod.get(args.id) + if entry is None: + _print(f"Error: no scratchpad entry with id '{args.id}'") + import sys; sys.exit(1) + _print(scratchpad_mod.format_entry(entry)) + + elif action == "clear": + n = scratchpad_mod.clear(expired_only=args.expired) + if args.expired: + _print(f"Cleared {n} expired scratchpad entries.") + else: + _print(f"Cleared {n} scratchpad entries.") + + elif action == "promote": + tags = [t.strip() for t in args.tags.split(",")] if args.tags else [] + result = scratchpad_mod.promote( + entry_id=args.id, + tags=tags or None, + summary=args.summary, + ) + if result is None: + _print(f"Error: no scratchpad entry with id '{args.id}'") + import sys; sys.exit(1) + lines = [ + f"Promoted to long-term memory: {result['id']}", + f"Summary: {result.get('summary', '')}", + f"Tags: {', '.join(result.get('tags', []))}", + ] + _print("\n".join(lines)) + + else: + _print(f"Unknown scratchpad action: {action}") + import sys; sys.exit(1) + + def cmd_tui(args): """Launch the interactive TUI dashboard.""" from rlm.tui import RlmTuiApp @@ -766,6 +844,41 @@ def main(): p_stats = subparsers.add_parser("stats", help="Show memory store statistics") p_stats.set_defaults(func=cmd_stats) + # scratchpad + p_scratch = subparsers.add_parser("scratchpad", help="Manage scratchpad (short-lived working memory)") + scratch_sub = p_scratch.add_subparsers(dest="scratchpad_action", required=True) + + # scratchpad save + p_scratch_save = scratch_sub.add_parser("save", help="Save a scratchpad entry") + p_scratch_save.add_argument("content", nargs="?", help="Content to save") + p_scratch_save.add_argument("--label", help="Short label/title for this entry") + p_scratch_save.add_argument("--tags", help="Comma-separated tags") + p_scratch_save.add_argument("--ttl", type=float, default=24.0, + help="Time-to-live in hours (default: 24)") + p_scratch_save.add_argument("--session", help="Associate with an RLM analysis session ID") + p_scratch_save.add_argument("--stdin", action="store_true", help="Read content from stdin") + + # scratchpad list + p_scratch_list = scratch_sub.add_parser("list", help="List scratchpad entries") + p_scratch_list.add_argument("--all", action="store_true", help="Include expired entries") + + # scratchpad get + p_scratch_get = scratch_sub.add_parser("get", help="Get a scratchpad entry by ID") + p_scratch_get.add_argument("id", help="Scratchpad entry ID") + + # scratchpad clear + p_scratch_clear = scratch_sub.add_parser("clear", help="Clear scratchpad entries") + p_scratch_clear.add_argument("--expired", action="store_true", + help="Only remove expired entries (leave active ones)") + + # scratchpad promote + p_scratch_promote = scratch_sub.add_parser("promote", help="Promote scratchpad entry to long-term memory") + p_scratch_promote.add_argument("id", help="Scratchpad entry ID") + p_scratch_promote.add_argument("--tags", help="Additional comma-separated tags for the memory entry") + p_scratch_promote.add_argument("--summary", help="Override the memory summary") + + p_scratch.set_defaults(func=cmd_scratchpad) + # tui p_tui = subparsers.add_parser("tui", help="Launch interactive TUI dashboard") p_tui.set_defaults(func=cmd_tui) diff --git a/rlm/db.py b/rlm/db.py index cb29a47..16f9ecb 100644 --- a/rlm/db.py +++ b/rlm/db.py @@ -149,6 +149,19 @@ def _init_schema(conn: sqlite3.Connection): END; """) + # --- Scratchpad table (short-lived working memory for in-progress analyses) --- + conn.execute(""" + CREATE TABLE IF NOT EXISTS scratchpad ( + id TEXT PRIMARY KEY, + label TEXT NOT NULL DEFAULT '', + content TEXT NOT NULL, + tags TEXT NOT NULL DEFAULT '[]', + created_at REAL NOT NULL, + expires_at REAL NOT NULL, + analysis_session TEXT + ) + """) + conn.commit() @@ -782,3 +795,82 @@ def _fact_row_to_dict(row: sqlite3.Row) -> dict: if "rank" in d: d["score"] = round(-d.pop("rank"), 2) return d + + +# --- Scratchpad operations --- + + +def insert_scratchpad( + entry_id: str, + label: str, + content: str, + tags: list[str], + created_at: float, + expires_at: float, + analysis_session: str | None = None, +) -> None: + """Insert a new scratchpad entry.""" + conn = _get_conn() + conn.execute( + """INSERT OR REPLACE INTO scratchpad + (id, label, content, tags, created_at, expires_at, analysis_session) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + (entry_id, label, content, json.dumps(tags), created_at, expires_at, analysis_session), + ) + conn.commit() + + +def get_scratchpad_entry(entry_id: str) -> dict | None: + """Load a scratchpad entry by ID. Returns None if not found.""" + conn = _get_conn() + row = conn.execute( + "SELECT * FROM scratchpad WHERE id = ?", (entry_id,) + ).fetchone() + if row is None: + return None + return _scratchpad_row_to_dict(row) + + +def list_scratchpad_entries(include_expired: bool = False) -> list[dict]: + """List scratchpad entries, excluding expired ones by default.""" + import time + conn = _get_conn() + if include_expired: + rows = conn.execute( + "SELECT * FROM scratchpad ORDER BY created_at DESC" + ).fetchall() + else: + now = time.time() + rows = conn.execute( + "SELECT * FROM scratchpad WHERE expires_at > ? ORDER BY created_at DESC", + (now,), + ).fetchall() + return [_scratchpad_row_to_dict(row) for row in rows] + + +def delete_scratchpad_entry(entry_id: str) -> bool: + """Delete a scratchpad entry. Returns True if found and deleted.""" + conn = _get_conn() + cursor = conn.execute("DELETE FROM scratchpad WHERE id = ?", (entry_id,)) + conn.commit() + return cursor.rowcount > 0 + + +def clear_scratchpad(expired_only: bool = False) -> int: + """Clear scratchpad entries. Returns count deleted.""" + import time + conn = _get_conn() + if expired_only: + now = time.time() + cursor = conn.execute("DELETE FROM scratchpad WHERE expires_at <= ?", (now,)) + else: + cursor = conn.execute("DELETE FROM scratchpad") + conn.commit() + return cursor.rowcount + + +def _scratchpad_row_to_dict(row: sqlite3.Row) -> dict: + """Convert a scratchpad row to a dict.""" + d = dict(row) + d["tags"] = json.loads(d["tags"]) + return d diff --git a/rlm/scratchpad.py b/rlm/scratchpad.py new file mode 100644 index 0000000..b91c945 --- /dev/null +++ b/rlm/scratchpad.py @@ -0,0 +1,252 @@ +"""Scratchpad: short-lived working memory for in-progress RLM analyses. + +Based on the "Everything is Context" paper's concept of scratchpad files as +temporary working memory that exists between analysis steps but doesn't +permanently occupy long-term memory. + +Scratchpad entries: +- Are stored in the same SQLite DB as long-term memory (separate table) +- Auto-expire after a configurable TTL (default: 24 hours) +- Can be promoted to long-term memory when the analysis is complete +- Are associated with an optional RLM analysis session ID + +Usage: + from rlm import scratchpad + + # Save intermediate analysis state + entry = scratchpad.save("Found 3 security issues in auth module", label="auth-scan") + + # Get an entry + entry = scratchpad.get(entry_id) + + # List active entries + entries = scratchpad.list_entries() + + # Promote to long-term memory + memory_id = scratchpad.promote(entry_id, tags=["security", "auth"]) + + # Clear all entries (or just expired ones) + scratchpad.clear() +""" + +import time +import uuid +from typing import Optional + +from rlm import db, memory + +DEFAULT_TTL_HOURS = 24 + + +def save( + content: str, + label: str = "", + tags: list[str] | None = None, + ttl_hours: float = DEFAULT_TTL_HOURS, + analysis_session: str | None = None, +) -> dict: + """Save a scratchpad entry. + + Args: + content: The text content to save (intermediate analysis results, notes, etc.) + label: Short human-readable label for this entry + tags: Optional list of tags for later filtering/searching + ttl_hours: Hours until this entry auto-expires (default: 24) + analysis_session: Optional RLM session ID this entry is associated with + + Returns: + Dict with entry metadata (id, label, created_at, expires_at, etc.) + """ + memory.init_memory_store() + + now = time.time() + entry_id = f"scratch-{uuid.uuid4().hex[:12]}" + expires_at = now + (ttl_hours * 3600) + + db.insert_scratchpad( + entry_id=entry_id, + label=label or content[:60].replace("\n", " "), + content=content, + tags=tags or [], + created_at=now, + expires_at=expires_at, + analysis_session=analysis_session, + ) + + return { + "id": entry_id, + "label": label, + "char_count": len(content), + "tags": tags or [], + "created_at": now, + "expires_at": expires_at, + "analysis_session": analysis_session, + "ttl_hours": ttl_hours, + } + + +def get(entry_id: str) -> dict | None: + """Retrieve a scratchpad entry by ID. + + Returns the full entry dict including content, or None if not found. + Note: Returns the entry even if expired (callers can check expires_at). + """ + memory.init_memory_store() + return db.get_scratchpad_entry(entry_id) + + +def list_entries(include_expired: bool = False) -> list[dict]: + """List scratchpad entries. + + Args: + include_expired: If True, include entries past their TTL. + + Returns: + List of entry metadata dicts (content is included). + """ + memory.init_memory_store() + return db.list_scratchpad_entries(include_expired=include_expired) + + +def clear(expired_only: bool = False) -> int: + """Clear scratchpad entries. + + Args: + expired_only: If True, only remove expired entries. Otherwise removes all. + + Returns: + Number of entries deleted. + """ + memory.init_memory_store() + return db.clear_scratchpad(expired_only=expired_only) + + +def delete(entry_id: str) -> bool: + """Delete a specific scratchpad entry. + + Returns: + True if found and deleted, False if not found. + """ + memory.init_memory_store() + return db.delete_scratchpad_entry(entry_id) + + +def promote( + entry_id: str, + tags: list[str] | None = None, + summary: str | None = None, +) -> dict | None: + """Promote a scratchpad entry to long-term memory. + + Reads the scratchpad entry, stores it via the memory system, then + deletes it from the scratchpad. + + Args: + entry_id: ID of the scratchpad entry to promote + tags: Additional tags to attach (merged with entry's existing tags) + summary: Override the memory summary (defaults to entry label) + + Returns: + The new memory entry dict, or None if entry not found. + """ + memory.init_memory_store() + + entry = db.get_scratchpad_entry(entry_id) + if entry is None: + return None + + # Merge tags: entry tags + any new tags + all_tags = list(entry.get("tags", [])) + if tags: + for t in tags: + if t not in all_tags: + all_tags.append(t) + + # Add scratchpad provenance tag + if "scratchpad" not in all_tags: + all_tags.append("scratchpad") + + mem_summary = summary or entry.get("label") or entry["content"][:80] + + result = memory.add_memory( + content=entry["content"], + tags=all_tags, + source="scratchpad", + source_name=entry_id, + summary=mem_summary, + ) + + # Remove from scratchpad now that it's in long-term memory + db.delete_scratchpad_entry(entry_id) + + return result + + +def format_entry_list(entries: list[dict]) -> str: + """Format a list of scratchpad entries for CLI display.""" + if not entries: + return "No scratchpad entries." + + import datetime + + lines = [f"Scratchpad entries ({len(entries)}):\n"] + now = time.time() + + for e in entries: + entry_id = e["id"] + label = e.get("label") or "(no label)" + tags = e.get("tags", []) + char_count = len(e.get("content", "")) + created = datetime.datetime.fromtimestamp(e["created_at"]).strftime("%Y-%m-%d %H:%M") + expires = e["expires_at"] + + if expires <= now: + ttl_str = "EXPIRED" + else: + remaining_h = (expires - now) / 3600 + if remaining_h < 1: + ttl_str = f"expires in {int(remaining_h * 60)}m" + else: + ttl_str = f"expires in {remaining_h:.1f}h" + + line = f" {entry_id} {label[:50]}" + if tags: + line += f" [{', '.join(tags[:3])}]" + line += f" ({char_count:,} chars, {created}, {ttl_str})" + if e.get("analysis_session"): + line += f" session={e['analysis_session']}" + lines.append(line) + + return "\n".join(lines) + + +def format_entry(entry: dict) -> str: + """Format a single scratchpad entry for CLI display.""" + import datetime + + now = time.time() + created = datetime.datetime.fromtimestamp(entry["created_at"]).strftime("%Y-%m-%d %H:%M:%S") + expires = entry["expires_at"] + + if expires <= now: + ttl_str = "EXPIRED" + else: + remaining_h = (expires - now) / 3600 + if remaining_h < 1: + ttl_str = f"expires in {int(remaining_h * 60)}m" + else: + ttl_str = f"expires in {remaining_h:.1f}h" + + lines = [ + f"ID: {entry['id']}", + f"Label: {entry.get('label') or '(none)'}", + f"Created: {created}", + f"Expires: {ttl_str}", + f"Tags: {', '.join(entry.get('tags', [])) or '(none)'}", + f"Size: {len(entry.get('content', '')):,} chars", + ] + if entry.get("analysis_session"): + lines.append(f"Session: {entry['analysis_session']}") + lines.append("") + lines.append(entry.get("content", "")) + return "\n".join(lines) From b4db51ca3ce949ca029ffcd7e1814fc01f8c7e94 Mon Sep 17 00:00:00 2001 From: Mark Nutter Date: Sun, 1 Mar 2026 22:31:09 -0600 Subject: [PATCH 2/2] test: add scratchpad test suite (issue #58) 26 tests covering db CRUD, high-level scratchpad API (save/get/list/clear/promote), TTL expiry logic, tag round-trips, promote-to-long-term-memory, and CLI formatting. Total test count: 157 (was 131). Co-Authored-By: Claude Sonnet 4.6 --- tests/test_scratchpad.py | 373 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 373 insertions(+) create mode 100644 tests/test_scratchpad.py diff --git a/tests/test_scratchpad.py b/tests/test_scratchpad.py new file mode 100644 index 0000000..60a7527 --- /dev/null +++ b/tests/test_scratchpad.py @@ -0,0 +1,373 @@ +"""Tests for the scratchpad module — short-lived working memory.""" + +import os +import tempfile +import time +import unittest + +# Use a temporary database for all tests (must be set before rlm.db is imported) +_test_db_dir = tempfile.mkdtemp(prefix="rlm-test-scratchpad-") +os.environ.setdefault("RLM_MEMORY_DIR", _test_db_dir) + +import rlm.db as db_mod + +db_mod.MEMORY_DIR = _test_db_dir +db_mod.DB_PATH = os.path.join(_test_db_dir, "memory.db") + +from rlm import db, scratchpad + + +class TestScratchpadDBOperations(unittest.TestCase): + """Test scratchpad table CRUD via db module.""" + + def setUp(self): + conn = db._get_conn() + conn.execute("DELETE FROM scratchpad") + conn.commit() + + def test_insert_and_get(self): + now = time.time() + db.insert_scratchpad( + entry_id="scratch-test001", + label="Auth scan findings", + content="Found 3 issues in auth module.", + tags=["security", "auth"], + created_at=now, + expires_at=now + 3600, + analysis_session="sess-abc", + ) + entry = db.get_scratchpad_entry("scratch-test001") + self.assertIsNotNone(entry) + self.assertEqual(entry["id"], "scratch-test001") + self.assertEqual(entry["label"], "Auth scan findings") + self.assertEqual(entry["content"], "Found 3 issues in auth module.") + self.assertEqual(entry["tags"], ["security", "auth"]) + self.assertEqual(entry["analysis_session"], "sess-abc") + + def test_get_nonexistent_returns_none(self): + self.assertIsNone(db.get_scratchpad_entry("scratch-doesnotexist")) + + def test_delete_entry(self): + now = time.time() + db.insert_scratchpad( + entry_id="scratch-del01", + label="Delete me", + content="temp content", + tags=[], + created_at=now, + expires_at=now + 3600, + ) + self.assertTrue(db.delete_scratchpad_entry("scratch-del01")) + self.assertIsNone(db.get_scratchpad_entry("scratch-del01")) + + def test_delete_nonexistent_returns_false(self): + self.assertFalse(db.delete_scratchpad_entry("scratch-ghost")) + + def test_list_excludes_expired_by_default(self): + now = time.time() + db.insert_scratchpad( + entry_id="scratch-active", + label="Active", + content="active content", + tags=[], + created_at=now, + expires_at=now + 3600, + ) + db.insert_scratchpad( + entry_id="scratch-expired", + label="Expired", + content="expired content", + tags=[], + created_at=now - 7200, + expires_at=now - 3600, # already expired + ) + entries = db.list_scratchpad_entries(include_expired=False) + ids = [e["id"] for e in entries] + self.assertIn("scratch-active", ids) + self.assertNotIn("scratch-expired", ids) + + def test_list_include_expired(self): + now = time.time() + db.insert_scratchpad( + entry_id="scratch-active2", + label="Active", + content="active", + tags=[], + created_at=now, + expires_at=now + 3600, + ) + db.insert_scratchpad( + entry_id="scratch-expired2", + label="Expired", + content="expired", + tags=[], + created_at=now - 7200, + expires_at=now - 3600, + ) + entries = db.list_scratchpad_entries(include_expired=True) + ids = [e["id"] for e in entries] + self.assertIn("scratch-active2", ids) + self.assertIn("scratch-expired2", ids) + + def test_clear_all(self): + now = time.time() + for i in range(3): + db.insert_scratchpad( + entry_id=f"scratch-clr{i}", + label=f"Entry {i}", + content="content", + tags=[], + created_at=now, + expires_at=now + 3600, + ) + deleted = db.clear_scratchpad(expired_only=False) + self.assertEqual(deleted, 3) + self.assertEqual(len(db.list_scratchpad_entries(include_expired=True)), 0) + + def test_clear_expired_only(self): + now = time.time() + db.insert_scratchpad( + entry_id="scratch-keep", + label="Keep", + content="keep", + tags=[], + created_at=now, + expires_at=now + 3600, + ) + db.insert_scratchpad( + entry_id="scratch-prune", + label="Prune", + content="prune", + tags=[], + created_at=now - 7200, + expires_at=now - 3600, + ) + deleted = db.clear_scratchpad(expired_only=True) + self.assertEqual(deleted, 1) + remaining = db.list_scratchpad_entries(include_expired=True) + self.assertEqual(len(remaining), 1) + self.assertEqual(remaining[0]["id"], "scratch-keep") + + def test_tags_round_trip(self): + now = time.time() + tags = ["analysis", "security", "in-progress"] + db.insert_scratchpad( + entry_id="scratch-tags", + label="Tag test", + content="content", + tags=tags, + created_at=now, + expires_at=now + 3600, + ) + entry = db.get_scratchpad_entry("scratch-tags") + self.assertEqual(entry["tags"], tags) + + +class TestScratchpadHighLevel(unittest.TestCase): + """Test scratchpad module API (save, get, list_entries, clear, promote).""" + + def setUp(self): + conn = db._get_conn() + conn.execute("DELETE FROM scratchpad") + conn.execute("DELETE FROM entries") + conn.execute("DELETE FROM facts") + conn.commit() + + def test_save_returns_metadata(self): + result = scratchpad.save( + "Intermediate finding: 3 auth bugs", + label="auth-scan-step1", + tags=["auth", "security"], + ttl_hours=12, + ) + self.assertIn("id", result) + self.assertTrue(result["id"].startswith("scratch-")) + self.assertEqual(result["label"], "auth-scan-step1") + self.assertEqual(result["tags"], ["auth", "security"]) + self.assertEqual(result["ttl_hours"], 12) + self.assertGreater(result["expires_at"], result["created_at"]) + self.assertAlmostEqual( + result["expires_at"] - result["created_at"], 12 * 3600, delta=5 + ) + + def test_save_auto_label_from_content(self): + result = scratchpad.save("Found SQL injection vulnerability in user login form") + # When label not provided, returned dict has empty label (input passthrough), + # but the stored DB entry derives label from content[:60] + self.assertEqual(result["label"], "") + entry = scratchpad.get(result["id"]) + self.assertIn("Found SQL injection", entry["label"]) + + def test_get_returns_full_entry(self): + saved = scratchpad.save("Full content here", label="test-get") + entry = scratchpad.get(saved["id"]) + self.assertIsNotNone(entry) + self.assertEqual(entry["content"], "Full content here") + self.assertEqual(entry["label"], "test-get") + + def test_get_nonexistent_returns_none(self): + self.assertIsNone(scratchpad.get("scratch-doesnotexist")) + + def test_list_entries_active_only(self): + scratchpad.save("Active entry", label="active", ttl_hours=24) + # Insert an already-expired entry directly via db + now = time.time() + db.insert_scratchpad( + entry_id="scratch-oldexp", + label="Expired entry", + content="old content", + tags=[], + created_at=now - 7200, + expires_at=now - 3600, + ) + entries = scratchpad.list_entries(include_expired=False) + labels = [e["label"] for e in entries] + self.assertIn("active", labels) + self.assertNotIn("Expired entry", labels) + + def test_clear_all(self): + scratchpad.save("entry 1", ttl_hours=1) + scratchpad.save("entry 2", ttl_hours=2) + n = scratchpad.clear(expired_only=False) + self.assertEqual(n, 2) + self.assertEqual(scratchpad.list_entries(include_expired=True), []) + + def test_clear_expired_only(self): + scratchpad.save("keep this", ttl_hours=24) + now = time.time() + db.insert_scratchpad( + entry_id="scratch-xexp", + label="Expired", + content="expired", + tags=[], + created_at=now - 7200, + expires_at=now - 3600, + ) + n = scratchpad.clear(expired_only=True) + self.assertEqual(n, 1) + remaining = scratchpad.list_entries(include_expired=True) + self.assertEqual(len(remaining), 1) + self.assertEqual(remaining[0]["label"], "keep this") + + def test_default_ttl_is_24h(self): + result = scratchpad.save("content with default ttl") + self.assertEqual(result["ttl_hours"], 24) + self.assertAlmostEqual( + result["expires_at"] - result["created_at"], 24 * 3600, delta=5 + ) + + def test_analysis_session_association(self): + result = scratchpad.save( + "Session finding", analysis_session="rlm-sess-xyz" + ) + entry = scratchpad.get(result["id"]) + self.assertEqual(entry["analysis_session"], "rlm-sess-xyz") + + def test_promote_moves_to_long_term_memory(self): + saved = scratchpad.save( + "Important: we decided to use SQLite for storage.", + label="arch-decision", + tags=["architecture"], + ) + entry_id = saved["id"] + + # Promote with additional tags + mem_entry = scratchpad.promote(entry_id, tags=["promoted"]) + + self.assertIsNotNone(mem_entry) + # Should be in long-term memory + long_term = db.get_entry(mem_entry["id"]) + self.assertIsNotNone(long_term) + self.assertIn("architecture", long_term["tags"]) + self.assertIn("scratchpad", long_term["tags"]) + self.assertIn("promoted", long_term["tags"]) + + # Should be removed from scratchpad + self.assertIsNone(scratchpad.get(entry_id)) + + def test_promote_nonexistent_returns_none(self): + result = scratchpad.promote("scratch-ghost") + self.assertIsNone(result) + + def test_promote_uses_custom_summary(self): + saved = scratchpad.save("Long detailed analysis content...", label="orig-label") + mem_entry = scratchpad.promote(saved["id"], summary="Custom memory summary") + long_term = db.get_entry(mem_entry["id"]) + self.assertEqual(long_term["summary"], "Custom memory summary") + + def test_promote_merges_tags_without_duplicates(self): + saved = scratchpad.save("content", tags=["alpha", "beta"]) + mem_entry = scratchpad.promote(saved["id"], tags=["beta", "gamma"]) + long_term = db.get_entry(mem_entry["id"]) + tags = long_term["tags"] + # beta should only appear once + self.assertEqual(tags.count("beta"), 1) + self.assertIn("alpha", tags) + self.assertIn("gamma", tags) + + +class TestScratchpadFormatting(unittest.TestCase): + """Test format_entry_list and format_entry output.""" + + def test_format_entry_list_empty(self): + output = scratchpad.format_entry_list([]) + self.assertEqual(output, "No scratchpad entries.") + + def test_format_entry_list_shows_count(self): + now = time.time() + entries = [ + { + "id": "scratch-aaa", + "label": "First entry", + "tags": ["foo"], + "content": "some content", + "created_at": now, + "expires_at": now + 3600, + "analysis_session": None, + }, + { + "id": "scratch-bbb", + "label": "Second entry", + "tags": [], + "content": "more content", + "created_at": now, + "expires_at": now + 7200, + "analysis_session": "sess-1", + }, + ] + output = scratchpad.format_entry_list(entries) + self.assertIn("2", output) + self.assertIn("scratch-aaa", output) + self.assertIn("scratch-bbb", output) + self.assertIn("First entry", output) + + def test_format_entry_shows_expired(self): + now = time.time() + entry = { + "id": "scratch-exp", + "label": "Old entry", + "tags": [], + "content": "old", + "created_at": now - 7200, + "expires_at": now - 1, + } + output = scratchpad.format_entry_list([entry]) + self.assertIn("EXPIRED", output) + + def test_format_entry_full(self): + now = time.time() + entry = { + "id": "scratch-full", + "label": "Full test", + "tags": ["a", "b"], + "content": "The actual content text.", + "created_at": now, + "expires_at": now + 3600, + "analysis_session": "sess-xyz", + } + output = scratchpad.format_entry(entry) + self.assertIn("scratch-full", output) + self.assertIn("Full test", output) + self.assertIn("a, b", output) + self.assertIn("sess-xyz", output) + self.assertIn("The actual content text.", output)