Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/superlocalmemory/core/recall_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ def _get_engine():
return _engine


def _handle_recall(query: str, limit: int, session_id: str = "") -> dict:
def _handle_recall(
query: str, limit: int, session_id: str = "", fast: bool = False,
) -> dict:
engine = _get_engine()
response = engine.recall(
query, limit=limit, session_id=session_id or None,
query, limit=limit, session_id=session_id or None, fast=bool(fast),
)

# Batch-fetch original memory text for all results
Expand Down Expand Up @@ -290,7 +292,7 @@ def _worker_main() -> None:
if cmd == "recall":
result = _handle_recall(
req.get("query", ""), req.get("limit", 10),
req.get("session_id", ""),
req.get("session_id", ""), bool(req.get("fast", False)),
)
_respond(result)
elif cmd == "store":
Expand Down
3 changes: 2 additions & 1 deletion src/superlocalmemory/core/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def shared(cls) -> WorkerPool:

def recall(
self, query: str, limit: int = 10, session_id: str = "",
fast: bool = False,
) -> dict:
"""Run recall in worker subprocess. Returns result dict.

Expand All @@ -77,7 +78,7 @@ def recall(
"""
return self._send({
"cmd": "recall", "query": query, "limit": limit,
"session_id": session_id or "",
"session_id": session_id or "", "fast": bool(fast),
})

def store(self, content: str, metadata: dict | None = None) -> dict:
Expand Down
10 changes: 7 additions & 3 deletions src/superlocalmemory/mcp/_daemon_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@ def _url(self, path: str) -> str:

def recall(
self, query: str, limit: int = 10, session_id: str = "",
fast: bool = False,
) -> dict[str, Any]:
params = urllib.parse.urlencode(
{"q": query, "limit": limit, "session_id": session_id or ""}
)
params = urllib.parse.urlencode({
"q": query,
"limit": limit,
"session_id": session_id or "",
"fast": "true" if fast else "false",
})
try:
with urllib.request.urlopen(
self._url(f"/recall?{params}"), timeout=self._timeout,
Expand Down
9 changes: 7 additions & 2 deletions src/superlocalmemory/mcp/_pool_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,17 @@ def _unwrap_error(raw: Any, op: str) -> None:
raise PoolError(f"pool.{op} failed: {reason}")


def pool_recall(query: str, limit: int = 10, **_: Any) -> PoolRecallResponse:
def pool_recall(query: str, limit: int = 10, **kwargs: Any) -> PoolRecallResponse:
"""Call pool.recall and reshape its dict into a typed response.

Raises :class:`PoolError` on worker death or any non-ok envelope.
"""
raw = _pool().recall(query=query, limit=limit)
raw = _pool().recall(
query=query,
limit=limit,
session_id=str(kwargs.get("session_id") or ""),
fast=bool(kwargs.get("fast", False)),
)
_unwrap_error(raw, "recall")
items = raw.get("results", []) if isinstance(raw, dict) else []
results = [
Expand Down
49 changes: 31 additions & 18 deletions src/superlocalmemory/mcp/tools_active.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ async def session_init(
The AI should call this automatically before any other work.
"""
try:
from superlocalmemory.hooks.auto_recall import AutoRecall
from superlocalmemory.hooks.rules_engine import RulesEngine
from superlocalmemory.mcp._pool_adapter import pool_recall

Expand All @@ -104,21 +103,37 @@ async def session_init(
return {"success": True, "context": "", "memories": [], "message": "Auto-recall disabled"}

recall_config = rules.get_recall_config()
auto = AutoRecall(
recall_fn=pool_recall,
config={
"enabled": True,
"max_memories_injected": max_results,
"relevance_threshold": recall_config.get("relevance_threshold", 0.3),
},
)

# Get formatted context for system prompt injection
context = auto.get_session_context(project_path=project_path, query=query)

# Get structured results for tool response
search_query = query or f"project context {project_path}" if project_path else "recent important decisions"
memories = auto.get_query_context(search_query)
relevance_threshold = recall_config.get("relevance_threshold", 0.3)
if query:
search_query = query
elif project_path:
search_query = f"project context {project_path}"
else:
search_query = "recent important decisions"

response = pool_recall(search_query, limit=max_results, fast=True)
relevant = [
r for r in response.results
if r.score >= relevance_threshold
]

# Build both return shapes from one recall. Calling recall twice
# doubles session startup latency and can return duplicate snippets.
context = ""
if relevant:
lines = ["# Relevant Memory Context", ""]
for r in relevant[:max_results]:
lines.append(f"- {r.fact.content[:200]}")
context = "\n".join(lines)

memories = [
{
"fact_id": r.fact.fact_id,
"content": r.fact.content[:300],
"score": round(r.score, 3),
}
for r in relevant[:max_results]
]

# Get learning status
pid = engine.profile_id
Expand Down Expand Up @@ -184,7 +199,6 @@ async def observe(
from superlocalmemory.hooks.rules_engine import RulesEngine
from superlocalmemory.mcp._pool_adapter import pool_store

engine = get_engine()
rules = RulesEngine()

auto = AutoCapture(
Expand Down Expand Up @@ -305,7 +319,6 @@ async def close_session(session_id: str = "") -> dict:
"""
try:
engine = get_engine()
pid = engine.profile_id
sid = session_id or getattr(engine, '_last_session_id', '')
if not sid:
return {"success": False, "error": "No session_id provided"}
Expand Down
11 changes: 5 additions & 6 deletions src/superlocalmemory/mcp/tools_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@

from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any, Callable
from typing import Callable

from mcp.types import ToolAnnotations

Expand Down Expand Up @@ -111,7 +110,6 @@ async def remember(
Extracts atomic facts, resolves entities, builds graph edges,
and indexes for 4-channel retrieval.
"""
import asyncio
try:
# v3.4.32: Store-first pattern. Write to pending.db and return
# immediately. The daemon's pending-materializer thread drains
Expand Down Expand Up @@ -141,7 +139,7 @@ async def remember(
@server.tool(annotations=ToolAnnotations(readOnlyHint=True))
async def recall(
query: str, limit: int = 10, agent_id: str = "mcp_client",
session_id: str = "",
session_id: str = "", fast: bool = False,
) -> dict:
"""Search memories by semantic query with 4-channel retrieval, RRF fusion, and reranking.

Expand All @@ -153,8 +151,8 @@ async def recall(
"""
import asyncio
try:
from superlocalmemory.core.worker_pool import WorkerPool
pool = WorkerPool.shared()
from superlocalmemory.mcp._daemon_proxy import choose_pool
pool = choose_pool()
# S9-DASH-10: priority for session_id, so engagement
# signals land on the right pending_outcome:
# 1. Explicit ``session_id`` tool-call argument.
Expand Down Expand Up @@ -199,6 +197,7 @@ async def recall(
# block behind a single threading.Lock. See worker_pool.py.
result = await asyncio.to_thread(
pool.recall, query, limit=limit, session_id=effective_sid,
fast=bool(fast),
)
if result.get("ok"):
# Record implicit feedback: every returned result is a recall_hit
Expand Down
4 changes: 2 additions & 2 deletions src/superlocalmemory/mcp/tools_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ async def recall_trace(query: str, limit: int = 10) -> dict:
limit: Maximum results (default 10).
"""
try:
from superlocalmemory.core.worker_pool import WorkerPool
raw = WorkerPool.shared().recall(query=query, limit=limit)
from superlocalmemory.mcp._daemon_proxy import choose_pool
raw = choose_pool().recall(query=query, limit=limit)
items = raw.get("results", []) if isinstance(raw, dict) else []
results = []
for item in items[:limit]:
Expand Down
6 changes: 3 additions & 3 deletions src/superlocalmemory/storage/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,23 +252,23 @@ def _set_pragmas(conn: sqlite3.Connection) -> None:
-- left by V2 migration.

-- INSERT trigger
CREATE TRIGGER atomic_facts_fts_insert
CREATE TRIGGER IF NOT EXISTS atomic_facts_fts_insert
AFTER INSERT ON atomic_facts
BEGIN
INSERT INTO atomic_facts_fts (rowid, fact_id, content)
VALUES (NEW.rowid, NEW.fact_id, NEW.content);
END;

-- DELETE trigger
CREATE TRIGGER atomic_facts_fts_delete
CREATE TRIGGER IF NOT EXISTS atomic_facts_fts_delete
AFTER DELETE ON atomic_facts
BEGIN
INSERT INTO atomic_facts_fts (atomic_facts_fts, rowid, fact_id, content)
VALUES ('delete', OLD.rowid, OLD.fact_id, OLD.content);
END;

-- UPDATE trigger
CREATE TRIGGER atomic_facts_fts_update
CREATE TRIGGER IF NOT EXISTS atomic_facts_fts_update
AFTER UPDATE OF content ON atomic_facts
BEGIN
INSERT INTO atomic_facts_fts (atomic_facts_fts, rowid, fact_id, content)
Expand Down
29 changes: 29 additions & 0 deletions tests/test_core/test_worker_pool_fast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) 2026 Varun Pratap Bhardwaj / Qualixar
# Licensed under AGPL-3.0-or-later - see LICENSE file
# Part of SuperLocalMemory V3 | https://qualixar.com | https://varunpratap.com

"""WorkerPool recall request shaping."""

from __future__ import annotations


def test_worker_pool_recall_forwards_fast_flag(monkeypatch):
    """``WorkerPool.recall`` must include the ``fast`` flag in the payload it sends."""
    from superlocalmemory.core.worker_pool import WorkerPool

    captured = {}

    def _record_payload(payload):
        # Stand in for the real transport: remember the request, reply ok.
        captured.update(payload)
        return {"ok": True}

    pool = WorkerPool()
    monkeypatch.setattr(pool, "_send", _record_payload)

    reply = pool.recall("q", limit=3, session_id="s-1", fast=True)

    assert reply == {"ok": True}
    expected_payload = {
        "cmd": "recall",
        "query": "q",
        "limit": 3,
        "session_id": "s-1",
        "fast": True,
    }
    assert captured == expected_payload
24 changes: 20 additions & 4 deletions tests/test_mcp/test_mcp_daemon_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
from __future__ import annotations

import json
from types import SimpleNamespace

import pytest

from superlocalmemory.mcp._daemon_proxy import DaemonPoolProxy, choose_pool
from superlocalmemory.mcp._daemon_proxy import DaemonPoolProxy
from superlocalmemory.mcp._pool_adapter import (
PoolError, pool_recall, pool_store,
)
Expand All @@ -21,7 +20,7 @@
class TestPoolErrorSurfacing:
def test_pool_recall_raises_on_ok_false(self, monkeypatch):
class _Dead:
def recall(self, query, limit=10, session_id=""):
def recall(self, query, limit=10, session_id="", fast=False):
return {"ok": False, "error": "worker died"}
from superlocalmemory.mcp import _pool_adapter
monkeypatch.setattr(_pool_adapter, "_pool", lambda: _Dead())
Expand All @@ -41,7 +40,7 @@ def store(self, content, metadata=None):

def test_pool_recall_success_does_not_raise(self, monkeypatch):
class _Ok:
def recall(self, query, limit=10, session_id=""):
def recall(self, query, limit=10, session_id="", fast=False):
return {"ok": True, "results": [], "query_type": "x"}
from superlocalmemory.mcp import _pool_adapter
monkeypatch.setattr(_pool_adapter, "_pool", lambda: _Ok())
Expand Down Expand Up @@ -73,6 +72,23 @@ def _fake_urlopen(req, timeout=30):
assert "limit=3" in captured["url"]
assert "session_id=s-1" in captured["url"]

def test_recall_forwards_fast_flag(self, monkeypatch):
    """A ``fast=True`` recall must show up as ``fast=true`` in the requested URL."""
    import superlocalmemory.mcp._daemon_proxy as mod

    seen = {}

    def _stub_urlopen(req, timeout=30):
        # Accept either a Request object or a plain URL string.
        seen["url"] = getattr(req, "full_url", req)
        body = {"ok": True, "results": [], "query_type": "semantic"}
        return _FakeResp(json.dumps(body).encode())

    monkeypatch.setattr(mod.urllib.request, "urlopen", _stub_urlopen)

    result = DaemonPoolProxy(port=9999).recall("fast path", fast=True)

    assert result["ok"] is True
    assert "fast=true" in seen["url"]

def test_store_forwards_http_post(self, monkeypatch):
captured = {}

Expand Down
28 changes: 20 additions & 8 deletions tests/test_mcp/test_mcp_pool_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ def __init__(self):
self.recall_calls = []
self.store_calls = []

def recall(self, query: str, limit: int = 10, session_id: str = ""):
self.recall_calls.append((query, limit))
def recall(
self, query: str, limit: int = 10, session_id: str = "",
fast: bool = False,
):
self.recall_calls.append((query, limit, session_id, fast))
return {
"ok": True,
"query": query,
Expand Down Expand Up @@ -59,7 +62,16 @@ def test_pool_recall_reshapes_dict(self, monkeypatch):
assert resp.results[0].fact.fact_id == "f1"
assert resp.results[0].fact.content == "first memory content"
assert resp.results[0].score == 0.91
assert fake.recall_calls == [("hello", 5)]
assert fake.recall_calls == [("hello", 5, "", False)]

def test_pool_recall_forwards_session_id_and_fast(self, monkeypatch):
    """``pool_recall`` must pass ``session_id`` and ``fast`` through to the pool."""
    from superlocalmemory.mcp import _pool_adapter

    stub_pool = _FakePool()
    monkeypatch.setattr(_pool_adapter, "_pool", lambda: stub_pool)

    _pool_adapter.pool_recall("hello", limit=5, session_id="s-1", fast=True)

    # The fake pool logs each call as (query, limit, session_id, fast).
    expected_calls = [("hello", 5, "s-1", True)]
    assert stub_pool.recall_calls == expected_calls

def test_pool_store_returns_fact_ids(self, monkeypatch):
from superlocalmemory.mcp import _pool_adapter
Expand All @@ -75,7 +87,7 @@ def test_pool_recall_handles_empty_results(self, monkeypatch):
from superlocalmemory.mcp import _pool_adapter

class _Empty:
def recall(self, query, limit=10, session_id=""):
def recall(self, query, limit=10, session_id="", fast=False):
return {"ok": True, "results": []}

monkeypatch.setattr(_pool_adapter, "_pool", lambda: _Empty())
Expand All @@ -89,7 +101,7 @@ def test_pool_recall_raises_on_ok_false(self, monkeypatch):
from superlocalmemory.mcp._pool_adapter import PoolError

class _Dead:
def recall(self, query, limit=10, session_id=""):
def recall(self, query, limit=10, session_id="", fast=False):
return {"ok": False, "error": "worker died"}

monkeypatch.setattr(_pool_adapter, "_pool", lambda: _Dead())
Expand Down Expand Up @@ -170,9 +182,9 @@ def _wrap(fn):
result = asyncio.run(registered["session_init"](project_path="/tmp/p"))

assert result["success"] is True
# At least one recall landed through the pool adapter
assert fake_pool.recall_calls, \
"session_init did not route through pool_recall"
assert fake_pool.recall_calls == [
("project context /tmp/p", 10, "", True),
], "session_init should perform one fast recall through pool_recall"

def test_observe_uses_pool_adapter_not_engine_store(self, monkeypatch):
import asyncio
Expand Down
Loading