diff --git a/CHANGELOG.md b/CHANGELOG.md index 76cc61b3..c9de097a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ### Fixed +- fix(memory): run `PRAGMA wal_checkpoint(PASSIVE)` after FTS5 entity inserts to fix cross-session SYNAPSE seed lookup (#2166); checkpoint is called at `SqliteStore` startup (safety net) and after every `EntityResolver::resolve_batch` (targeted hook) - fix(config): add `[security.guardrail]` stub to `default.toml` so `--migrate-config` injects commented guardrail defaults for configs that have `[security]` but no `[security.guardrail]` (#2158) - ci: increase publish-crates timeout from 20 to 60 minutes and add `no-verify: true` to skip recompilation during publish (workspace has 21 crates; sequential publish with 15 s delays exceeded the previous limit) diff --git a/crates/zeph-memory/src/graph/store/mod.rs b/crates/zeph-memory/src/graph/store/mod.rs index c05dc5f4..21987fdf 100644 --- a/crates/zeph-memory/src/graph/store/mod.rs +++ b/crates/zeph-memory/src/graph/store/mod.rs @@ -203,6 +203,22 @@ impl GraphStore { .collect::, _>>() } + /// Flush the `SQLite` WAL to the main database file. + /// + /// Runs `PRAGMA wal_checkpoint(PASSIVE)`. Safe to call at any time; does not block active + /// readers or writers. Call after bulk entity inserts to ensure FTS5 shadow table writes are + /// visible to connections opened in future sessions. + /// + /// # Errors + /// + /// Returns an error if the PRAGMA execution fails. + pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> { + sqlx::query("PRAGMA wal_checkpoint(PASSIVE)") + .execute(&self.pool) + .await?; + Ok(()) + } + /// Stream all entities from the database incrementally (true cursor, no full-table load). pub fn all_entities_stream(&self) -> impl Stream> + '_ { use futures::StreamExt as _; diff --git a/crates/zeph-memory/src/graph/store/tests.rs b/crates/zeph-memory/src/graph/store/tests.rs index 0921ffb9..41e5cdf5 100644 --- a/crates/zeph-memory/src/graph/store/tests.rs +++ b/crates/zeph-memory/src/graph/store/tests.rs @@ -2793,3 +2793,34 @@ async fn bfs_typed_entity_type_filter() { "B must not be reachable via semantic filter when only entity edge exists" ); } + +/// Regression test for FTS5+WAL cross-session visibility (issue #2166). +/// +/// Entities inserted via `upsert_entity` in one pool must be found by `find_entities_fuzzy` +/// in a new pool opened on the same file after the first pool is dropped. +/// Without `checkpoint_wal`, FTS5 shadow table writes buffered in the WAL are not visible +/// to a fresh connection, causing SYNAPSE to return zero seeds. +#[tokio::test] +async fn fts5_cross_session_visibility_after_checkpoint() { + let file = tempfile::NamedTempFile::new().expect("tempfile"); + let path = file.path().to_str().expect("valid path").to_string(); + + // Session A: open store, insert entity, checkpoint, drop pool. + { + let store_a = SqliteStore::new(&path).await.unwrap(); + let gs_a = GraphStore::new(store_a.pool().clone()); + gs_a.upsert_entity("Rust", "rust", EntityType::Concept, None) + .await + .unwrap(); + gs_a.checkpoint_wal().await.unwrap(); + } + + // Session B: new pool on same file — entity must be visible via FTS5. + let store_b = SqliteStore::new(&path).await.unwrap(); + let gs_b = GraphStore::new(store_b.pool().clone()); + let results = gs_b.find_entities_fuzzy("Rust", 10).await.unwrap(); + assert!( + !results.is_empty(), + "FTS5 cross-session: entity inserted in session A must be visible in session B after WAL checkpoint" + ); +} diff --git a/crates/zeph-memory/src/semantic/graph.rs b/crates/zeph-memory/src/semantic/graph.rs index 7bd285ae..33c8c74d 100644 --- a/crates/zeph-memory/src/semantic/graph.rs +++ b/crates/zeph-memory/src/semantic/graph.rs @@ -387,6 +387,8 @@ pub async fn extract_and_store( } } + store.checkpoint_wal().await?; + let new_entity_ids: Vec = entity_name_to_id.into_values().collect(); Ok(ExtractionResult { @@ -516,6 +518,50 @@ mod tests { "qdrant_point_id must remain None when no embedding_store is provided" ); } + + /// Regression test for #2166: FTS5 entity writes must be visible to a new connection pool + /// opened after extraction completes. Without `checkpoint_wal()` in `extract_and_store`, + /// a fresh pool sees stale FTS5 shadow tables and `find_entities_fuzzy` returns empty. + #[tokio::test] + async fn extract_and_store_fts5_cross_session_visibility() { + let file = tempfile::NamedTempFile::new().expect("tempfile"); + let path = file.path().to_str().expect("valid path").to_string(); + + // Session A: run extract_and_store on a file DB (not :memory:) so WAL is used. + { + let sqlite = crate::sqlite::SqliteStore::new(&path).await.unwrap(); + let extraction_json = r#"{"entities":[{"name":"Ferris","type":"concept","summary":"Rust mascot"}],"edges":[]}"#; + let mock = + zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]); + let provider = AnyProvider::Mock(mock); + let config = GraphExtractionConfig { + max_entities: 10, + max_edges: 10, + extraction_timeout_secs: 10, + ..Default::default() + }; + extract_and_store( + "Ferris is the Rust mascot.".to_owned(), + vec![], + provider, + sqlite.pool().clone(), + config, + None, + None, + ) + .await + .unwrap(); + } + + // Session B: new pool — FTS5 must see the entity extracted in session A. + let sqlite_b = crate::sqlite::SqliteStore::new(&path).await.unwrap(); + let gs_b = crate::graph::GraphStore::new(sqlite_b.pool().clone()); + let results = gs_b.find_entities_fuzzy("Ferris", 10).await.unwrap(); + assert!( + !results.is_empty(), + "FTS5 cross-session (#2166): entity extracted in session A must be visible in session B" + ); + } } impl SemanticMemory { diff --git a/crates/zeph-memory/src/sqlite/mod.rs b/crates/zeph-memory/src/sqlite/mod.rs index 62bd49e5..ea2da013 100644 --- a/crates/zeph-memory/src/sqlite/mod.rs +++ b/crates/zeph-memory/src/sqlite/mod.rs @@ -78,6 +78,12 @@ impl SqliteStore { sqlx::migrate!("./migrations").run(&pool).await?; + if path != ":memory:" { + sqlx::query("PRAGMA wal_checkpoint(PASSIVE)") + .execute(&pool) + .await?; + } + Ok(Self { pool }) }