Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

### Fixed

- fix(memory): run `PRAGMA wal_checkpoint(PASSIVE)` after FTS5 entity inserts to fix cross-session SYNAPSE seed lookup (#2166); checkpoint is called at `SqliteStore` startup (safety net) and after every `EntityResolver::resolve_batch` (targeted hook)
- fix(config): add `[security.guardrail]` stub to `default.toml` so `--migrate-config` injects commented guardrail defaults for configs that have `[security]` but no `[security.guardrail]` (#2158)
- ci: increase publish-crates timeout from 20 to 60 minutes and add `no-verify: true` to skip recompilation during publish (workspace has 21 crates; sequential publish with 15 s delays exceeded the previous limit)

Expand Down
16 changes: 16 additions & 0 deletions crates/zeph-memory/src/graph/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,22 @@ impl GraphStore {
.collect::<Result<Vec<_>, _>>()
}

/// Flush the `SQLite` WAL to the main database file.
///
/// Runs `PRAGMA wal_checkpoint(PASSIVE)`. Safe to call at any time; does not block active
/// readers or writers. Call after bulk entity inserts to ensure FTS5 shadow table writes are
/// visible to connections opened in future sessions.
///
/// # Errors
///
/// Returns an error if the PRAGMA execution fails.
pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
sqlx::query("PRAGMA wal_checkpoint(PASSIVE)")
.execute(&self.pool)
.await?;
Ok(())
}

/// Stream all entities from the database incrementally (true cursor, no full-table load).
pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
use futures::StreamExt as _;
Expand Down
31 changes: 31 additions & 0 deletions crates/zeph-memory/src/graph/store/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2793,3 +2793,34 @@ async fn bfs_typed_entity_type_filter() {
"B must not be reachable via semantic filter when only entity edge exists"
);
}

/// Regression test for FTS5+WAL cross-session visibility (issue #2166).
///
/// Entities inserted via `upsert_entity` in one pool must be found by `find_entities_fuzzy`
/// in a new pool opened on the same file after the first pool is dropped.
/// Without `checkpoint_wal`, FTS5 shadow table writes buffered in the WAL are not visible
/// to a fresh connection, causing SYNAPSE to return zero seeds.
#[tokio::test]
async fn fts5_cross_session_visibility_after_checkpoint() {
let file = tempfile::NamedTempFile::new().expect("tempfile");
let path = file.path().to_str().expect("valid path").to_string();

// Session A: open store, insert entity, checkpoint, drop pool.
{
let store_a = SqliteStore::new(&path).await.unwrap();
let gs_a = GraphStore::new(store_a.pool().clone());
gs_a.upsert_entity("Rust", "rust", EntityType::Concept, None)
.await
.unwrap();
gs_a.checkpoint_wal().await.unwrap();
}

// Session B: new pool on same file — entity must be visible via FTS5.
let store_b = SqliteStore::new(&path).await.unwrap();
let gs_b = GraphStore::new(store_b.pool().clone());
let results = gs_b.find_entities_fuzzy("Rust", 10).await.unwrap();
assert!(
!results.is_empty(),
"FTS5 cross-session: entity inserted in session A must be visible in session B after WAL checkpoint"
);
}
46 changes: 46 additions & 0 deletions crates/zeph-memory/src/semantic/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ pub async fn extract_and_store(
}
}

store.checkpoint_wal().await?;

let new_entity_ids: Vec<i64> = entity_name_to_id.into_values().collect();

Ok(ExtractionResult {
Expand Down Expand Up @@ -516,6 +518,50 @@ mod tests {
"qdrant_point_id must remain None when no embedding_store is provided"
);
}

/// Regression test for #2166: FTS5 entity writes must be visible to a new connection pool
/// opened after extraction completes. Without `checkpoint_wal()` in `extract_and_store`,
/// a fresh pool sees stale FTS5 shadow tables and `find_entities_fuzzy` returns empty.
#[tokio::test]
async fn extract_and_store_fts5_cross_session_visibility() {
let file = tempfile::NamedTempFile::new().expect("tempfile");
let path = file.path().to_str().expect("valid path").to_string();

// Session A: run extract_and_store on a file DB (not :memory:) so WAL is used.
{
let sqlite = crate::sqlite::SqliteStore::new(&path).await.unwrap();
let extraction_json = r#"{"entities":[{"name":"Ferris","type":"concept","summary":"Rust mascot"}],"edges":[]}"#;
let mock =
zeph_llm::mock::MockProvider::with_responses(vec![extraction_json.to_owned()]);
let provider = AnyProvider::Mock(mock);
let config = GraphExtractionConfig {
max_entities: 10,
max_edges: 10,
extraction_timeout_secs: 10,
..Default::default()
};
extract_and_store(
"Ferris is the Rust mascot.".to_owned(),
vec![],
provider,
sqlite.pool().clone(),
config,
None,
None,
)
.await
.unwrap();
}

// Session B: new pool — FTS5 must see the entity extracted in session A.
let sqlite_b = crate::sqlite::SqliteStore::new(&path).await.unwrap();
let gs_b = crate::graph::GraphStore::new(sqlite_b.pool().clone());
let results = gs_b.find_entities_fuzzy("Ferris", 10).await.unwrap();
assert!(
!results.is_empty(),
"FTS5 cross-session (#2166): entity extracted in session A must be visible in session B"
);
}
}

impl SemanticMemory {
Expand Down
6 changes: 6 additions & 0 deletions crates/zeph-memory/src/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ impl SqliteStore {

sqlx::migrate!("./migrations").run(&pool).await?;

if path != ":memory:" {
sqlx::query("PRAGMA wal_checkpoint(PASSIVE)")
.execute(&pool)
.await?;
}

Ok(Self { pool })
}

Expand Down
Loading