Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 42 additions & 52 deletions muninn/core/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,39 @@
import asyncio
import hashlib
import json
import uuid
import time
import logging
import os
from typing import List, Optional, Dict, Any, Tuple
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from muninn.advanced.cross_agent import FederationManager
from muninn.advanced.temporal_kg import TemporalKnowledgeGraph
from muninn.chains import MemoryChainDetector
from muninn.consolidation.daemon import ConsolidationDaemon
from muninn.core.config import SUPPORTED_MODEL_PROFILES, MuninnConfig
from muninn.core.feature_flags import get_flags
from muninn.core.ingestion_manager import IngestionManager
from muninn.core.types import (
MemoryRecord, MemoryType, Provenance, SearchResult,
ExtractionResult, Entity, Relation,
ExtractionResult,
MemoryRecord,
MemoryType,
Provenance,
SearchResult,
)
from muninn.core.config import MuninnConfig, SUPPORTED_MODEL_PROFILES
from muninn.store.sqlite_metadata import SQLiteMetadataStore
from muninn.store.vector_store import VectorStore
from muninn.store.graph_store import GraphStore
from muninn.retrieval.bm25 import BM25Index
from muninn.retrieval.reranker import Reranker
from muninn.retrieval.hybrid import HybridRetriever
from muninn.retrieval.scout import MuninnScout
from muninn.extraction.pipeline import ExtractionPipeline
from muninn.scoring.importance import calculate_importance, calculate_novelty
from muninn.consolidation.daemon import ConsolidationDaemon
from muninn.goal import GoalCompass
from muninn.observability import OTelGenAITracer
from muninn.chains import MemoryChainDetector
from muninn.ingestion import IngestionPipeline, discover_legacy_sources as discover_legacy_sources_catalog
from muninn.ingestion import IngestionPipeline
from muninn.ingestion import discover_legacy_sources as discover_legacy_sources_catalog
from muninn.ingestion.parser import infer_source_type
from muninn.core.ingestion_manager import IngestionManager
from muninn.advanced.temporal_kg import TemporalKnowledgeGraph
from muninn.advanced.cross_agent import FederationManager
from muninn.core.feature_flags import get_flags
from muninn.observability import OTelGenAITracer
from muninn.retrieval.bm25 import BM25Index
from muninn.retrieval.hybrid import HybridRetriever
from muninn.retrieval.reranker import Reranker
from muninn.retrieval.scout import MuninnScout
from muninn.store.graph_store import GraphStore
from muninn.store.sqlite_metadata import SQLiteMetadataStore
from muninn.store.vector_store import VectorStore

logger = logging.getLogger("Muninn")

Expand Down Expand Up @@ -409,19 +411,7 @@ def _upsert_memory_chain_links(
candidate_records=candidate_records,
)

persisted = 0
for link in links:
created = self._graph.add_chain_link(
predecessor_id=link.predecessor_id,
successor_id=link.successor_id,
relation_type=link.relation_type,
confidence=link.confidence,
reason=link.reason,
shared_entities=link.shared_entities,
hours_apart=link.hours_apart,
)
if created:
persisted += 1
persisted = self._graph.add_chain_links_batch(links)
return persisted
except Exception as e:
logger.warning("Memory-chain linking failed (non-fatal): %s", e)
Expand Down Expand Up @@ -476,8 +466,8 @@ async def add(
# Handle terminal early returns (DEDUP_SKIP, CONFLICT_SKIP)
# Note: DEDUP_SIGNAL_UPDATE is handled below as it may fall through to ADD.
if (
processed.get("id") is None
and "event" in processed
processed.get("id") is None
and "event" in processed
and processed["event"] not in ("PROCESS_COMPLETE", "DEDUP_SIGNAL_UPDATE")
):
return processed
Expand All @@ -487,7 +477,7 @@ async def add(
dedup_result = processed["dedup"]
embedding = processed["embedding"]
record = processed["record"]

merged_successfully = False
async with self._write_lock:
existing = await asyncio.to_thread(self._metadata.get, dedup_result.existing_memory_id)
Expand All @@ -514,7 +504,7 @@ async def add(
asyncio.to_thread(self._bm25.add, dedup_result.existing_memory_id, merged_content, user_id, namespace)
)
merged_successfully = True

if merged_successfully:
return {
"id": dedup_result.existing_memory_id,
Expand Down Expand Up @@ -557,7 +547,7 @@ def _write_graph():
uid = record.metadata.get("user_id", "global")
ns = record.namespace
self._graph.add_memory_node(
record.id,
record.id,
extraction.summary or content[:200],
user_id=uid, namespace=ns
)
Expand Down Expand Up @@ -612,7 +602,7 @@ def _write_colbert():
}
if conflict_info:
result["conflict"] = conflict_info

if self._goal_compass is not None and record.project:
drift = await self._goal_compass.evaluate_drift(
text=content,
Expand Down Expand Up @@ -680,7 +670,7 @@ async def search(
{"query_preview": self._otel.maybe_content(query)},
)
effective_filters = dict(filters or {})

# v3.24.0: Default to excluding archived memories
if "archived" not in effective_filters:
effective_filters["archived"] = False
Expand Down Expand Up @@ -918,7 +908,7 @@ async def record_retrieval_feedback(
)

# Update Elo rating based on feedback outcome
from muninn.scoring.elo import calculate_elo_update, INITIAL_ELO
from muninn.scoring.elo import INITIAL_ELO, calculate_elo_update
record = await asyncio.to_thread(self._metadata.get, memory_id)
if record:
current_elo = record.metadata.get("elo_rating", INITIAL_ELO) if record.metadata else INITIAL_ELO
Expand Down Expand Up @@ -1047,7 +1037,7 @@ async def set_project_goal(
raise RuntimeError("Goal compass is disabled by feature flag")
if not goal_statement.strip():
raise ValueError("goal_statement cannot be empty")

async with self._write_lock:
return await self._goal_compass.set_goal(
user_id=user_id,
Expand Down Expand Up @@ -1313,10 +1303,10 @@ async def _add_chunk_task(chunk, source_context, record_ref):
chunk_metadata = dict(base_metadata)
chunk_metadata.update(source_context)
chunk_metadata.update(chunk.metadata)

# Map chunk.source_type to media_type if possible
media_type = chunk.source_type if chunk.source_type in ["image", "audio", "video"] else "text"

async with semaphore:
try:
add_result = await self.add(
Expand All @@ -1327,7 +1317,7 @@ async def _add_chunk_task(chunk, source_context, record_ref):
provenance=Provenance.INGESTED,
media_type=media_type,
)

if add_result.get("event") in {"DEDUP_SKIP", "CONFLICT_SKIP"}:
skipped_chunks += 1
record_ref["chunks_skipped"] += 1
Expand All @@ -1354,12 +1344,12 @@ async def _add_chunk_task(chunk, source_context, record_ref):
"chunks_failed": 0,
}
source_payloads.append(source_record)

if source_result.status != "processed":
continue

source_context = source_context_by_path.get(source_result.source_path, {})

# Create tasks for all chunks in this source
tasks = [
_add_chunk_task(chunk, source_context, source_record)
Expand Down Expand Up @@ -1765,7 +1755,7 @@ def _update_graph():
ns = record.namespace
self._graph.delete_memory_references(record.id)
self._graph.add_memory_node(
record.id,
record.id,
extraction.summary or data[:200],
user_id=uid, namespace=ns
)
Expand All @@ -1784,7 +1774,7 @@ def _update_graph():
user_id=uid,
namespace=ns,
)

def _update_bm25():
if data is not None:
uid = (record.metadata or {}).get("user_id", "global")
Expand Down Expand Up @@ -2372,7 +2362,7 @@ async def get_temporal_knowledge(
self._check_initialized()
if not self._temporal_kg:
return []

ts = float(timestamp) if timestamp is not None else time.time()
# This is a read operation, usually fast, but we can offload if needed.
# Kuzu reads are blocking, so offload to thread.
Expand Down
Loading
Loading