diff --git a/FEATURE_PARITY.md b/FEATURE_PARITY.md index 634131fca..2a85da9ce 100644 --- a/FEATURE_PARITY.md +++ b/FEATURE_PARITY.md @@ -336,7 +336,7 @@ This document tracks feature parity between IronClaw (Rust implementation) and O | Feature | OpenClaw | IronClaw | Notes | |---------|----------|----------|-------| -| Vector memory | ✅ | ✅ | pgvector | +| Vector memory | ✅ | ✅ | PostgreSQL uses pgvector; libSQL uses indexed vector search when available and brute-force cosine fallback after V9 | | Session-based memory | ✅ | ✅ | | | Hybrid search (BM25 + vector) | ✅ | ✅ | RRF algorithm | | Temporal decay (hybrid search) | ✅ | ❌ | Opt-in time-based scoring factor | diff --git a/docs/configuration-guide.md b/docs/configuration-guide.md index 46949040f..a7ac83e29 100644 --- a/docs/configuration-guide.md +++ b/docs/configuration-guide.md @@ -268,6 +268,13 @@ Table 18. Database and secrets environment variables. | `LIBSQL_AUTH_TOKEN` | Auth token for `LIBSQL_URL`. | Required when `LIBSQL_URL` is set. | | `SECRETS_MASTER_KEY` | Master key for encrypted secrets storage. | Optional, but must be at least 32 bytes when set. If omitted, axinite falls back to the operating-system keychain when available. | +When workspace memory search is enabled, backend choice affects how semantic +retrieval runs. PostgreSQL uses pgvector cosine-distance queries. libSQL uses +indexed `vector_top_k(...)` only when a compatible fixed-dimension vector +index exists; otherwise it falls back to brute-force cosine similarity in +Rust. `ironclaw doctor` and `ironclaw status` surface the active search mode. +See [database integrations](database-integrations.md) for the backend trade-offs. + ### 4.3 Agent runtime, safety, routines, heartbeat, hygiene, skills, and builder mode Table 19. Core runtime behaviour variables. diff --git a/docs/database-integrations.md b/docs/database-integrations.md index a7e286909..4f5acf194 100644 --- a/docs/database-integrations.md +++ b/docs/database-integrations.md @@ -226,8 +226,9 @@ explicit `BEGIN IMMEDIATE` block. ### 4.4 Workspace search path -libSQL mirrors the same workspace concepts as PostgreSQL, but the implemented -search path is narrower. +libSQL mirrors the same workspace concepts as PostgreSQL, but the vector path +uses a different implementation strategy after the flexible-dimension +migration. Table 3. libSQL workspace search components. @@ -236,28 +237,32 @@ Table 3. libSQL workspace search components. | Document store | `memory_documents` table | | Chunk store | `memory_chunks` table with `embedding BLOB` | | Full-text search | `memory_chunks_fts` FTS5 virtual table plus maintenance triggers | -| Semantic search | Best-effort `vector_top_k(...)` query when a compatible vector index exists | +| Semantic search | `vector_top_k(...)` when a compatible index exists, otherwise brute-force cosine similarity in Rust | | Fusion strategy | RRF in Rust, same as PostgreSQL | -The current implementation quirk is important: +The important implementation detail is how libSQL behaves after the V9 +flexible-dimension migration: -- the libSQL schema and V9 migration comments describe a brute-force vector - fallback after flexible dimensions remove the fixed-dimension index -- the live code in `src/db/libsql/workspace.rs` does not implement that - brute-force fallback -- instead it attempts `vector_top_k('idx_memory_chunks_embedding', ...)` - and, when that query fails as expected after V9 drops the index, it logs a - debug message and returns no vector results +- the fixed-dimension `libsql_vector_idx` index is dropped because it cannot + support arbitrary embedding lengths +- the live code first attempts `vector_top_k('idx_memory_chunks_embedding', ...)` +- when that indexed query is unavailable, the repository logs that it is using + brute-force vector search and computes cosine similarity in Rust across the + stored embedding blobs +- the result stream still feeds the same Reciprocal Rank Fusion (RRF) path as + PostgreSQL In practical terms, a migrated or freshly bootstrapped libSQL workspace currently behaves as: - FTS5 keyword search always available -- vector results only when a compatible vector index exists -- FTS-only search after the normal flexible-dimension migration path +- semantic retrieval still available after V9 through brute-force cosine + similarity +- hybrid search still combines keyword and semantic results, but libSQL pays a + linear scan cost where PostgreSQL can use pgvector operations -That is the most significant behavioural gap between the two backends in the -current code. +That means the main backend difference is now performance and implementation +strategy, not silent loss of semantic recall. ### 4.5 Satellite stores on libSQL @@ -330,14 +335,15 @@ Table 4. Current backend comparison. | Migration engine | `refinery` over numbered SQL files | Consolidated schema plus `_migrations`-tracked incremental Rust-side runner | | Secrets and WASM satellite stores | Reuse cloned pool handles | Reuse shared database handle, then open fresh connections | | Keyword search | PostgreSQL `tsvector` plus GIN | FTS5 virtual table plus triggers | -| Vector search | pgvector cosine distance, now without the old HNSW index after V9 | Best-effort only; current migrated path is effectively FTS-only | +| Vector search | pgvector cosine distance, now without the old HNSW index after V9 | Indexed `vector_top_k(...)` when available, otherwise brute-force cosine similarity in Rust | | Best fit | Full default deployment with richer search parity | Embedded, local-first, or low-ops deployment where external PostgreSQL is undesirable | ### 6.1 When PostgreSQL is the safer choice Choose PostgreSQL when: -- full hybrid workspace search quality matters +- full hybrid workspace search quality and search latency under larger + workspaces matter - the deployment already has PostgreSQL 15+ with pgvector available - query behaviour should match the default and most-tested path as closely as possible @@ -350,10 +356,9 @@ Choose libSQL when: - the deployment is local-first or edge-style - Turso replica mode is desirable, but a full PostgreSQL service is not -The main caveat is the current workspace-search trade-off. libSQL is not -merely "the same database API with a different wire protocol". In the current -implementation it is a simpler persistence backend with weaker semantic-search -behaviour after the flexible-dimension migration path. +The main caveat is now performance rather than capability. libSQL still offers +hybrid retrieval, but its post-V9 semantic path can require a brute-force scan +over all candidate embeddings for that workspace scope. ## 7. Current implementation caveats @@ -362,8 +367,9 @@ behaviour after the flexible-dimension migration path. handshake. 2. PostgreSQL still supports vector search after V9, but no longer through the old fixed-dimension HNSW index. -3. libSQL migration and schema comments still describe a brute-force vector - fallback that the current code does not implement. +3. libSQL falls back to brute-force cosine similarity in Rust when + `vector_top_k(...)` cannot run because the fixed-dimension vector index is + absent after the V9 migration. 4. Workspace memory and memory tools are absent in `--no-db` mode because the host does not build a workspace without a database. diff --git a/docs/developers-guide.md b/docs/developers-guide.md index 287a30cf3..48156c7dc 100644 --- a/docs/developers-guide.md +++ b/docs/developers-guide.md @@ -319,6 +319,68 @@ Meaning: PostgreSQL connection URL used by the app. Default or rule: +### libSQL test databases + +Unit tests that exercise the libSQL backend call +`LibSqlBackend::new_memory()` rather than `new_local()`. `new_memory()` +creates a UUID-named file in the OS temp directory so that multiple +connections within a single test share state, matching production semantics. +The shared database handle removes that file and its `-wal`/`-shm` sidecars +automatically when the final clone is dropped, so tests should not leave +artefacts behind on disk. + +Do **not** use `new_local()` in unit tests; reserve it for integration tests +or tests that specifically require filesystem-path behaviour. + + +### LibSqlDatabase shared handles + +`LibSqlBackend` owns an `Arc` rather than a raw libSQL +database handle. That wrapper exists for two reasons: + +- satellite stores such as the secrets and WASM stores can call + `shared_db()` and open their own per-operation connections without + reopening a different database +- temp-file-backed test databases created by `new_memory()` keep their + cleanup metadata on the shared handle, so the `.db`, `-wal`, and `-shm` + files live until the final shared owner is dropped + +If a constructor or store used to accept a backend directly and now accepts +`Arc`, that is usually a signal that it should share the same +underlying file while creating its own connections via +`LibSqlDatabase::connect()`. + +### Type-change propagation through store constructors + +The `Arc` → `Arc` change propagates to +every store that previously held a raw `Arc`. Each +affected constructor now accepts `Arc`: + +| Store | Field | Constructor parameter | +| --- | --- | --- | +| `LibSqlSecretsStore` | `db: Arc` | `new(db: Arc, …)` | +| `LibSqlWasmChannelStore` | `db: Arc` | `new(db: Arc)` | +| `LibSqlWasmToolStore` | `db: Arc` | `new(db: Arc)` | + +The shared handle is obtained at startup via `LibSqlBackend::shared_db()`, +which now returns `Arc` instead of +`Arc`: + +```rust +// Obtaining the shared handle (unchanged call site): +let db: Arc = backend.shared_db(); + +// Constructing a store with the shared handle: +let secrets_store = LibSqlSecretsStore::new(Arc::clone(&db), crypto); +let channel_store = LibSqlWasmChannelStore::new(Arc::clone(&db)); +let tool_store = LibSqlWasmToolStore::new(Arc::clone(&db)); +``` + +The `busy_timeout` PRAGMA that each store previously ran after connecting +is now applied once inside `LibSqlDatabase::connect()`, so it is no longer +necessary — and must not be duplicated — in individual store +`connect()` methods. + ## Dispatcher Architecture The dispatcher orchestrates interactive chat turns by preparing an LLM @@ -725,6 +787,40 @@ When those changes land, this guide must be updated in the same branch so local setup instructions stay truthful. +### WebhookServer test helpers + +`WebhookServer` exposes two `#[cfg(test)]`-only methods to eliminate +port-allocation races: + +- `start_with_listener(listener: TcpListener)` — accepts a pre-bound + listener, merges queued route fragments, resolves the live listener + address, and spawns the server. +- `restart_with_listener(listener: TcpListener)` — shuts the current server + down, resolves the new listener's address, and spawns a fresh server. + +Tests should pre-bind via `TcpListener::bind("127.0.0.1:0")` and pass the +result to these helpers instead of relying on `start()` / +`restart_with_addr()` to pick a free port. + + +### Workspace store module structure + +The libSQL workspace store is split by concern under +`src/db/libsql/workspace/`: + +- `mod.rs` owns the `NativeWorkspaceStore` implementation and hybrid-search + orchestration +- `document_ops.rs` owns document CRUD and directory-style listing helpers +- `chunk_ops.rs` owns chunk insertion, embedding updates, and chunk polling +- `fts.rs` owns FTS-only ranking queries +- `vector_search.rs` owns vector-index and brute-force similarity helpers +- `tests.rs` keeps cross-module integration coverage for the hybrid pipeline + +Prefer adding logic beside the feature it serves rather than growing +`mod.rs`. Module-local tests should live with the module they exercise, while +pipeline tests belong in `workspace/tests.rs`. + + ### Key APIs - `RunLoopCtx`: per-run container that carries the session handle, @@ -761,3 +857,14 @@ export DATABASE_URL=postgres://localhost/ironclaw Adjust the connection string if the local PostgreSQL instance requires a different host, user, or password. + +### Parameter-object structs in store helpers + +The workspace helpers use small parameter structs such as `AgentScope`, +`FtsSearchParams`, `VectorSearchQuery`, and `VectorIndexQuery` to keep helper +arity below the repository limit and to make call sites describe intent. + +Use this pattern when a helper repeatedly threads the same related values +through several internal calls. Keep these structs private or `pub(super)` +unless a wider API boundary genuinely needs them, and prefer names that +describe the query or scope they model instead of generic `Options` suffixes. diff --git a/docs/execplans/1-1-4-tests-for-schema-fidelity-and-execution-routing.md b/docs/execplans/1-1-4-tests-for-schema-fidelity-and-execution-routing.md index af1b9fd9e..f89734e5a 100644 --- a/docs/execplans/1-1-4-tests-for-schema-fidelity-and-execution-routing.md +++ b/docs/execplans/1-1-4-tests-for-schema-fidelity-and-execution-routing.md @@ -855,7 +855,7 @@ addressed in a subsequent pass (see progress checklist above). - `src/orchestrator/api/tests/fixtures/remote_tool_mocks.rs`: added `complex_tool_definition()` and `complex_tool_stub()` fixtures for testing full payload fidelity with nested JSON Schema and special characters. -- `src/orchestrator/api/tests/remote_tools.rs`: added three new tests: +- `src/orchestrator/api/tests/catalogue_fidelity.rs`: added three new tests: `remote_tool_catalog_preserves_full_tool_definition_payload`, `remote_tool_catalog_version_is_deterministic_and_sensitive_to_content`, and `orchestrator_responses_deserialize_into_worker_shared_types`. @@ -903,9 +903,11 @@ The implementation added 9 new test functions covering: (milestone 4). All tests use in-process mock servers and fixtures, avoiding external -dependencies. All tests follow existing `rstest` patterns and naming conventions. -The format check (`make check-fmt`) passed after running `cargo fmt --all`. All -validation gates have been run and passed successfully. +dependencies. All tests follow existing `rstest` patterns and naming +conventions. The format check (`make check-fmt`) passed after running +`cargo fmt --all`. The format check, git whitespace check, and full test suite +passed successfully. Markdown linting remained partially blocked by +pre-existing issues in `docs/roadmap.md`. ### Validation evidence @@ -928,10 +930,10 @@ Full test suite passed: 3076 tests passed; 0 failed; 2 ignored (webhook server test fixed to use already-bound address instead of privileged port; worker API types test split into three focused tests per code review). -Markdown linting revealed pre-existing issues in `docs/roadmap.md` unrelated to -this implementation (multiple consecutive blank lines at lines 1342, 1408, 1450, -1489, 1512). The ExecPlan, RFC 0001, and `docs/contents.md` changes introduced -no new Markdown issues. +Markdown linting was only partially green because `docs/roadmap.md` still had +pre-existing issues unrelated to this implementation (multiple consecutive +blank lines at lines 1342, 1408, 1450, 1489, 1512). The ExecPlan, RFC 0001, +and `docs/contents.md` changes introduced no new Markdown issues. ### Retrospective observations diff --git a/docs/users-guide.md b/docs/users-guide.md index 2a1352577..9b566eada 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -91,3 +91,29 @@ These notices are advisory. A success message means the runtime moved the job back into its normal retry path. A permanent failure or manual-intervention message means the runtime could not finish recovery automatically, and the operator should inspect the job or tool state before retrying work. + + +## Workspace memory search + +When workspace memory is enabled, the search backend differs by database: + +- **PostgreSQL** — performs pgvector cosine-distance queries directly. +- **libSQL / Turso** — attempts an indexed `vector_top_k(...)` query when a + compatible fixed-dimension vector index exists. After the V9 migration + (which removed the fixed-dimension index in favour of flexible-dimension + vector storage, with `memory_chunks.embedding` stored as a + flexible vector), the backend automatically falls back to brute-force + cosine similarity computed in Rust. Results from both paths feed into the + same Reciprocal Rank Fusion (RRF) pipeline, so hybrid full-text search + (FTS) + vector retrieval is preserved. + +To determine which search mode is active for a workspace, run: + +```text +ironclaw doctor +ironclaw status +``` + +Both commands report whether indexed or brute-force vector retrieval is +currently in use. See `docs/database-integrations.md` for backend trade-offs +and performance considerations. diff --git a/src/channels/wasm/storage.rs b/src/channels/wasm/storage.rs index 3f3798642..63e71ed86 100644 --- a/src/channels/wasm/storage.rs +++ b/src/channels/wasm/storage.rs @@ -351,24 +351,19 @@ fn pg_row_to_channel( /// matching the connection-per-request pattern used by the main `LibSqlBackend`. #[cfg(feature = "libsql")] pub struct LibSqlWasmChannelStore { - db: std::sync::Arc, + db: std::sync::Arc, } #[cfg(feature = "libsql")] impl LibSqlWasmChannelStore { - pub fn new(db: std::sync::Arc) -> Self { + pub fn new(db: std::sync::Arc) -> Self { Self { db } } async fn connect(&self) -> Result { - let conn = self - .db - .connect() - .map_err(|e| WasmChannelStoreError::Database(format!("Connection failed: {}", e)))?; - conn.query("PRAGMA busy_timeout = 5000", ()) - .await - .map_err(|e| { - WasmChannelStoreError::Database(format!("Failed to set busy_timeout: {}", e)) + let conn = + self.db.connect().await.map_err(|e| { + WasmChannelStoreError::Database(format!("Connection failed: {}", e)) })?; Ok(conn) } diff --git a/src/channels/webhook_server.rs b/src/channels/webhook_server.rs index 2a43f9ff4..eaa362934 100644 --- a/src/channels/webhook_server.rs +++ b/src/channels/webhook_server.rs @@ -23,6 +23,7 @@ pub struct WebhookServerConfig { /// `start()` call binds the listener and spawns the server task. pub struct WebhookServer { config: WebhookServerConfig, + resolved_addr: Option, routes: Vec, /// Merged router saved after start() for restart_with_addr(). merged_router: Option, @@ -35,6 +36,7 @@ impl WebhookServer { pub fn new(config: WebhookServerConfig) -> Self { Self { config, + resolved_addr: None, routes: Vec::new(), merged_router: None, shutdown_tx: None, @@ -58,28 +60,6 @@ impl WebhookServer { self.bind_and_spawn(app).await } - /// Bind using an already-bound [`tokio::net::TcpListener`], merge all route - /// fragments, and spawn the server. The listener's local address is stored - /// in `config.addr` so `current_addr()` stays accurate. - pub async fn start_with_listener( - &mut self, - listener: tokio::net::TcpListener, - ) -> Result<(), ChannelError> { - let addr = listener - .local_addr() - .map_err(|e| ChannelError::StartupFailed { - name: "webhook_server".to_string(), - reason: format!("local_addr failed: {e}"), - })?; - self.config.addr = addr; - let mut app = Router::new(); - for fragment in self.routes.drain(..) { - app = app.merge(fragment); - } - self.merged_router = Some(app.clone()); - self.spawn_on_listener(listener, app).await - } - /// Bind a listener to the configured address and spawn the server task. /// Private helper used by both start() and restart_with_addr(). async fn bind_and_spawn(&mut self, app: Router) -> Result<(), ChannelError> { @@ -87,59 +67,47 @@ impl WebhookServer { .await .map_err(|e| ChannelError::StartupFailed { name: "webhook_server".to_string(), - reason: format!("Failed to bind to {}: {e}", self.config.addr), + reason: format!("Failed to bind to {}: {}", self.config.addr, e), })?; - let addr = listener + + let resolved_addr = listener .local_addr() .map_err(|e| ChannelError::StartupFailed { name: "webhook_server".to_string(), - reason: format!("local_addr failed: {e}"), + reason: format!("Failed to get listener local address: {e}"), })?; - self.config.addr = addr; - self.spawn_on_listener(listener, app).await - } + self.resolved_addr = Some(resolved_addr); - /// Spawn the server on an already-bound listener. - /// Private helper that contains the common shutdown-channel and task-spawn logic. - async fn spawn_on_listener( - &mut self, - listener: tokio::net::TcpListener, - app: Router, - ) -> Result<(), ChannelError> { - tracing::info!("Webhook server listening on {}", self.config.addr); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - self.shutdown_tx = Some(shutdown_tx); - let handle = tokio::spawn(async move { - if let Err(e) = axum::serve(listener, app) - .with_graceful_shutdown(async { - let _ = shutdown_rx.await; - tracing::debug!("Webhook server shutting down"); - }) - .await - { - tracing::error!("Webhook server error: {e}"); - } - }); - self.handle = Some(handle); + self.spawn_with_listener(listener, app, resolved_addr); Ok(()) } - /// Shared restart kernel. Saves current listener state, spawns the server on - /// `listener` bound at `new_addr`, shuts down the old server on success, or - /// restores the previous state on failure. - async fn swap_listener( - &mut self, - new_addr: SocketAddr, - listener: tokio::net::TcpListener, - app: Router, - ) -> Result<(), ChannelError> { + /// Gracefully shut down the current listener and rebind to a new address. + /// The merged router from the original `start()` call is reused. + /// + /// If binding to the new address fails, the old listener remains active and + /// state is restored. This prevents a denial-of-service if the new address + /// is invalid or already in use. + pub async fn restart_with_addr(&mut self, new_addr: SocketAddr) -> Result<(), ChannelError> { + let app = self + .merged_router + .clone() + .ok_or_else(|| ChannelError::StartupFailed { + name: "webhook_server".to_string(), + reason: "restart_with_addr called before start()".to_string(), + })?; + + // Save old state for rollback if new bind fails let old_addr = self.config.addr; + let old_resolved_addr = self.resolved_addr; let old_shutdown_tx = self.shutdown_tx.take(); let old_handle = self.handle.take(); + // Update config to new address and try to bind self.config.addr = new_addr; - match self.spawn_on_listener(listener, app).await { + match self.bind_and_spawn(app).await { Ok(()) => { + // New listener is running, gracefully shut down the old one if let Some(tx) = old_shutdown_tx { let _ = tx.send(()); } @@ -149,7 +117,9 @@ impl WebhookServer { Ok(()) } Err(e) => { + // Restore old state; old listener remains active self.config.addr = old_addr; + self.resolved_addr = old_resolved_addr; self.shutdown_tx = old_shutdown_tx; self.handle = old_handle; Err(e) @@ -157,40 +127,102 @@ impl WebhookServer { } } - /// Gracefully shut down the current listener and rebind to a new address. - /// The merged router from the original `start()` call is reused. + /// Return the current listener address. /// - /// If binding to the new address fails, the old listener remains active and - /// state is restored. This prevents a denial-of-service if the new address - /// is invalid or already in use. - pub async fn restart_with_addr(&mut self, new_addr: SocketAddr) -> Result<(), ChannelError> { - let app = self - .merged_router - .clone() - .ok_or_else(|| ChannelError::StartupFailed { - name: "webhook_server".to_string(), - reason: "restart_with_addr called before start()".to_string(), - })?; + /// Before the first successful [`Self::start`], start_with_listener, + /// [`Self::restart_with_addr`], or restart_with_listener call, this + /// returns the configured bind address from `self.config.addr` and it may + /// not correspond to a live listener. After a successful start or restart, + /// it returns the resolved listener address, including any OS-assigned port + /// chosen for `:0` binds, while leaving `self.config.addr` unchanged. + pub fn current_addr(&self) -> SocketAddr { + self.resolved_addr.unwrap_or(self.config.addr) + } - let listener = tokio::net::TcpListener::bind(new_addr).await.map_err(|e| { - ChannelError::StartupFailed { - name: "webhook_server".to_string(), - reason: format!("Failed to bind to {new_addr}: {e}"), - } - })?; - let addr = listener + /// Returns whether the server currently has a running listener task. + pub fn is_running(&self) -> bool { + self.handle + .as_ref() + .map(|handle| !handle.is_finished()) + .unwrap_or(false) + } + + /// Signal graceful shutdown and wait for the server task to finish. + pub async fn shutdown(&mut self) { + if let Some(tx) = self.shutdown_tx.take() { + let _ = tx.send(()); + } + if let Some(handle) = self.handle.take() { + let _ = handle.await; + } + } + + /// Accept a pre-bound listener, merge route fragments, and spawn the + /// server. + /// + /// Unlike [`Self::start`], this test-only entrypoint accepts a listener + /// that is already bound, eliminating the TOCTOU window between external + /// test port allocation and the server bind. That makes it suitable for + /// tests that reserve ephemeral ports before handing ownership to the + /// server. + pub async fn start_with_listener( + &mut self, + listener: tokio::net::TcpListener, + ) -> Result<(), ChannelError> { + let mut app = Router::new(); + for fragment in self.routes.drain(..) { + app = app.merge(fragment); + } + self.merged_router = Some(app.clone()); + + let local_addr = listener .local_addr() .map_err(|e| ChannelError::StartupFailed { name: "webhook_server".to_string(), - reason: format!("local_addr failed: {e}"), + reason: format!("Failed to get listener local address: {e}"), })?; + self.resolved_addr = Some(local_addr); + + self.spawn_with_listener(listener, app, local_addr); + Ok(()) + } + + /// Spawn the axum server on an already-bound listener. + fn spawn_with_listener( + &mut self, + listener: tokio::net::TcpListener, + app: Router, + addr: SocketAddr, + ) { + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + self.shutdown_tx = Some(shutdown_tx); + + let handle = tokio::spawn(async move { + if let Err(e) = axum::serve(listener, app) + .with_graceful_shutdown(async { + let _ = shutdown_rx.await; + tracing::debug!("Webhook server shutting down"); + }) + .await + { + tracing::error!("Webhook server error: {}", e); + } + }); - self.swap_listener(addr, listener, app).await + tracing::info!("Webhook server listening on {}", addr); + self.handle = Some(handle); } - /// Shut down the running server and restart it on the already-bound - /// `listener`, inheriting all previously added routes from - /// `self.merged_router`. + /// Gracefully shut down the current listener and rebind using a pre-bound + /// listener. Eliminates the TOCTOU window between port reservation and + /// bind, making it suitable for tests. + /// + /// Unlike [`restart_with_addr`], this test-only helper does not support + /// rollback. [`restart_with_addr`] binds the replacement first and can keep + /// the old listener alive if that bind fails. This method shuts down the + /// old listener before calling [`Self::spawn_with_listener`], so there is + /// no rollback path if spawning were to fail. That trade-off is acceptable + /// in tests because [`Self::spawn_with_listener`] is infallible. pub async fn restart_with_listener( &mut self, listener: tokio::net::TcpListener, @@ -203,39 +235,283 @@ impl WebhookServer { reason: "restart_with_listener called before start()".to_string(), })?; - // Extract address from the provided listener before mutating self, - // so that old_addr, old_shutdown_tx and old_handle remain intact - // until we know local_addr() succeeds. - let addr = listener + let new_addr = listener .local_addr() .map_err(|e| ChannelError::StartupFailed { name: "webhook_server".to_string(), - reason: format!("local_addr failed: {e}"), + reason: format!("Failed to get listener local address: {e}"), })?; - self.swap_listener(addr, listener, app).await + // Stop the old listener before spawning the new one. Unlike + // restart_with_addr, we do not provide rollback semantics because the + // new listener is already bound and assumed to be valid. + self.shutdown().await; + + self.resolved_addr = Some(new_addr); + self.spawn_with_listener(listener, app, new_addr); + Ok(()) } +} - /// Return the current bind address. - pub fn current_addr(&self) -> SocketAddr { - self.config.addr +#[cfg(test)] +mod tests { + use std::net::TcpListener as StdTcpListener; + + use axum::Json; + use rstest::{fixture, rstest}; + use serde_json::json; + + use super::*; + + /// A started webhook server with a `/health` route and a pre-built client. + struct StartedWebhookServer { + server: WebhookServer, + client: reqwest::Client, } - /// Returns whether the server currently has a running listener task. - pub fn is_running(&self) -> bool { - self.handle - .as_ref() - .map(|handle| !handle.is_finished()) - .unwrap_or(false) + /// Bind an ephemeral localhost listener and keep it reserved until the + /// server takes ownership, eliminating port-probe races in tests. + async fn bind_ephemeral_listener() + -> Result<(tokio::net::TcpListener, SocketAddr), Box> { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + Ok((listener, addr)) } - /// Signal graceful shutdown and wait for the server task to finish. - pub async fn shutdown(&mut self) { - if let Some(tx) = self.shutdown_tx.take() { - let _ = tx.send(()); - } - if let Some(handle) = self.handle.take() { - let _ = handle.await; - } + /// Create a [`WebhookServer`] with a `/health` route, start it on a + /// pre-bound ephemeral listener, and return the server and a client. + #[fixture] + async fn started_webhook_server() + -> Result> { + let (listener, _) = bind_ephemeral_listener().await?; + let mut server = WebhookServer::new(WebhookServerConfig { + addr: "127.0.0.1:0".parse()?, + }); + server.add_routes(Router::new().route( + "/health", + axum::routing::get(|| async { Json(json!({"status": "ok"})) }), + )); + server.start_with_listener(listener).await?; + Ok(StartedWebhookServer { + server, + client: reqwest::Client::new(), + }) + } + + #[rstest] + #[tokio::test] + async fn test_start_binds_ephemeral_addr_and_serves_health_check() + -> Result<(), Box> { + let mut server = WebhookServer::new(WebhookServerConfig { + addr: "127.0.0.1:0".parse()?, + }); + server.add_routes(Router::new().route( + "/health", + axum::routing::get(|| async { Json(json!({"status": "ok"})) }), + )); + + server.start().await?; + let addr = server.current_addr(); + + assert_ne!( + addr.port(), + 0, + "Server should resolve an ephemeral bind to a concrete port" + ); + + let response = reqwest::Client::new() + .get(format!("http://{}/health", addr)) + .send() + .await?; + assert_eq!( + response.status(), + 200, + "Server should respond to health check after start()" + ); + + server.shutdown().await; + Ok(()) + } + + #[rstest] + #[tokio::test] + async fn test_start_and_restart_with_addr_use_production_bind_path() + -> Result<(), Box> { + let mut server = WebhookServer::new(WebhookServerConfig { + addr: "127.0.0.1:0".parse()?, + }); + server.add_routes(Router::new().route( + "/health", + axum::routing::get(|| async { Json(json!({"status": "ok"})) }), + )); + + server.start().await?; + let addr1 = server.current_addr(); + assert_ne!( + addr1.port(), + 0, + "Server should resolve the initial ephemeral bind to a concrete port" + ); + + let client = reqwest::Client::new(); + let response = client + .get(format!("http://{}/health", addr1)) + .send() + .await?; + assert_eq!( + response.status(), + 200, + "Server should respond to health check after start()" + ); + + let restart_addr: SocketAddr = "127.0.0.1:0".parse()?; + server.restart_with_addr(restart_addr).await?; + let addr2 = server.current_addr(); + + assert_ne!( + addr2.port(), + 0, + "Server should resolve the restarted ephemeral bind to a concrete port" + ); + assert_ne!( + addr1, addr2, + "Address should change after restart_with_addr" + ); + + let old_result = tokio::time::timeout( + std::time::Duration::from_millis(200), + client.get(format!("http://{}/health", addr1)).send(), + ) + .await; + assert!( + matches!(old_result, Err(_) | Ok(Err(_))), + "Old address should not respond after restart_with_addr" + ); + + let response = client + .get(format!("http://{}/health", addr2)) + .send() + .await?; + assert_eq!( + response.status(), + 200, + "Server should respond to health check after restart_with_addr" + ); + + server.shutdown().await; + Ok(()) + } + + #[rstest] + #[tokio::test] + async fn test_restart_with_listener( + #[future] started_webhook_server: Result< + StartedWebhookServer, + Box, + >, + ) -> Result<(), Box> { + let StartedWebhookServer { mut server, client } = started_webhook_server.await?; + let addr1 = server.current_addr(); + + assert_eq!( + server.current_addr(), + addr1, + "Server should be bound to the initial address before restart_with_listener" + ); + + let response = client + .get(format!("http://{}/health", addr1)) + .send() + .await?; + assert_eq!( + response.status(), + 200, + "Server should respond to health check before restart_with_listener" + ); + + let (listener, addr2) = bind_ephemeral_listener().await?; + server.restart_with_listener(listener).await?; + + assert_eq!( + server.current_addr(), + addr2, + "Server address should be updated after restart_with_listener" + ); + assert_ne!( + addr1, addr2, + "Address should change after restart_with_listener" + ); + + let response = client + .get(format!("http://{}/health", addr2)) + .send() + .await?; + assert_eq!( + response.status(), + 200, + "Restarted server should respond to health check on new address" + ); + + let old_result = tokio::time::timeout( + std::time::Duration::from_millis(200), + client.get(format!("http://{}/health", addr1)).send(), + ) + .await; + assert!( + matches!(old_result, Err(_) | Ok(Err(_))), + "Old address should not respond after server restarts" + ); + + server.shutdown().await; + Ok(()) + } + + #[rstest] + #[tokio::test] + async fn test_restart_with_addr_rollback_on_bind_failure( + #[future] started_webhook_server: Result< + StartedWebhookServer, + Box, + >, + ) -> Result<(), Box> { + let StartedWebhookServer { mut server, client } = started_webhook_server.await?; + let addr1 = server.current_addr(); + + let response = client + .get(format!("http://{}/health", addr1)) + .send() + .await?; + assert_eq!(response.status(), 200, "Server should be listening"); + + // Occupy a second port so the restart bind fails deterministically. + let occupied_listener = StdTcpListener::bind("127.0.0.1:0")?; + let conflict_addr = occupied_listener.local_addr()?; + + let result = server.restart_with_addr(conflict_addr).await; + assert!( + result.is_err(), + "Restart with already-bound address should fail" + ); + + drop(occupied_listener); + + let response = client + .get(format!("http://{}/health", addr1)) + .send() + .await?; + assert_eq!( + response.status(), + 200, + "Old listener should still be running after failed restart" + ); + + assert_eq!( + server.current_addr(), + addr1, + "Server address should be restored after failed restart" + ); + + server.shutdown().await; + Ok(()) } } diff --git a/src/db/CLAUDE.md b/src/db/CLAUDE.md index 5e48bd38a..c32e5b6f8 100644 --- a/src/db/CLAUDE.md +++ b/src/db/CLAUDE.md @@ -88,7 +88,7 @@ The `Database` supertrait is composed of seven sub-traits. Leaf consumers can de | Numeric/Decimal | `NUMERIC` | `TEXT` (preserves `rust_decimal` precision) | | Arrays | `TEXT[]` | `TEXT` (JSON-encoded array) | | Booleans | `BOOLEAN` | `INTEGER` (0/1) | -| Vector embeddings | `VECTOR` (any dim, V9 removed fixed 1536) | `F32_BLOB(1536)` via `libsql_vector_idx` | +| Vector embeddings | `VECTOR` (any dim, V9 removed fixed 1536) | `BLOB`; V9 dropped `libsql_vector_idx`, so `vector_top_k(...)` falls back to brute-force cosine similarity in Rust when unavailable | | Full-text search | `tsvector` + `ts_rank_cd` | FTS5 virtual table + sync triggers | | JSON path update | `jsonb_set(col, '{key}', val)` | `json_patch(col, '{"key": val}')` | | PL/pgSQL | Functions | Triggers (no stored procs in SQLite) | @@ -103,7 +103,7 @@ The `Database` supertrait is composed of seven sub-traits. Leaf consumers can de **Timestamp write format:** Always write timestamps with `fmt_ts(dt)` (RFC 3339, millisecond precision). Read with `get_ts()` / `get_opt_ts()` which handle legacy naive formats too. -**Vector dimension:** PostgreSQL V9 migration changed the column to unbounded `vector` (removing the HNSW index). libSQL still uses `F32_BLOB(1536)` — if you use a different-dimension embedding model, the libSQL schema needs updating too. +**Vector dimension:** PostgreSQL V9 migration changed the column to unbounded `vector` (removing the HNSW index). libSQL V9 likewise moved `memory_chunks.embedding` to a plain `BLOB`, dropped the fixed-dimension vector index, and now falls back to brute-force cosine similarity in Rust when `vector_top_k(...)` is unavailable. **Connection per operation:** `LibSqlBackend::connect()` creates a fresh connection for every operation, sets `PRAGMA busy_timeout = 5000`, and closes it when the `Connection` is dropped. This is intentional — the libSQL SDK does not offer a pool. Avoid holding connections open across `await` points. @@ -122,7 +122,9 @@ The `Database` supertrait is composed of seven sub-traits. Leaf consumers can de **Workspace/Memory:** - `memory_documents` — flexible path-based files -- `memory_chunks` — chunked content with FTS + vector indexes +- `memory_chunks` — chunked content with FTS; embeddings are stored as plain + BLOBs after V9, so `vector_top_k(...)` may fall back to brute-force cosine + similarity when the fixed-dimension vector index is unavailable - `memory_chunks_fts` — FTS5 virtual table (libSQL) / `tsvector` column (PostgreSQL) - `heartbeat_state` — periodic execution tracking @@ -147,7 +149,7 @@ The `Database` supertrait is composed of seven sub-traits. Leaf consumers can de - **Settings reload** — `Config::from_db` skipped (requires `Store`) - **No incremental migrations** — schema is idempotent CREATE IF NOT EXISTS; no ALTER TABLE support; column additions require a new versioned approach - **No encryption at rest** — only secrets (API tokens) are AES-256-GCM encrypted; all other data is plaintext SQLite -- **Hybrid search** — both FTS5 and vector search (`libsql_vector_idx`) are implemented; however, the vector index is fixed at `F32_BLOB(1536)` while PostgreSQL switched to unbounded `vector` in V9 +- **Hybrid search cost** — libSQL still provides hybrid search after V9, but semantic retrieval may use a brute-force cosine scan over candidate embeddings when no compatible vector index exists - **Write serialization** — WAL mode allows concurrent readers but only one writer at a time; busy timeout is 5 s, which may cause timeouts under high write concurrency ## Running Locally with libSQL @@ -159,13 +161,17 @@ DATABASE_BACKEND=libsql LIBSQL_PATH=~/.ironclaw/test.db cargo run # Use Turso cloud (embedded replica syncs local file to cloud) DATABASE_BACKEND=libsql LIBSQL_URL=libsql://xxx.turso.io LIBSQL_AUTH_TOKEN=xxx cargo run -# In-memory (tests only — data is lost when the process exits) +# Temp-file-backed test database (data is lost when the last shared handle drops) # Use LibSqlBackend::new_memory() directly in test code ``` ## Testing the libSQL Backend -Use `LibSqlBackend::new_memory()` in unit tests — no files, no cleanup required: +Use `LibSqlBackend::new_memory()` in unit tests when fresh connections need to +share state. Despite the name, it creates a temp-file-backed database and +stores the `temp_path` on the shared database handle so the file persists until +the final `Arc` clone is dropped. Tests do not need manual cleanup, but they +should assume a temp file exists for the lifetime of the shared handle: ```rust #[tokio::test] @@ -176,7 +182,10 @@ async fn test_my_feature() { } ``` -For concurrency tests that require multiple connections sharing state, use `LibSqlBackend::new_local(&tmp_path)` with a `tempfile::tempdir()`. In-memory databases do not share state between connections. +`LibSqlBackend::new_memory()` is appropriate for multi-connection tests because +all fresh connections share the same temp-file-backed state. Use +`LibSqlBackend::new_local(&tmp_path)` with a `tempfile::tempdir()` when a test +needs an explicit on-disk path it can inspect or control directly. ## Sharing the libSQL Database Handle diff --git a/src/db/libsql/mod.rs b/src/db/libsql/mod.rs index fd1446dda..4f62f047c 100644 --- a/src/db/libsql/mod.rs +++ b/src/db/libsql/mod.rs @@ -4,7 +4,12 @@ //! Supports three modes: //! - Local embedded (file-based, no server needed) //! - Turso cloud with embedded replica (sync to cloud) -//! - In-memory (for testing) +//! - Temp-file-backed (for testing) — creates a UUID-named `.db` file in the +//! OS temp directory; fresh connections share state via the file; the file +//! and its WAL/SHM sidecars are deleted automatically when the final shared +//! [`LibSqlDatabase`] handle is dropped. Clones returned by `shared_db()` +//! can outlive the [`LibSqlBackend`], so cleanup follows the last shared +//! handle rather than the backend wrapper. mod conversations; pub(crate) mod helpers; @@ -17,12 +22,13 @@ mod tool_failures; mod workspace; use std::path::Path; +use std::path::PathBuf; use std::sync::Arc; use crate::db::NativeDatabase; use crate::error::DatabaseError; -use libsql::{Connection, Database as LibSqlDatabase}; -use tokio::fs; +use libsql::{Connection, Database as RawLibSqlDatabase}; +use uuid::Uuid; use crate::db::libsql_migrations; pub(crate) use helpers::{ @@ -31,47 +37,142 @@ pub(crate) use helpers::{ }; pub(crate) use row_conversion::row_to_memory_document; -/// libSQL/Turso database backend. +/// Shared libSQL database handle. /// -/// Stores the `Database` handle in an `Arc` so that the same underlying -/// database can be shared with stores (SecretsStore, WasmToolStore) that -/// create their own connections per-operation. +/// Wraps the underlying [`RawLibSqlDatabase`] plus optional temp-file metadata +/// used by test-only temp-file-backed databases. Stores such as +/// `LibSqlSecretsStore`, `LibSqlWasmChannelStore`, and `LibSqlWasmToolStore` +/// share this handle via `Arc` so they all create connections against the same +/// underlying database and so temp-file cleanup runs when the last shared owner +/// is dropped. +pub struct LibSqlDatabase { + db: RawLibSqlDatabase, + /// Path to the ephemeral database file created by + /// [`LibSqlBackend::new_memory`]. + /// `None` for persistent (`new_local` / `new_remote_replica`) backends. + /// When `Some`, the file and its `-wal`/`-shm` sidecars are removed in + /// [`Drop`]. + temp_path: Option, +} + +impl LibSqlDatabase { + fn new(db: RawLibSqlDatabase, temp_path: Option) -> Self { + Self { db, temp_path } + } + + #[cfg(test)] + pub fn temp_path(&self) -> Option { + self.temp_path.clone() + } + + /// Create a fresh libSQL connection from the shared database handle. + /// + /// Applies the same retry and `busy_timeout` setup used by + /// [`LibSqlBackend::connect`] so all shared-handle consumers behave + /// consistently. + pub async fn connect(&self) -> Result { + let mut last_err = None; + for attempt in 0..3u32 { + match self.db.connect() { + Ok(conn) => { + conn.query("PRAGMA busy_timeout = 5000", ()) + .await + .map_err(|e| { + DatabaseError::Pool(format!("Failed to set busy_timeout: {}", e)) + })?; + return Ok(conn); + } + Err(e) => { + last_err = Some(e); + if attempt < 2 { + tokio::time::sleep(std::time::Duration::from_millis( + 50 * 2u64.pow(attempt), + )) + .await; + } + } + } + } + Err(DatabaseError::Pool(format!( + "Failed to create connection after 3 attempts: {}", + last_err.map(|e| e.to_string()).unwrap_or_default() + ))) + } +} + +impl Drop for LibSqlDatabase { + fn drop(&mut self) { + if let Some(path) = &self.temp_path { + let _ = std::fs::remove_file(path); + let _ = std::fs::remove_file(path.with_extension("db-wal")); + let _ = std::fs::remove_file(path.with_extension("db-shm")); + } + } +} + +/// libSQL/Turso backend implementation of [`NativeDatabase`]. +/// +/// Owns one shared [`LibSqlDatabase`] handle and exposes constructors for the +/// local, remote-replica, and temp-file-backed test modes. Callers that need +/// backend-specific sharing can clone the underlying handle with +/// [`LibSqlBackend::shared_db`], while normal database operations go through +/// [`LibSqlBackend::connect`] and the trait implementations on this type. pub struct LibSqlBackend { db: Arc, } impl LibSqlBackend { - /// Ensure the parent directory of `path` exists, creating it and all - /// ancestors if necessary. - async fn ensure_parent_dir(path: &Path) -> Result<(), DatabaseError> { + fn ensure_parent_dir(path: &Path) -> Result<(), DatabaseError> { if let Some(parent) = path.parent() { - fs::create_dir_all(parent).await.map_err(|e| { - DatabaseError::Pool(format!("Failed to create database directory: {e}")) + std::fs::create_dir_all(parent).map_err(|e| { + DatabaseError::Pool(format!("Failed to create database directory: {}", e)) })?; } + Ok(()) } + /// Wraps a built `libsql::Database` in `Self` with no temp-file path. + fn from_db(db: RawLibSqlDatabase) -> Self { + Self { + db: Arc::new(LibSqlDatabase::new(db, None)), + } + } + /// Create a new local embedded database. pub async fn new_local(path: &Path) -> Result { - Self::ensure_parent_dir(path).await?; + Self::ensure_parent_dir(path)?; + let db = libsql::Builder::new_local(path) .build() .await - .map_err(|e| DatabaseError::Pool(format!("Failed to open libSQL database: {e}")))?; - Ok(Self { db: Arc::new(db) }) + .map_err(|e| DatabaseError::Pool(format!("Failed to open libSQL database: {}", e)))?; + + Ok(Self::from_db(db)) } - /// Create a new in-memory database (for testing). + /// Create a temp-file-backed database for testing. + /// + /// Creates a UUID-named `.db` file in [`std::env::temp_dir`]. Multiple + /// calls to [`Self::connect`] share state through that file, matching the + /// behaviour of the production `new_local` path without requiring a + /// caller-supplied path. + /// + /// The file and its `-wal`/`-shm` sidecars are removed automatically when + /// the final shared database handle created for this backend is dropped. pub async fn new_memory() -> Result { - let db = libsql::Builder::new_local(":memory:") + let temp_path = + std::env::temp_dir().join(format!("axinite-libsql-memory-{}.db", Uuid::new_v4())); + let db = libsql::Builder::new_local(&temp_path) .build() .await .map_err(|e| { - DatabaseError::Pool(format!("Failed to create in-memory database: {}", e)) + DatabaseError::Pool(format!("Failed to create temp-file-backed database: {}", e)) })?; - Ok(Self { db: Arc::new(db) }) + Ok(Self { + db: Arc::new(LibSqlDatabase::new(db, Some(temp_path))), + }) } /// Create with Turso cloud sync (embedded replica). @@ -80,12 +181,14 @@ impl LibSqlBackend { url: &str, auth_token: &str, ) -> Result { - Self::ensure_parent_dir(path).await?; + Self::ensure_parent_dir(path)?; + let db = libsql::Builder::new_remote_replica(path, url.to_string(), auth_token.to_string()) .build() .await - .map_err(|e| DatabaseError::Pool(format!("Failed to open remote replica: {e}")))?; - Ok(Self { db: Arc::new(db) }) + .map_err(|e| DatabaseError::Pool(format!("Failed to open remote replica: {}", e)))?; + + Ok(Self::from_db(db)) } /// Get a shared reference to the underlying database handle. @@ -106,32 +209,7 @@ impl LibSqlBackend { /// "unable to open database file" errors from concurrent connection /// creation (e.g. cron ticker vs main thread). pub async fn connect(&self) -> Result { - let mut last_err = None; - for attempt in 0..3u32 { - match self.db.connect() { - Ok(conn) => { - conn.query("PRAGMA busy_timeout = 5000", ()) - .await - .map_err(|e| { - DatabaseError::Pool(format!("Failed to set busy_timeout: {}", e)) - })?; - return Ok(conn); - } - Err(e) => { - last_err = Some(e); - if attempt < 2 { - tokio::time::sleep(std::time::Duration::from_millis( - 50 * 2u64.pow(attempt), - )) - .await; - } - } - } - } - Err(DatabaseError::Pool(format!( - "Failed to create connection after 3 attempts: {}", - last_err.map(|e| e.to_string()).unwrap_or_default() - ))) + self.db.connect().await } } @@ -173,6 +251,8 @@ impl NativeDatabase for LibSqlBackend { #[cfg(test)] mod tests { + use std::sync::Arc; + use chrono::{TimeZone, Utc}; use crate::db::Database; @@ -225,13 +305,8 @@ mod tests { let mut rows = conn.query("PRAGMA journal_mode", ()).await.unwrap(); let row = rows.next().await.unwrap().unwrap(); let mode: String = row.get(0).unwrap(); - // In-memory databases use "memory" journal mode (WAL doesn't apply to :memory:), - // but the PRAGMA still executes without error. For file-based databases it returns "wal". - assert!( - mode == "wal" || mode == "memory", - "expected wal or memory, got: {}", - mode, - ); + // The temp-file-backed test database should still enable WAL mode. + assert!(mode == "wal", "expected wal, got: {}", mode,); } #[tokio::test] @@ -246,6 +321,48 @@ mod tests { assert_eq!(timeout, 5000); } + #[test] + fn shared_libsql_database_drop_removes_temp_files() { + let runtime = tokio::runtime::Runtime::new().expect("create runtime for libsql test"); + let backend = runtime + .block_on(LibSqlBackend::new_memory()) + .expect("failed to create temp-file-backed backend"); + let shared_db = backend.shared_db(); + + let path = shared_db + .temp_path() + .expect("new_memory must set temp_path"); + let wal = path.with_extension("db-wal"); + let shm = path.with_extension("db-shm"); + + // Touch the database and sidecar files so the drop handler has + // something concrete to delete. + std::fs::write(&path, b"").expect("failed to create temp db file"); + std::fs::write(&wal, b"").expect("failed to create sidecar file"); + std::fs::write(&shm, b"").expect("failed to create sidecar file"); + + assert!(path.exists(), "temp db file must exist before drop"); + assert!(wal.exists(), "WAL sidecar must exist before drop"); + assert!(shm.exists(), "SHM sidecar must exist before drop"); + + drop(backend); + assert!( + path.exists(), + "shared handle should keep temp db file alive after backend drop" + ); + + let shared_db = match Arc::try_unwrap(shared_db) { + Ok(shared_db) => shared_db, + Err(_) => panic!("test should hold the final shared libsql database handle"), + }; + + drop(shared_db); + + assert!(!path.exists(), "temp db file must be removed after drop"); + assert!(!wal.exists(), "WAL sidecar must be removed after drop"); + assert!(!shm.exists(), "SHM sidecar must be removed after drop"); + } + /// Regression test: save_job must persist user_id and get_job must return it. #[tokio::test] async fn test_save_job_persists_user_id() { diff --git a/src/db/libsql/workspace.rs b/src/db/libsql/workspace.rs deleted file mode 100644 index 37ca119f4..000000000 --- a/src/db/libsql/workspace.rs +++ /dev/null @@ -1,902 +0,0 @@ -//! Workspace-related WorkspaceStore implementation for LibSqlBackend. - -use std::collections::HashMap; - -use libsql::params; -use uuid::Uuid; - -use super::{ - LibSqlBackend, fmt_ts, get_i64, get_opt_text, get_opt_ts, get_text, get_ts, - row_to_memory_document, -}; -use crate::db::{HybridSearchParams, InsertChunkParams, NativeWorkspaceStore}; -use crate::error::WorkspaceError; -use crate::workspace::{ - MemoryChunk, MemoryDocument, RankedResult, SearchResult, WorkspaceEntry, cosine_similarity, - reciprocal_rank_fusion, -}; - -use chrono::Utc; - -struct Candidate { - chunk_id: Uuid, - document_id: Uuid, - document_path: String, - content: String, - similarity: f32, -} - -fn rank_candidates(mut candidates: Vec, limit: usize) -> Vec { - // Sort by similarity descending - candidates.sort_by(|a, b| { - b.similarity - .partial_cmp(&a.similarity) - .unwrap_or(std::cmp::Ordering::Equal) - }); - - let total_candidates = candidates.len(); - - // Take top N and convert to RankedResult with 1-based rank - let results: Vec<_> = candidates - .into_iter() - .take(limit) - .enumerate() - .map(|(idx, c)| RankedResult { - chunk_id: c.chunk_id, - document_id: c.document_id, - document_path: c.document_path, - content: c.content, - rank: (idx + 1) as u32, - }) - .collect(); - - tracing::debug!( - "Brute-force vector search scanned {} candidates, returned {} results", - total_candidates, - results.len() - ); - - results -} - -/// Deserialize an embedding from a BLOB (4-byte little-endian f32 values). -/// -/// Returns an empty vector if the blob length is not a multiple of 4. -fn deserialize_embedding(blob: &[u8]) -> Vec { - if !blob.len().is_multiple_of(4) { - tracing::warn!( - "Embedding blob length {} is not a multiple of 4; skipping", - blob.len() - ); - return Vec::new(); - } - - blob.chunks_exact(4) - .map(|chunk| { - let bytes = [chunk[0], chunk[1], chunk[2], chunk[3]]; - f32::from_le_bytes(bytes) - }) - .collect() -} - -impl LibSqlBackend { - async fn collect_candidates( - &self, - rows: &mut libsql::Rows, - query_embedding: &[f32], - ) -> Result, WorkspaceError> { - let mut candidates = Vec::new(); - while let Some(row) = rows - .next() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Row fetch failed: {}", e), - })? - { - let chunk_id: Uuid = match get_text(&row, 0).parse() { - Ok(id) => id, - Err(e) => { - tracing::warn!("Invalid chunk_id UUID in memory_chunks: {e}"); - continue; - } - }; - let document_id: Uuid = match get_text(&row, 1).parse() { - Ok(id) => id, - Err(e) => { - tracing::warn!("Invalid document_id UUID in memory_chunks: {e}"); - continue; - } - }; - let document_path = get_text(&row, 2); - let content = get_text(&row, 3); - - // Deserialize the embedding BLOB - let embedding_blob = match row.get_value(4) { - Ok(libsql::Value::Blob(bytes)) => bytes, - _ => continue, - }; - let chunk_embedding = deserialize_embedding(&embedding_blob); - if chunk_embedding.is_empty() { - continue; - } - - // Compute cosine similarity - let similarity = cosine_similarity(query_embedding, &chunk_embedding); - - candidates.push(Candidate { - chunk_id, - document_id, - document_path, - content, - similarity, - }); - } - Ok(candidates) - } - - /// Brute-force vector search using cosine similarity in Rust. - /// - /// Loads all chunks with embeddings for the given user/agent, computes - /// cosine similarity against the query embedding, and returns the top matches. - /// This is used as a fallback when the vector index is not available (post-V9 migration). - async fn brute_force_vector_search( - &self, - user_id: &str, - agent_id: Option, - embedding: &[f32], - limit: usize, - ) -> Result, WorkspaceError> { - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: e.to_string(), - })?; - let agent_id_str = agent_id.map(|id| id.to_string()); - - // Load all chunks with embeddings - let mut rows = conn - .query( - r#" - SELECT c.id, c.document_id, d.path, c.content, c.embedding - FROM memory_chunks c - JOIN memory_documents d ON d.id = c.document_id - WHERE d.user_id = ?1 AND d.agent_id IS ?2 - AND c.embedding IS NOT NULL - "#, - params![user_id, agent_id_str.as_deref()], - ) - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Query failed: {}", e), - })?; - - let candidates = self.collect_candidates(&mut rows, embedding).await?; - Ok(rank_candidates(candidates, limit)) - } -} - -/// Execute full-text search and return ranked results. -/// -/// Queries the memory_chunks_fts virtual table, joining with memory_chunks -/// and memory_documents to fetch chunk content and document paths. Assigns -/// rank based on result order. -async fn fts_ranked_results( - conn: &libsql::Connection, - user_id: &str, - agent_id: Option<&str>, - query: &str, - limit: i64, -) -> Result, WorkspaceError> { - let mut rows = conn - .query( - r#" - SELECT c.id, c.document_id, d.path, c.content - FROM memory_chunks_fts fts - JOIN memory_chunks c ON c._rowid = fts.rowid - JOIN memory_documents d ON d.id = c.document_id - WHERE d.user_id = ?1 AND d.agent_id IS ?2 - AND memory_chunks_fts MATCH ?3 - ORDER BY rank - LIMIT ?4 - "#, - params![user_id, agent_id, query, limit], - ) - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("FTS query failed: {}", e), - })?; - - let mut results = Vec::new(); - while let Some(row) = rows - .next() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("FTS row fetch failed: {}", e), - })? - { - results.push(RankedResult { - chunk_id: get_text(&row, 0).parse().unwrap_or_default(), - document_id: get_text(&row, 1).parse().unwrap_or_default(), - document_path: get_text(&row, 2), - content: get_text(&row, 3), - rank: results.len() as u32 + 1, - }); - } - Ok(results) -} - -/// Execute vector similarity search and return ranked results. -/// -/// Queries using libsql's vector_top_k function. If the vector index is -/// missing (expected after V9 migration), logs at debug level and returns -/// an empty vector, preserving the existing graceful fallback behaviour. -async fn vector_ranked_results( - conn: &libsql::Connection, - user_id: &str, - agent_id: Option<&str>, - embedding: &[f32], - limit: i64, -) -> Result, WorkspaceError> { - let vector_json = format!( - "[{}]", - embedding - .iter() - .map(|f| f.to_string()) - .collect::>() - .join(",") - ); - - // vector_top_k requires a libsql_vector_idx index. After the V9 - // migration the index is dropped (to support flexible embedding - // dimensions), so this query may fail. Fall back to FTS-only. - match conn - .query( - r#" - SELECT c.id, c.document_id, d.path, c.content - FROM vector_top_k('idx_memory_chunks_embedding', vector(?1), ?2) AS top_k - JOIN memory_chunks c ON c._rowid = top_k.id - JOIN memory_documents d ON d.id = c.document_id - WHERE d.user_id = ?3 AND d.agent_id IS ?4 - "#, - params![vector_json, limit, user_id, agent_id], - ) - .await - { - Ok(mut rows) => { - let mut results = Vec::new(); - while let Some(row) = rows - .next() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Vector row fetch failed: {}", e), - })? - { - results.push(RankedResult { - chunk_id: get_text(&row, 0).parse().unwrap_or_default(), - document_id: get_text(&row, 1).parse().unwrap_or_default(), - document_path: get_text(&row, 2), - content: get_text(&row, 3), - rank: results.len() as u32 + 1, - }); - } - Ok(results) - } - Err(e) => { - tracing::debug!( - "Vector index query failed (expected after V9 migration), \ - falling back to FTS-only: {e}" - ); - Ok(Vec::new()) - } - } -} - -impl NativeWorkspaceStore for LibSqlBackend { - async fn get_document_by_path( - &self, - user_id: &str, - agent_id: Option, - path: &str, - ) -> Result { - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: e.to_string(), - })?; - let agent_id_str = agent_id.map(|id| id.to_string()); - let mut rows = conn - .query( - r#" - SELECT id, user_id, agent_id, path, content, - created_at, updated_at, metadata - FROM memory_documents - WHERE user_id = ?1 AND agent_id IS ?2 AND path = ?3 - "#, - params![user_id, agent_id_str.as_deref(), path], - ) - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Query failed: {}", e), - })?; - - match rows - .next() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Query failed: {}", e), - })? { - Some(row) => Ok(row_to_memory_document(&row)), - None => Err(WorkspaceError::DocumentNotFound { - doc_type: path.to_string(), - user_id: user_id.to_string(), - }), - } - } - - async fn get_document_by_id(&self, id: Uuid) -> Result { - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: e.to_string(), - })?; - let mut rows = conn - .query( - r#" - SELECT id, user_id, agent_id, path, content, - created_at, updated_at, metadata - FROM memory_documents WHERE id = ?1 - "#, - params![id.to_string()], - ) - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Query failed: {}", e), - })?; - - match rows - .next() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Query failed: {}", e), - })? { - Some(row) => Ok(row_to_memory_document(&row)), - None => Err(WorkspaceError::DocumentNotFound { - doc_type: "unknown".to_string(), - user_id: "unknown".to_string(), - }), - } - } - - async fn get_or_create_document_by_path( - &self, - user_id: &str, - agent_id: Option, - path: &str, - ) -> Result { - // Try get - match NativeWorkspaceStore::get_document_by_path(self, user_id, agent_id, path).await { - Ok(doc) => return Ok(doc), - Err(WorkspaceError::DocumentNotFound { .. }) => {} - Err(e) => return Err(e), - } - - // Create - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: e.to_string(), - })?; - let id = Uuid::new_v4(); - let agent_id_str = agent_id.map(|id| id.to_string()); - conn.execute( - r#" - INSERT INTO memory_documents (id, user_id, agent_id, path, content, metadata) - VALUES (?1, ?2, ?3, ?4, '', '{}') - ON CONFLICT DO NOTHING - "#, - params![id.to_string(), user_id, agent_id_str.as_deref(), path], - ) - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Insert failed: {}", e), - })?; - - NativeWorkspaceStore::get_document_by_path(self, user_id, agent_id, path).await - } - - async fn update_document(&self, id: Uuid, content: &str) -> Result<(), WorkspaceError> { - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: e.to_string(), - })?; - let now = fmt_ts(&Utc::now()); - conn.execute( - "UPDATE memory_documents SET content = ?2, updated_at = ?3 WHERE id = ?1", - params![id.to_string(), content, now], - ) - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Update failed: {}", e), - })?; - Ok(()) - } - - async fn delete_document_by_path( - &self, - user_id: &str, - agent_id: Option, - path: &str, - ) -> Result<(), WorkspaceError> { - let doc = NativeWorkspaceStore::get_document_by_path(self, user_id, agent_id, path).await?; - NativeWorkspaceStore::delete_chunks(self, doc.id).await?; - - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: e.to_string(), - })?; - let agent_id_str = agent_id.map(|id| id.to_string()); - conn.execute( - "DELETE FROM memory_documents WHERE user_id = ?1 AND agent_id IS ?2 AND path = ?3", - params![user_id, agent_id_str.as_deref(), path], - ) - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Delete failed: {}", e), - })?; - Ok(()) - } - - async fn list_directory( - &self, - user_id: &str, - agent_id: Option, - directory: &str, - ) -> Result, WorkspaceError> { - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: e.to_string(), - })?; - let dir = if !directory.is_empty() && !directory.ends_with('/') { - format!("{}/", directory) - } else { - directory.to_string() - }; - - let agent_id_str = agent_id.map(|id| id.to_string()); - let pattern = if dir.is_empty() { - "%".to_string() - } else { - format!("{}%", dir) - }; - - let mut rows = conn - .query( - r#" - SELECT path, updated_at, substr(content, 1, 200) as content_preview - FROM memory_documents - WHERE user_id = ?1 AND agent_id IS ?2 - AND (?3 = '%' OR path LIKE ?3) - ORDER BY path - "#, - params![user_id, agent_id_str.as_deref(), pattern], - ) - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("List directory failed: {}", e), - })?; - - let mut entries_map: HashMap = HashMap::new(); - - while let Some(row) = rows - .next() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Query failed: {}", e), - })? - { - let full_path = get_text(&row, 0); - let updated_at = get_opt_ts(&row, 1); - let content_preview = get_opt_text(&row, 2); - - let relative = if dir.is_empty() { - &full_path - } else if let Some(stripped) = full_path.strip_prefix(&dir) { - stripped - } else { - continue; - }; - - let child_name = if let Some(slash_pos) = relative.find('/') { - &relative[..slash_pos] - } else { - relative - }; - - if child_name.is_empty() { - continue; - } - - let is_dir = relative.contains('/'); - let entry_path = if dir.is_empty() { - child_name.to_string() - } else { - format!("{}{}", dir, child_name) - }; - - entries_map - .entry(child_name.to_string()) - .and_modify(|e| { - if is_dir { - e.is_directory = true; - e.content_preview = None; - } - if let (Some(existing), Some(new)) = (&e.updated_at, &updated_at) - && new > existing - { - e.updated_at = Some(*new); - } - }) - .or_insert(WorkspaceEntry { - path: entry_path, - is_directory: is_dir, - updated_at, - content_preview: if is_dir { None } else { content_preview }, - }); - } - - let mut entries: Vec = entries_map.into_values().collect(); - entries.sort_by(|a, b| a.path.cmp(&b.path)); - Ok(entries) - } - - async fn list_all_paths( - &self, - user_id: &str, - agent_id: Option, - ) -> Result, WorkspaceError> { - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: e.to_string(), - })?; - let agent_id_str = agent_id.map(|id| id.to_string()); - let mut rows = conn - .query( - "SELECT path FROM memory_documents WHERE user_id = ?1 AND agent_id IS ?2 ORDER BY path", - params![user_id, agent_id_str.as_deref()], - ) - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("List paths failed: {}", e), - })?; - - let mut paths = Vec::new(); - while let Some(row) = rows - .next() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Query failed: {}", e), - })? - { - paths.push(get_text(&row, 0)); - } - Ok(paths) - } - - async fn list_documents( - &self, - user_id: &str, - agent_id: Option, - ) -> Result, WorkspaceError> { - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: e.to_string(), - })?; - let agent_id_str = agent_id.map(|id| id.to_string()); - let mut rows = conn - .query( - r#" - SELECT id, user_id, agent_id, path, content, - created_at, updated_at, metadata - FROM memory_documents - WHERE user_id = ?1 AND agent_id IS ?2 - ORDER BY updated_at DESC - "#, - params![user_id, agent_id_str.as_deref()], - ) - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Query failed: {}", e), - })?; - - let mut docs = Vec::new(); - while let Some(row) = rows - .next() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Query failed: {}", e), - })? - { - docs.push(row_to_memory_document(&row)); - } - Ok(docs) - } - - async fn delete_chunks(&self, document_id: Uuid) -> Result<(), WorkspaceError> { - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::ChunkingFailed { - reason: e.to_string(), - })?; - conn.execute( - "DELETE FROM memory_chunks WHERE document_id = ?1", - params![document_id.to_string()], - ) - .await - .map_err(|e| WorkspaceError::ChunkingFailed { - reason: format!("Delete failed: {}", e), - })?; - Ok(()) - } - - async fn insert_chunk(&self, params: InsertChunkParams<'_>) -> Result { - let InsertChunkParams { - document_id, - chunk_index, - content, - embedding, - } = params; - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::ChunkingFailed { - reason: e.to_string(), - })?; - let id = Uuid::new_v4(); - let chunk_index = i64::from(chunk_index); - let embedding_blob = embedding.map(|e| { - let bytes: Vec = e.iter().flat_map(|f| f.to_le_bytes()).collect(); - bytes - }); - - conn.execute( - r#" - INSERT INTO memory_chunks (id, document_id, chunk_index, content, embedding) - VALUES (?1, ?2, ?3, ?4, ?5) - "#, - params![ - id.to_string(), - document_id.to_string(), - chunk_index, - content, - embedding_blob.map(libsql::Value::Blob), - ], - ) - .await - .map_err(|e| WorkspaceError::ChunkingFailed { - reason: format!("Insert failed: {}", e), - })?; - Ok(id) - } - - async fn update_chunk_embedding( - &self, - chunk_id: Uuid, - embedding: &[f32], - ) -> Result<(), WorkspaceError> { - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::EmbeddingFailed { - reason: e.to_string(), - })?; - let bytes: Vec = embedding.iter().flat_map(|f| f.to_le_bytes()).collect(); - - conn.execute( - "UPDATE memory_chunks SET embedding = ?2 WHERE id = ?1", - params![chunk_id.to_string(), libsql::Value::Blob(bytes)], - ) - .await - .map_err(|e| WorkspaceError::EmbeddingFailed { - reason: format!("Update failed: {}", e), - })?; - Ok(()) - } - - async fn get_chunks_without_embeddings( - &self, - user_id: &str, - agent_id: Option, - limit: usize, - ) -> Result, WorkspaceError> { - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: e.to_string(), - })?; - let agent_id_str = agent_id.map(|id| id.to_string()); - let mut rows = conn - .query( - r#" - SELECT c.id, c.document_id, c.chunk_index, c.content, c.created_at - FROM memory_chunks c - JOIN memory_documents d ON d.id = c.document_id - WHERE d.user_id = ?1 AND d.agent_id IS ?2 - AND c.embedding IS NULL - LIMIT ?3 - "#, - params![user_id, agent_id_str.as_deref(), limit as i64], - ) - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Query failed: {}", e), - })?; - - let mut chunks = Vec::new(); - while let Some(row) = rows - .next() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: format!("Query failed: {}", e), - })? - { - let chunk_index = - u32::try_from(get_i64(&row, 2)).map_err(|_| WorkspaceError::SearchFailed { - reason: "memory_chunks.chunk_index must be non-negative".to_string(), - })?; - chunks.push(MemoryChunk { - id: get_text(&row, 0).parse().unwrap_or_default(), - document_id: get_text(&row, 1).parse().unwrap_or_default(), - chunk_index, - content: get_text(&row, 3), - embedding: None, - created_at: get_ts(&row, 4), - }); - } - Ok(chunks) - } - - async fn hybrid_search( - &self, - params: HybridSearchParams<'_>, - ) -> Result, WorkspaceError> { - let HybridSearchParams { - user_id, - agent_id, - query, - embedding, - config, - } = params; - let conn = self - .connect() - .await - .map_err(|e| WorkspaceError::SearchFailed { - reason: e.to_string(), - })?; - let agent_id_str = agent_id.map(|id| id.to_string()); - let pre_limit = config.pre_fusion_limit as i64; - - let fts_results = if config.use_fts { - let results = - fts_ranked_results(&conn, user_id, agent_id_str.as_deref(), query, pre_limit) - .await?; - tracing::debug!( - "FTS search returned {} results (pre-fusion limit: {})", - results.len(), - pre_limit - ); - results - } else { - Vec::new() - }; - - let vector_results = if config.use_vector { - if let Some(emb) = embedding { - // Try the vector index first; fall back to brute-force - // cosine similarity if the index is gone (post-V9 migration). - let indexed = - vector_ranked_results(&conn, user_id, agent_id_str.as_deref(), emb, pre_limit) - .await?; - if indexed.is_empty() { - tracing::info!( - "Vector index returned no results, using brute-force vector search" - ); - self.brute_force_vector_search(user_id, agent_id, emb, pre_limit as usize) - .await - .unwrap_or_else(|e| { - tracing::warn!("Brute-force vector search failed: {e}"); - Vec::new() - }) - } else { - tracing::debug!( - "Vector index search returned {} results (pre-fusion limit: {})", - indexed.len(), - pre_limit - ); - indexed - } - } else { - Vec::new() - } - } else { - if embedding.is_some() { - tracing::warn!( - "Embedding provided but vector search is disabled in config; using FTS-only results" - ); - } - Vec::new() - }; - - Ok(reciprocal_rank_fusion(fts_results, vector_results, config)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_deserialize_embedding_valid() { - let floats = [1.0f32, 2.0, 3.0]; - let bytes: Vec = floats.iter().flat_map(|f| f.to_le_bytes()).collect(); - - let result = deserialize_embedding(&bytes); - - assert_eq!(result.len(), 3); - assert!((result[0] - 1.0).abs() < 0.001); - assert!((result[1] - 2.0).abs() < 0.001); - assert!((result[2] - 3.0).abs() < 0.001); - } - - #[test] - fn test_deserialize_embedding_empty() { - let result = deserialize_embedding(&[]); - assert_eq!(result.len(), 0); - } - - #[test] - fn test_deserialize_embedding_invalid_length() { - // 7 bytes is not a multiple of 4 - let result = deserialize_embedding(&[1, 2, 3, 4, 5, 6, 7]); - assert_eq!(result.len(), 0); - } - - #[test] - fn test_deserialize_embedding_single_value() { - let value = 42.5f32; - let bytes = value.to_le_bytes(); - - let result = deserialize_embedding(&bytes); - - assert_eq!(result.len(), 1); - assert!((result[0] - 42.5).abs() < 0.001); - } - - #[test] - fn test_deserialize_embedding_negative_values() { - let floats = [-1.5f32, 0.0, 2.75]; - let bytes: Vec = floats.iter().flat_map(|f| f.to_le_bytes()).collect(); - - let result = deserialize_embedding(&bytes); - - assert_eq!(result.len(), 3); - assert!((result[0] - (-1.5)).abs() < 0.001); - assert!((result[1] - 0.0).abs() < 0.001); - assert!((result[2] - 2.75).abs() < 0.001); - } -} diff --git a/src/db/libsql/workspace/chunk_ops.rs b/src/db/libsql/workspace/chunk_ops.rs new file mode 100644 index 000000000..250f0f025 --- /dev/null +++ b/src/db/libsql/workspace/chunk_ops.rs @@ -0,0 +1,204 @@ +//! Chunk-oriented workspace-store helpers for the libSQL backend. + +#[cfg(test)] +#[path = "chunk_ops_tests.rs"] +mod tests; + +use libsql::params; +use uuid::Uuid; + +use super::super::{LibSqlBackend, get_i64, get_text, get_ts}; +use crate::db::InsertChunkParams; +use crate::error::WorkspaceError; +use crate::workspace::MemoryChunk; + +pub(super) async fn delete_chunks( + backend: &LibSqlBackend, + document_id: Uuid, +) -> Result<(), WorkspaceError> { + let conn = backend + .connect() + .await + .map_err(|e| WorkspaceError::ChunkingFailed { + reason: e.to_string(), + })?; + conn.execute( + "DELETE FROM memory_chunks WHERE document_id = ?1", + params![document_id.to_string()], + ) + .await + .map_err(|e| WorkspaceError::ChunkingFailed { + reason: format!("Delete failed: {}", e), + })?; + Ok(()) +} + +pub(super) async fn insert_chunk( + backend: &LibSqlBackend, + params: InsertChunkParams<'_>, +) -> Result { + let InsertChunkParams { + document_id, + chunk_index, + content, + embedding, + } = params; + let conn = backend + .connect() + .await + .map_err(|e| WorkspaceError::ChunkingFailed { + reason: e.to_string(), + })?; + let id = Uuid::new_v4(); + let chunk_index = i64::from(chunk_index); + let embedding_blob = embedding.and_then(|values| { + (!values.is_empty()).then(|| { + values + .iter() + .flat_map(|f| f.to_le_bytes()) + .collect::>() + }) + }); + let embedding_value = embedding_blob + .map(libsql::Value::Blob) + .unwrap_or(libsql::Value::Null); + + conn.execute( + r#" + INSERT INTO memory_chunks (id, document_id, chunk_index, content, embedding) + VALUES (?1, ?2, ?3, ?4, ?5) + "#, + params![ + id.to_string(), + document_id.to_string(), + chunk_index, + content, + embedding_value, + ], + ) + .await + .map_err(|e| WorkspaceError::ChunkingFailed { + reason: format!("Insert failed: {}", e), + })?; + Ok(id) +} + +pub(super) async fn update_chunk_embedding( + backend: &LibSqlBackend, + chunk_id: Uuid, + embedding: &[f32], +) -> Result<(), WorkspaceError> { + let conn = backend + .connect() + .await + .map_err(|e| WorkspaceError::EmbeddingFailed { + reason: e.to_string(), + })?; + let embedding_value = if embedding.is_empty() { + libsql::Value::Null + } else { + let bytes: Vec = embedding.iter().flat_map(|f| f.to_le_bytes()).collect(); + libsql::Value::Blob(bytes) + }; + + conn.execute( + "UPDATE memory_chunks SET embedding = ?2 WHERE id = ?1", + params![chunk_id.to_string(), embedding_value], + ) + .await + .map_err(|e| WorkspaceError::EmbeddingFailed { + reason: format!("Update failed: {}", e), + })?; + Ok(()) +} + +/// Parse a single `memory_chunks` row into a [`MemoryChunk`]. +/// +/// Returns `Ok(None)` and emits a `WARN` log when either UUID column +/// contains an invalid value (the row is silently skipped). +/// Returns `Err` when `chunk_index` is negative (a fatal data-integrity +/// violation). +fn parse_chunk_row(row: libsql::Row) -> Result, WorkspaceError> { + let raw_chunk_id = get_text(&row, 0); + let id: Uuid = match raw_chunk_id.parse() { + Ok(id) => id, + Err(e) => { + tracing::warn!( + "Invalid chunk_id UUID in memory_chunks ('{}'): {e}", + raw_chunk_id + ); + return Ok(None); + } + }; + + let raw_document_id = get_text(&row, 1); + let document_id: Uuid = match raw_document_id.parse() { + Ok(id) => id, + Err(e) => { + tracing::warn!( + "Invalid document_id UUID in memory_chunks ('{}'): {e}", + raw_document_id + ); + return Ok(None); + } + }; + + let chunk_index = + u32::try_from(get_i64(&row, 2)).map_err(|_| WorkspaceError::SearchFailed { + reason: "memory_chunks.chunk_index must be non-negative".to_string(), + })?; + + Ok(Some(MemoryChunk { + id, + document_id, + chunk_index, + content: get_text(&row, 3), + embedding: None, + created_at: get_ts(&row, 4), + })) +} + +pub(super) async fn get_chunks_without_embeddings( + backend: &LibSqlBackend, + user_id: &str, + agent_id: Option, + limit: usize, +) -> Result, WorkspaceError> { + let conn = backend + .connect() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: e.to_string(), + })?; + let agent_id_str = agent_id.map(|id| id.to_string()); + let mut rows = conn + .query( + r#" + SELECT c.id, c.document_id, c.chunk_index, c.content, c.created_at + FROM memory_chunks c + JOIN memory_documents d ON d.id = c.document_id + WHERE d.user_id = ?1 AND d.agent_id IS ?2 + AND c.embedding IS NULL + LIMIT ?3 + "#, + params![user_id, agent_id_str.as_deref(), limit as i64], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Query failed: {}", e), + })?; + + let mut chunks = Vec::new(); + while let Some(row) = rows + .next() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Query failed: {}", e), + })? + { + if let Some(chunk) = parse_chunk_row(row)? { + chunks.push(chunk); + } + } + Ok(chunks) +} diff --git a/src/db/libsql/workspace/chunk_ops_tests.rs b/src/db/libsql/workspace/chunk_ops_tests.rs new file mode 100644 index 000000000..c599d017e --- /dev/null +++ b/src/db/libsql/workspace/chunk_ops_tests.rs @@ -0,0 +1,164 @@ +//! Tests for libSQL workspace chunk helpers. + +use chrono::{TimeZone, Utc}; +use libsql::params; +use uuid::Uuid; + +use super::*; +use crate::db::NativeDatabase; + +#[tokio::test] +async fn parse_chunk_row_maps_valid_rows() { + let chunk_id = Uuid::new_v4(); + let document_id = Uuid::new_v4(); + let backend = LibSqlBackend::new_memory() + .await + .expect("failed to create temp-file-backed backend"); + let conn = backend + .connect() + .await + .expect("failed to open libsql connection"); + let mut rows = conn + .query( + "SELECT ?1, ?2, ?3, ?4, ?5", + params![ + chunk_id.to_string(), + document_id.to_string(), + 7i64, + "chunk body", + "2026-03-07T12:34:56.000Z" + ], + ) + .await + .expect("failed to query literal chunk row"); + let row = rows + .next() + .await + .expect("failed to fetch literal chunk row") + .expect("expected one literal chunk row"); + + let chunk = parse_chunk_row(row) + .expect("valid chunk row should parse") + .expect("valid chunk row should not be skipped"); + + assert_eq!(chunk.id, chunk_id); + assert_eq!(chunk.document_id, document_id); + assert_eq!(chunk.chunk_index, 7); + assert_eq!(chunk.content, "chunk body"); + assert_eq!( + chunk.created_at, + Utc.with_ymd_and_hms(2026, 3, 7, 12, 34, 56) + .single() + .expect("timestamp should be valid"), + ); +} + +#[tokio::test] +async fn parse_chunk_row_skips_invalid_chunk_uuid() { + let backend = LibSqlBackend::new_memory() + .await + .expect("failed to create temp-file-backed backend"); + let conn = backend + .connect() + .await + .expect("failed to open libsql connection"); + let mut rows = conn + .query( + "SELECT 'not-a-uuid', ?1, 0, 'chunk body', '2026-03-07T12:34:56.000Z'", + params![Uuid::new_v4().to_string()], + ) + .await + .expect("failed to query literal chunk row"); + let row = rows + .next() + .await + .expect("failed to fetch literal chunk row") + .expect("expected one literal chunk row"); + + assert!( + parse_chunk_row(row) + .expect("invalid chunk id should be skipped cleanly") + .is_none() + ); +} + +#[tokio::test] +async fn parse_chunk_row_rejects_negative_chunk_index() { + let backend = LibSqlBackend::new_memory() + .await + .expect("failed to create temp-file-backed backend"); + let conn = backend + .connect() + .await + .expect("failed to open libsql connection"); + let mut rows = conn + .query( + "SELECT ?1, ?2, -1, 'chunk body', '2026-03-07T12:34:56.000Z'", + params![Uuid::new_v4().to_string(), Uuid::new_v4().to_string()], + ) + .await + .expect("failed to query literal chunk row"); + let row = rows + .next() + .await + .expect("failed to fetch literal chunk row") + .expect("expected one literal chunk row"); + + let error = parse_chunk_row(row).expect_err("negative chunk index must fail"); + assert!(matches!( + error, + WorkspaceError::SearchFailed { reason } + if reason == "memory_chunks.chunk_index must be non-negative" + )); +} + +#[tokio::test] +async fn insert_chunk_stores_empty_embeddings_as_null() { + let backend = LibSqlBackend::new_memory() + .await + .expect("failed to create temp-file-backed backend"); + backend + .run_migrations() + .await + .expect("failed to run libsql migrations"); + let document_id = Uuid::new_v4(); + let conn = backend + .connect() + .await + .expect("failed to open libsql connection"); + conn.execute( + "INSERT INTO memory_documents (id, user_id, agent_id, path, content, metadata) VALUES (?1, 'default', NULL, 'notes/chunk.md', '', '{}')", + params![document_id.to_string()], + ) + .await + .expect("failed to insert document"); + + let chunk_id = insert_chunk( + &backend, + crate::db::InsertChunkParams { + document_id, + chunk_index: 0, + content: "chunk body", + embedding: Some(&[]), + }, + ) + .await + .expect("failed to insert chunk with empty embedding"); + + let mut rows = conn + .query( + "SELECT embedding FROM memory_chunks WHERE id = ?1", + params![chunk_id.to_string()], + ) + .await + .expect("failed to query inserted chunk"); + let row = rows + .next() + .await + .expect("failed to fetch inserted chunk row") + .expect("expected inserted chunk row"); + assert!(matches!( + row.get_value(0).expect("failed to read embedding value"), + libsql::Value::Null + )); +} diff --git a/src/db/libsql/workspace/document_listing.rs b/src/db/libsql/workspace/document_listing.rs new file mode 100644 index 000000000..3e3ce7f4c --- /dev/null +++ b/src/db/libsql/workspace/document_listing.rs @@ -0,0 +1,73 @@ +//! Listing helpers for libSQL workspace document queries. + +use std::collections::HashMap; + +use crate::workspace::WorkspaceEntry; + +pub(super) fn normalise_dir_prefix(directory: &str) -> String { + if !directory.is_empty() && !directory.ends_with('/') { + format!("{}/", directory) + } else { + directory.to_string() + } +} + +pub(super) fn dir_like_pattern(dir: &str) -> String { + if dir.is_empty() { + "%".to_string() + } else { + format!("{}%", dir) + } +} + +pub(super) fn resolve_entry(full_path: &str, dir: &str) -> Option<(String, bool, String)> { + let relative = if dir.is_empty() { + full_path + } else { + full_path.strip_prefix(dir)? + }; + let child_name = if let Some(slash_pos) = relative.find('/') { + &relative[..slash_pos] + } else { + relative + }; + if child_name.is_empty() { + return None; + } + let is_dir = relative.contains('/'); + let entry_path = if dir.is_empty() { + child_name.to_string() + } else { + format!("{}{}", dir, child_name) + }; + Some((child_name.to_string(), is_dir, entry_path)) +} + +pub(super) fn merge_entry( + entries_map: &mut HashMap, + child_name: String, + entry_path: String, + is_dir: bool, + updated_at: Option>, + content_preview: Option, +) { + entries_map + .entry(child_name) + .and_modify(|entry| { + if is_dir { + entry.is_directory = true; + entry.content_preview = None; + } + if let (Some(existing), Some(new)) = (&entry.updated_at, &updated_at) + && new > existing + { + entry.updated_at = Some(*new); + } + }) + .or_insert(WorkspaceEntry { + path: entry_path, + is_directory: is_dir, + updated_at, + content_preview: if is_dir { None } else { content_preview }, + }); +} diff --git a/src/db/libsql/workspace/document_ops.rs b/src/db/libsql/workspace/document_ops.rs new file mode 100644 index 000000000..2103dfbd0 --- /dev/null +++ b/src/db/libsql/workspace/document_ops.rs @@ -0,0 +1,351 @@ +//! Document-oriented workspace-store helpers for the libSQL backend. + +#[path = "document_listing.rs"] +mod listing; +#[cfg(test)] +#[path = "document_ops_tests.rs"] +mod tests; + +use std::collections::HashMap; + +use chrono::Utc; +use libsql::params; +use uuid::Uuid; + +use super::super::{ + LibSqlBackend, fmt_ts, get_opt_text, get_opt_ts, get_text, row_to_memory_document, +}; +use crate::db::NativeWorkspaceStore; +use crate::error::WorkspaceError; +use crate::workspace::{MemoryDocument, WorkspaceEntry}; +use listing::{dir_like_pattern, merge_entry, normalise_dir_prefix, resolve_entry}; + +/// Identifies the user/agent context for a workspace document query. +/// +/// Bundles the `user_id` + `agent_id` pair that every document-scoped +/// helper requires, reducing per-function arity and making call sites +/// self-documenting. +#[derive(Clone, Copy)] +pub(super) struct AgentScope<'a> { + pub(super) user_id: &'a str, + pub(super) agent_id: Option, +} + +async fn connect_backend(backend: &LibSqlBackend) -> Result { + backend + .connect() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: e.to_string(), + }) +} + +async fn fetch_first_row(mut rows: libsql::Rows) -> Result, WorkspaceError> { + rows.next().await.map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Query failed: {}", e), + }) +} + +/// Maps an optional row to a [`MemoryDocument`], returning `not_found` when +/// the row is absent. +fn document_from_row_or_not_found( + row: Option, + doc_type: &str, + user_id: &str, +) -> Result { + match row { + Some(row) => Ok(row_to_memory_document(&row)), + None => Err(WorkspaceError::DocumentNotFound { + doc_type: doc_type.to_string(), + user_id: user_id.to_string(), + }), + } +} + +async fn drain_rows(mut rows: libsql::Rows, map_row: F) -> Result, WorkspaceError> +where + F: Fn(libsql::Row) -> T, +{ + let mut out = Vec::new(); + while let Some(row) = rows + .next() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Query failed: {}", e), + })? + { + out.push(map_row(row)); + } + Ok(out) +} + +pub(super) async fn get_document_by_path( + backend: &LibSqlBackend, + scope: &AgentScope<'_>, + path: &str, +) -> Result { + let conn = connect_backend(backend).await?; + let agent_id_str = scope.agent_id.map(|id| id.to_string()); + let rows = conn + .query( + r#" + SELECT id, user_id, agent_id, path, content, + created_at, updated_at, metadata + FROM memory_documents + WHERE user_id = ?1 AND agent_id IS ?2 AND path = ?3 + "#, + params![scope.user_id, agent_id_str.as_deref(), path], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Query failed: {}", e), + })?; + + document_from_row_or_not_found(fetch_first_row(rows).await?, path, scope.user_id) +} + +pub(super) async fn get_document_by_id( + backend: &LibSqlBackend, + id: Uuid, +) -> Result { + let conn = connect_backend(backend).await?; + let rows = conn + .query( + r#" + SELECT id, user_id, agent_id, path, content, + created_at, updated_at, metadata + FROM memory_documents WHERE id = ?1 + "#, + params![id.to_string()], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Query failed: {}", e), + })?; + + document_from_row_or_not_found(fetch_first_row(rows).await?, "unknown", "unknown") +} + +pub(super) async fn get_or_create_document_by_path( + backend: &LibSqlBackend, + scope: &AgentScope<'_>, + path: &str, +) -> Result { + match NativeWorkspaceStore::get_document_by_path(backend, scope.user_id, scope.agent_id, path) + .await + { + Ok(doc) => return Ok(doc), + Err(WorkspaceError::DocumentNotFound { .. }) => {} + Err(e) => return Err(e), + } + + let conn = backend + .connect() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: e.to_string(), + })?; + let id = Uuid::new_v4(); + let agent_id_str = scope.agent_id.map(|id| id.to_string()); + conn.execute( + r#" + INSERT INTO memory_documents (id, user_id, agent_id, path, content, metadata) + VALUES (?1, ?2, ?3, ?4, '', '{}') + ON CONFLICT DO NOTHING + "#, + params![id.to_string(), scope.user_id, agent_id_str.as_deref(), path], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Insert failed: {}", e), + })?; + + NativeWorkspaceStore::get_document_by_path(backend, scope.user_id, scope.agent_id, path).await +} + +pub(super) async fn update_document( + backend: &LibSqlBackend, + id: Uuid, + content: &str, +) -> Result<(), WorkspaceError> { + let conn = backend + .connect() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: e.to_string(), + })?; + let now = fmt_ts(&Utc::now()); + conn.execute( + "UPDATE memory_documents SET content = ?2, updated_at = ?3 WHERE id = ?1", + params![id.to_string(), content, now], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Update failed: {}", e), + })?; + Ok(()) +} + +pub(super) async fn delete_document_by_path( + backend: &LibSqlBackend, + scope: &AgentScope<'_>, + path: &str, +) -> Result<(), WorkspaceError> { + let conn = connect_backend(backend).await?; + let agent_id_str = scope.agent_id.map(|id| id.to_string()); + let tx = conn + .transaction() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Delete failed: {}", e), + })?; + let rows = tx + .query( + r#" + SELECT id, user_id, agent_id, path, content, + created_at, updated_at, metadata + FROM memory_documents + WHERE user_id = ?1 AND agent_id IS ?2 AND path = ?3 + "#, + params![scope.user_id, agent_id_str.as_deref(), path], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Query failed: {}", e), + })?; + let doc = match fetch_first_row(rows).await? { + Some(row) => row_to_memory_document(&row), + None => { + return Err(WorkspaceError::DocumentNotFound { + doc_type: path.to_string(), + user_id: scope.user_id.to_string(), + }); + } + }; + + tx.execute( + "DELETE FROM memory_chunks WHERE document_id = ?1", + params![doc.id.to_string()], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Delete failed: {}", e), + })?; + tx.execute( + "DELETE FROM memory_documents WHERE user_id = ?1 AND agent_id IS ?2 AND path = ?3", + params![scope.user_id, agent_id_str.as_deref(), path], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Delete failed: {}", e), + })?; + tx.commit() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Delete failed: {}", e), + })?; + Ok(()) +} + +pub(super) async fn list_directory( + backend: &LibSqlBackend, + scope: &AgentScope<'_>, + directory: &str, +) -> Result, WorkspaceError> { + let conn = backend + .connect() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: e.to_string(), + })?; + let dir = normalise_dir_prefix(directory); + + let agent_id_str = scope.agent_id.map(|id| id.to_string()); + let pattern = dir_like_pattern(&dir); + + let mut rows = conn + .query( + r#" + SELECT path, updated_at, substr(content, 1, 200) as content_preview + FROM memory_documents + WHERE user_id = ?1 AND agent_id IS ?2 + AND (?3 = '%' OR path LIKE ?3) + ORDER BY path + "#, + params![scope.user_id, agent_id_str.as_deref(), pattern], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("List directory failed: {}", e), + })?; + + let mut entries_map: HashMap = HashMap::new(); + while let Some(row) = rows + .next() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Query failed: {}", e), + })? + { + let full_path = get_text(&row, 0); + let Some((child_name, is_dir, entry_path)) = resolve_entry(&full_path, &dir) else { + continue; + }; + merge_entry( + &mut entries_map, + child_name, + entry_path, + is_dir, + get_opt_ts(&row, 1), + get_opt_text(&row, 2), + ); + } + + let mut entries: Vec = entries_map.into_values().collect(); + entries.sort_by(|a, b| a.path.cmp(&b.path)); + Ok(entries) +} + +pub(super) async fn list_all_paths( + backend: &LibSqlBackend, + scope: &AgentScope<'_>, +) -> Result, WorkspaceError> { + let conn = connect_backend(backend).await?; + let agent_id_str = scope.agent_id.map(|id| id.to_string()); + let rows = conn + .query( + "SELECT path FROM memory_documents WHERE user_id = ?1 AND agent_id IS ?2 ORDER BY path", + params![scope.user_id, agent_id_str.as_deref()], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("List paths failed: {}", e), + })?; + + drain_rows(rows, |row| get_text(&row, 0)).await +} + +pub(super) async fn list_documents( + backend: &LibSqlBackend, + scope: &AgentScope<'_>, +) -> Result, WorkspaceError> { + let conn = connect_backend(backend).await?; + let agent_id_str = scope.agent_id.map(|id| id.to_string()); + let rows = conn + .query( + r#" + SELECT id, user_id, agent_id, path, content, + created_at, updated_at, metadata + FROM memory_documents + WHERE user_id = ?1 AND agent_id IS ?2 + ORDER BY updated_at DESC + "#, + params![scope.user_id, agent_id_str.as_deref()], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Query failed: {}", e), + })?; + + drain_rows(rows, |row| row_to_memory_document(&row)).await +} diff --git a/src/db/libsql/workspace/document_ops_tests.rs b/src/db/libsql/workspace/document_ops_tests.rs new file mode 100644 index 000000000..acd8ed919 --- /dev/null +++ b/src/db/libsql/workspace/document_ops_tests.rs @@ -0,0 +1,113 @@ +//! Tests for libSQL workspace document helpers. + +use libsql::params; +use uuid::Uuid; + +use super::*; +use crate::db::{NativeDatabase, NativeWorkspaceStore}; + +#[tokio::test] +async fn document_from_row_or_not_found_maps_present_rows() { + let id = Uuid::new_v4(); + let backend = LibSqlBackend::new_memory() + .await + .expect("failed to create temp-file-backed backend"); + let conn = backend + .connect() + .await + .expect("failed to open libsql connection"); + let mut rows = conn + .query( + "SELECT ?1, 'default', NULL, 'notes/doc.md', 'hello', '2026-03-07T12:34:56.000Z', '2026-03-07T12:35:56.000Z', '{\"kind\":\"note\"}'", + params![id.to_string()], + ) + .await + .expect("failed to query literal document row"); + let row = rows + .next() + .await + .expect("failed to fetch literal document row") + .expect("expected one literal document row"); + + let document = document_from_row_or_not_found(Some(row), "notes/doc.md", "default") + .expect("present row should map to document"); + + assert_eq!(document.id, id); + assert_eq!(document.user_id, "default"); + assert_eq!(document.path, "notes/doc.md"); + assert_eq!(document.content, "hello"); +} + +#[test] +fn document_from_row_or_not_found_returns_not_found_error() { + let error = document_from_row_or_not_found(None, "notes/missing.md", "default") + .expect_err("missing row should become not-found"); + assert!(matches!( + error, + WorkspaceError::DocumentNotFound { doc_type, user_id } + if doc_type == "notes/missing.md" && user_id == "default" + )); +} + +#[tokio::test] +async fn get_document_by_path_returns_not_found_for_missing_document() { + let backend = LibSqlBackend::new_memory() + .await + .expect("failed to create temp-file-backed backend"); + backend + .run_migrations() + .await + .expect("failed to run libsql migrations"); + + let error = get_document_by_path( + &backend, + &AgentScope { + user_id: "default", + agent_id: None, + }, + "notes/missing.md", + ) + .await + .expect_err("missing document lookup should fail"); + assert!(matches!( + error, + WorkspaceError::DocumentNotFound { doc_type, user_id } + if doc_type == "notes/missing.md" && user_id == "default" + )); +} + +#[tokio::test] +async fn list_directory_merges_file_and_directory_entries() { + let backend = LibSqlBackend::new_memory() + .await + .expect("failed to create temp-file-backed backend"); + backend + .run_migrations() + .await + .expect("failed to run libsql migrations"); + backend + .get_or_create_document_by_path("default", None, "notes/alpha.md") + .await + .expect("failed to create alpha doc"); + backend + .get_or_create_document_by_path("default", None, "notes/nested/beta.md") + .await + .expect("failed to create beta doc"); + + let entries = list_directory( + &backend, + &AgentScope { + user_id: "default", + agent_id: None, + }, + "notes", + ) + .await + .expect("failed to list directory"); + + assert_eq!(entries.len(), 2); + assert_eq!(entries[0].path, "notes/alpha.md"); + assert!(!entries[0].is_directory); + assert_eq!(entries[1].path, "notes/nested"); + assert!(entries[1].is_directory); +} diff --git a/src/db/libsql/workspace/fts.rs b/src/db/libsql/workspace/fts.rs new file mode 100644 index 000000000..bd56549d4 --- /dev/null +++ b/src/db/libsql/workspace/fts.rs @@ -0,0 +1,66 @@ +//! Full-text-search helpers for libSQL workspace retrieval. + +#[cfg(test)] +#[path = "fts_tests.rs"] +mod tests; + +use libsql::params; + +use super::super::get_text; +use crate::error::WorkspaceError; +use crate::workspace::RankedResult; + +/// Parameters for a full-text search query. +pub(super) struct FtsSearchParams<'a> { + pub(super) user_id: &'a str, + pub(super) agent_id: Option<&'a str>, + pub(super) query: &'a str, + pub(super) limit: i64, +} + +/// Execute full-text search and return ranked results. +/// +/// Queries the memory_chunks_fts virtual table, joining with memory_chunks +/// and memory_documents to fetch chunk content and document paths. Assigns +/// rank based on result order. +pub(super) async fn fts_ranked_results( + conn: &libsql::Connection, + params: FtsSearchParams<'_>, +) -> Result, WorkspaceError> { + let mut rows = conn + .query( + r#" + SELECT c.id, c.document_id, d.path, c.content + FROM memory_chunks_fts fts + JOIN memory_chunks c ON c._rowid = fts.rowid + JOIN memory_documents d ON d.id = c.document_id + WHERE d.user_id = ?1 AND d.agent_id IS ?2 + AND memory_chunks_fts MATCH ?3 + ORDER BY rank + LIMIT ?4 + "#, + params![params.user_id, params.agent_id, params.query, params.limit], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("FTS query failed: {}", e), + })?; + + let mut results = Vec::new(); + while let Some(row) = rows + .next() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("FTS row fetch failed: {}", e), + })? + { + results.push(RankedResult { + chunk_id: get_text(&row, 0).parse().unwrap_or_default(), + document_id: get_text(&row, 1).parse().unwrap_or_default(), + document_path: get_text(&row, 2), + content: get_text(&row, 3), + rank: results.len() as u32 + 1, + }); + } + Ok(results) +} diff --git a/src/db/libsql/workspace/fts_tests.rs b/src/db/libsql/workspace/fts_tests.rs new file mode 100644 index 000000000..255802369 --- /dev/null +++ b/src/db/libsql/workspace/fts_tests.rs @@ -0,0 +1,79 @@ +//! Tests for libSQL workspace full-text search helpers. + +use super::super::LibSqlBackend; +use super::*; +use crate::db::{InsertChunkParams, NativeDatabase, NativeWorkspaceStore}; + +#[tokio::test] +async fn fts_ranked_results_returns_ranked_matches() { + let backend = LibSqlBackend::new_memory() + .await + .expect("failed to create temp-file-backed backend"); + backend + .run_migrations() + .await + .expect("failed to run libsql migrations"); + let document = backend + .get_or_create_document_by_path("default", None, "notes/fts.md") + .await + .expect("failed to create FTS document"); + backend + .insert_chunk(InsertChunkParams { + document_id: document.id, + chunk_index: 0, + content: "semantic retrieval through full text", + embedding: None, + }) + .await + .expect("failed to insert FTS chunk"); + let conn = backend + .connect() + .await + .expect("failed to open libsql connection"); + + let results = fts_ranked_results( + &conn, + FtsSearchParams { + user_id: "default", + agent_id: None, + query: "semantic", + limit: 5, + }, + ) + .await + .expect("FTS query should succeed"); + + assert_eq!(results.len(), 1); + assert_eq!(results[0].document_path, "notes/fts.md"); + assert_eq!(results[0].content, "semantic retrieval through full text"); + assert_eq!(results[0].rank, 1); +} + +#[tokio::test] +async fn fts_ranked_results_surfaces_query_errors() { + let backend = LibSqlBackend::new_memory() + .await + .expect("failed to create temp-file-backed backend"); + let conn = backend + .connect() + .await + .expect("failed to open libsql connection"); + + let error = fts_ranked_results( + &conn, + FtsSearchParams { + user_id: "default", + agent_id: None, + query: "semantic", + limit: 5, + }, + ) + .await + .expect_err("FTS query should fail before migrations"); + + assert!(matches!( + error, + crate::error::WorkspaceError::SearchFailed { reason } + if reason.starts_with("FTS query failed:") + )); +} diff --git a/src/db/libsql/workspace/mod.rs b/src/db/libsql/workspace/mod.rs new file mode 100644 index 000000000..caf7bfc92 --- /dev/null +++ b/src/db/libsql/workspace/mod.rs @@ -0,0 +1,231 @@ +//! Workspace-store operations for the libSQL backend. + +mod chunk_ops; +mod document_ops; +mod fts; +#[cfg(test)] +mod tests; +mod vector_search; + +use uuid::Uuid; + +use super::LibSqlBackend; +use crate::db::{HybridSearchParams, InsertChunkParams, NativeWorkspaceStore}; +use crate::error::WorkspaceError; +use crate::workspace::{ + MemoryChunk, MemoryDocument, RankedResult, SearchResult, WorkspaceEntry, reciprocal_rank_fusion, +}; +use chunk_ops::{ + delete_chunks, get_chunks_without_embeddings, insert_chunk, update_chunk_embedding, +}; +use document_ops::{ + AgentScope, delete_document_by_path, get_document_by_id, get_document_by_path, + get_or_create_document_by_path, list_all_paths, list_directory, list_documents, + update_document, +}; +use fts::{FtsSearchParams, fts_ranked_results}; +use vector_search::{ + VectorIndexQuery, VectorSearchOutcome, VectorSearchQuery, vector_ranked_results, +}; + +/// Execute full-text search and log the result count. +/// +/// Delegates to [`fts_ranked_results`] and emits a `DEBUG` trace with the +/// pre-fusion limit for observability. +async fn fetch_fts_results( + conn: &libsql::Connection, + params: FtsSearchParams<'_>, +) -> Result, WorkspaceError> { + let limit = params.limit; + let results = fts_ranked_results(conn, params).await?; + tracing::debug!( + "FTS search returned {} results (pre-fusion limit: {})", + results.len(), + limit, + ); + Ok(results) +} + +impl LibSqlBackend { + /// Execute vector similarity search, falling back to brute-force cosine + /// similarity when the fixed-dimension vector index is unavailable. + async fn fetch_vector_results( + &self, + conn: &libsql::Connection, + index_query: VectorIndexQuery<'_>, + agent_id: Option, + ) -> Result, WorkspaceError> { + let brute_limit = index_query.limit as usize; + let user_id = index_query.user_id; + let embedding = index_query.embedding; + match vector_ranked_results(conn, &index_query).await? { + VectorSearchOutcome::Indexed(results) => Ok(results), + VectorSearchOutcome::IndexUnavailable => { + tracing::info!("Using brute-force vector search (no vector index)"); + self.brute_force_vector_search( + VectorSearchQuery { + user_id, + agent_id, + embedding, + }, + brute_limit, + ) + .await + .map_err(|e| { + tracing::warn!("Brute-force vector search failed: {e}"); + e + }) + } + } + } +} + +impl NativeWorkspaceStore for LibSqlBackend { + async fn get_document_by_path( + &self, + user_id: &str, + agent_id: Option, + path: &str, + ) -> Result { + get_document_by_path(self, &AgentScope { user_id, agent_id }, path).await + } + + async fn get_document_by_id(&self, id: Uuid) -> Result { + get_document_by_id(self, id).await + } + + async fn get_or_create_document_by_path( + &self, + user_id: &str, + agent_id: Option, + path: &str, + ) -> Result { + get_or_create_document_by_path(self, &AgentScope { user_id, agent_id }, path).await + } + + async fn update_document(&self, id: Uuid, content: &str) -> Result<(), WorkspaceError> { + update_document(self, id, content).await + } + + async fn delete_document_by_path( + &self, + user_id: &str, + agent_id: Option, + path: &str, + ) -> Result<(), WorkspaceError> { + delete_document_by_path(self, &AgentScope { user_id, agent_id }, path).await + } + + async fn list_directory( + &self, + user_id: &str, + agent_id: Option, + directory: &str, + ) -> Result, WorkspaceError> { + list_directory(self, &AgentScope { user_id, agent_id }, directory).await + } + + async fn list_all_paths( + &self, + user_id: &str, + agent_id: Option, + ) -> Result, WorkspaceError> { + list_all_paths(self, &AgentScope { user_id, agent_id }).await + } + + async fn list_documents( + &self, + user_id: &str, + agent_id: Option, + ) -> Result, WorkspaceError> { + list_documents(self, &AgentScope { user_id, agent_id }).await + } + + async fn delete_chunks(&self, document_id: Uuid) -> Result<(), WorkspaceError> { + delete_chunks(self, document_id).await + } + + async fn insert_chunk(&self, params: InsertChunkParams<'_>) -> Result { + insert_chunk(self, params).await + } + + async fn update_chunk_embedding( + &self, + chunk_id: Uuid, + embedding: &[f32], + ) -> Result<(), WorkspaceError> { + update_chunk_embedding(self, chunk_id, embedding).await + } + + async fn get_chunks_without_embeddings( + &self, + user_id: &str, + agent_id: Option, + limit: usize, + ) -> Result, WorkspaceError> { + get_chunks_without_embeddings(self, user_id, agent_id, limit).await + } + + async fn hybrid_search( + &self, + params: HybridSearchParams<'_>, + ) -> Result, WorkspaceError> { + let HybridSearchParams { + user_id, + agent_id, + query, + embedding, + config, + } = params; + let conn = self + .connect() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: e.to_string(), + })?; + let agent_id_str = agent_id.map(|id| id.to_string()); + let pre_limit = config.pre_fusion_limit as i64; + + let fts_results = if config.use_fts { + fetch_fts_results( + &conn, + FtsSearchParams { + user_id, + agent_id: agent_id_str.as_deref(), + query, + limit: pre_limit, + }, + ) + .await? + } else { + Vec::new() + }; + + let vector_results = if config.use_vector { + if let Some(emb) = embedding { + self.fetch_vector_results( + &conn, + VectorIndexQuery { + user_id, + agent_id: agent_id_str.as_deref(), + embedding: emb, + limit: pre_limit, + }, + agent_id, + ) + .await? + } else { + Vec::new() + } + } else { + if embedding.is_some() { + tracing::warn!( + "Embedding provided but vector search is disabled in config; using FTS-only results" + ); + } + Vec::new() + }; + + Ok(reciprocal_rank_fusion(fts_results, vector_results, config)) + } +} diff --git a/src/db/libsql/workspace/tests.rs b/src/db/libsql/workspace/tests.rs new file mode 100644 index 000000000..94b9dc9bb --- /dev/null +++ b/src/db/libsql/workspace/tests.rs @@ -0,0 +1,600 @@ +//! Tests for the libSQL workspace-store module split. + +use libsql::params; + +use super::super::LibSqlBackend; +use super::vector_search::{ + VectorIndexQuery, VectorSearchOutcome, VectorSearchQuery, deserialize_embedding, + vector_ranked_results, +}; +use crate::db::{HybridSearchParams, InsertChunkParams, NativeDatabase, NativeWorkspaceStore}; +use crate::workspace::SearchConfig; + +/// Assert that `actual` has the same length as `expected` and that every +/// element is within floating-point tolerance. +fn assert_embedding_approx_eq(actual: &[f32], expected: &[f32]) { + assert_eq!( + actual.len(), + expected.len(), + "embedding length mismatch: got {}, expected {}", + actual.len(), + expected.len(), + ); + for (i, (a, e)) in actual.iter().zip(expected.iter()).enumerate() { + assert!( + (a - e).abs() < 0.001, + "embedding[{i}]: got {a}, expected {e} (tolerance 0.001)", + ); + } +} + +/// Assert that `results` contains exactly one entry whose `document_path`, +/// `fts_rank`, and `vector_rank` match the supplied values. +fn assert_sole_search_result( + results: &[crate::workspace::SearchResult], + expected_path: &str, + expected_fts_rank: Option, + expected_vector_rank: Option, +) { + assert_eq!(results.len(), 1, "expected exactly one search result"); + let r = &results[0]; + assert_eq!(r.document_path, expected_path, "document_path mismatch"); + assert_eq!(r.fts_rank, expected_fts_rank, "fts_rank mismatch"); + assert_eq!(r.vector_rank, expected_vector_rank, "vector_rank mismatch"); +} + +/// Create a temp-file-backed [`LibSqlBackend`] with migrations applied, +/// ready for use in unit tests. +async fn setup_backend() -> LibSqlBackend { + let backend = LibSqlBackend::new_memory() + .await + .expect("failed to create in-memory libsql backend"); + backend + .run_migrations() + .await + .expect("failed to run libsql migrations"); + backend +} + +/// Create a document at `path` for the default user scope, ready for use in +/// unit tests. +async fn create_test_document( + backend: &LibSqlBackend, + path: &str, +) -> crate::workspace::MemoryDocument { + backend + .get_or_create_document_by_path("default", None, path) + .await + .unwrap_or_else(|e| panic!("failed to create test document at '{path}': {e}")) +} + +/// Assert that `result` is the `DocumentNotFound` error variant. +fn assert_document_not_found(result: Result) { + assert!( + matches!( + result, + Err(crate::error::WorkspaceError::DocumentNotFound { .. }) + ), + "expected DocumentNotFound, got {:?}", + result + ); +} + +#[test] +fn test_deserialize_embedding_valid() { + let floats = [1.0f32, 2.0, 3.0]; + let bytes: Vec = floats.iter().flat_map(|f| f.to_le_bytes()).collect(); + + let result = deserialize_embedding(&bytes); + + assert_embedding_approx_eq(&result, &[1.0, 2.0, 3.0]); +} + +#[test] +fn test_deserialize_embedding_empty() { + let result = deserialize_embedding(&[]); + assert_eq!(result.len(), 0); +} + +#[test] +fn test_deserialize_embedding_invalid_length() { + let result = deserialize_embedding(&[1, 2, 3, 4, 5, 6, 7]); + assert_eq!(result.len(), 0); +} + +#[test] +fn test_deserialize_embedding_single_value() { + let value = 42.5f32; + let bytes = value.to_le_bytes(); + + let result = deserialize_embedding(&bytes); + + assert_eq!(result.len(), 1); + assert!((result[0] - 42.5).abs() < 0.001); +} + +#[test] +fn test_deserialize_embedding_negative_values() { + let floats = [-1.5f32, 0.0, 2.75]; + let bytes: Vec = floats.iter().flat_map(|f| f.to_le_bytes()).collect(); + + let result = deserialize_embedding(&bytes); + + assert_embedding_approx_eq(&result, &[-1.5, 0.0, 2.75]); +} + +#[test] +fn test_embedding_to_vector_json_formats_floats_as_json_array() { + use super::vector_search::embedding_to_vector_json; + + let result = embedding_to_vector_json(&[1.0, -2.5, 0.0]); + + assert!( + result.starts_with('['), + "JSON array must start with '[', got: {result}" + ); + assert!( + result.ends_with(']'), + "JSON array must end with ']', got: {result}" + ); + // The negative float must be preserved faithfully. + assert!( + result.contains("-2.5") || result.contains("-2."), + "must serialise the negative float, got: {result}" + ); + + // An empty slice must produce "[]". + let empty = embedding_to_vector_json(&[]); + assert_eq!(empty, "[]", "empty embedding must serialise as '[]'"); +} + +#[tokio::test] +async fn get_chunks_without_embeddings_skips_invalid_chunk_id_uuid() { + let backend = setup_backend().await; + + let document = backend + .get_or_create_document_by_path("default", None, "notes/bad-chunk-uuid.md") + .await + .expect("failed to create document"); + + let conn = backend.connect().await.expect("failed to connect"); + conn.execute( + "INSERT INTO memory_chunks (id, document_id, chunk_index, content, created_at) \ + VALUES ('not-a-uuid', ?1, 0, 'bad chunk', datetime('now'))", + params![document.id.to_string()], + ) + .await + .expect("failed to insert bad-chunk-id row"); + + backend + .insert_chunk(InsertChunkParams { + document_id: document.id, + chunk_index: 1, + content: "valid chunk", + embedding: None, + }) + .await + .expect("failed to insert valid chunk"); + + let chunks = backend + .get_chunks_without_embeddings("default", None, 10) + .await + .expect("get_chunks_without_embeddings must not fail on invalid UUIDs"); + + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].content, "valid chunk"); +} + +#[tokio::test] +async fn get_chunks_without_embeddings_errors_on_negative_chunk_index() { + let backend = setup_backend().await; + + let document = backend + .get_or_create_document_by_path("default", None, "notes/neg-idx.md") + .await + .expect("failed to create document"); + + let conn = backend.connect().await.expect("failed to connect"); + conn.execute( + "INSERT INTO memory_chunks (id, document_id, chunk_index, content, created_at) \ + VALUES (?1, ?2, -1, 'negative index', datetime('now'))", + params![uuid::Uuid::new_v4().to_string(), document.id.to_string()], + ) + .await + .expect("failed to insert negative-index row"); + + let result = backend + .get_chunks_without_embeddings("default", None, 10) + .await; + + assert!( + result.is_err(), + "get_chunks_without_embeddings must return Err for negative chunk_index" + ); +} + +#[tokio::test] +async fn get_document_by_path_returns_not_found_for_missing_document() { + let backend = setup_backend().await; + let result = backend + .get_document_by_path("default", None, "does/not/exist.md") + .await; + assert_document_not_found(result); +} + +#[tokio::test] +async fn get_document_by_id_returns_not_found_for_unknown_id() { + let backend = setup_backend().await; + let result = backend.get_document_by_id(uuid::Uuid::new_v4()).await; + assert_document_not_found(result); +} + +// This test also validates the `collect_vector_index_rows` → +// IndexUnavailable path: the pre-condition assertion confirms +// vector_ranked_results returns IndexUnavailable before the brute-force +// fallback assertions begin. +#[tokio::test] +async fn hybrid_search_uses_brute_force_when_vector_index_is_unavailable() { + let backend = setup_backend().await; + + let document = backend + .get_or_create_document_by_path("default", None, "notes/search.md") + .await + .expect("failed to create search test document"); + backend + .update_document(document.id, "semantic search fallback test") + .await + .expect("failed to update search test document"); + backend + .insert_chunk(InsertChunkParams { + document_id: document.id, + chunk_index: 0, + content: "semantic search fallback test", + embedding: Some(&[1.0, 0.0, 0.0]), + }) + .await + .expect("failed to insert search test chunk"); + + let conn = backend + .connect() + .await + .expect("failed to open libsql connection for vector precondition"); + let vector_outcome = vector_ranked_results( + &conn, + &VectorIndexQuery { + user_id: "default", + agent_id: None, + embedding: &[1.0, 0.0, 0.0], + limit: 5, + }, + ) + .await + .expect("failed to run vector search precondition"); + assert!( + matches!(vector_outcome, VectorSearchOutcome::IndexUnavailable), + "Test requires the vector-index-unavailable path before hybrid fallback assertions" + ); + + let results = backend + .hybrid_search(HybridSearchParams { + user_id: "default", + agent_id: None, + query: "semantic", + embedding: Some(&[1.0, 0.0, 0.0]), + config: &SearchConfig::default().with_limit(5), + }) + .await + .expect("failed to execute hybrid search"); + + assert_sole_search_result(&results, "notes/search.md", Some(1), Some(1)); + assert!( + results[0].is_hybrid(), + "brute-force result must be flagged as hybrid" + ); +} + +#[tokio::test] +async fn brute_force_vector_search_skips_mismatched_embedding_dimensions() { + let backend = setup_backend().await; + + let document = backend + .get_or_create_document_by_path("default", None, "notes/mixed-dim.md") + .await + .expect("failed to create mixed-dimension search document"); + backend + .update_document(document.id, "mixed dimension vector search test") + .await + .expect("failed to update mixed-dimension search document"); + backend + .insert_chunk(InsertChunkParams { + document_id: document.id, + chunk_index: 0, + content: "same-dimension chunk", + embedding: Some(&[1.0, 0.0, 0.0]), + }) + .await + .expect("failed to insert same-dimension chunk"); + backend + .insert_chunk(InsertChunkParams { + document_id: document.id, + chunk_index: 1, + content: "different-dimension chunk", + embedding: Some(&[1.0, 0.0]), + }) + .await + .expect("failed to insert different-dimension chunk"); + + let results = backend + .brute_force_vector_search( + VectorSearchQuery { + user_id: "default", + agent_id: None, + embedding: &[1.0, 0.0, 0.0], + }, + 10, + ) + .await + .expect("failed to run brute-force vector search"); + + assert_eq!(results.len(), 1); + assert_eq!(results[0].content, "same-dimension chunk"); +} + +#[tokio::test] +async fn hybrid_search_returns_fts_only_results_without_embedding() { + let backend = setup_backend().await; + + let document = backend + .get_or_create_document_by_path("default", None, "notes/fts-only.md") + .await + .expect("failed to create FTS-only search document"); + backend + .insert_chunk(InsertChunkParams { + document_id: document.id, + chunk_index: 0, + content: "keyword only workspace search", + embedding: None, + }) + .await + .expect("failed to insert FTS-only chunk"); + + let results = backend + .hybrid_search(HybridSearchParams { + user_id: "default", + agent_id: None, + query: "keyword", + embedding: None, + config: &SearchConfig::default().with_limit(5), + }) + .await + .expect("failed to execute FTS-only hybrid search"); + + assert_sole_search_result(&results, "notes/fts-only.md", Some(1), None); +} + +#[tokio::test] +async fn insert_chunk_and_delete_chunks_round_trip() { + let backend = setup_backend().await; + + let document = create_test_document(&backend, "notes/chunks.md").await; + + let chunk_id = backend + .insert_chunk(InsertChunkParams { + document_id: document.id, + chunk_index: 0, + content: "round-trip chunk", + embedding: None, + }) + .await + .expect("failed to insert chunk"); + + let before = backend + .get_chunks_without_embeddings("default", None, 10) + .await + .expect("failed to list chunks before delete"); + assert!( + before.iter().any(|c| c.id == chunk_id), + "inserted chunk must appear in get_chunks_without_embeddings" + ); + + backend + .delete_chunks(document.id) + .await + .expect("failed to delete chunks"); + + let after = backend + .get_chunks_without_embeddings("default", None, 10) + .await + .expect("failed to list chunks after delete"); + assert!( + after.iter().all(|c| c.id != chunk_id), + "deleted chunk must not appear after delete_chunks" + ); +} + +#[tokio::test] +async fn update_chunk_embedding_is_reflected_in_chunks_list() { + let backend = setup_backend().await; + + let document = create_test_document(&backend, "notes/embed-update.md").await; + + let chunk_id = backend + .insert_chunk(InsertChunkParams { + document_id: document.id, + chunk_index: 0, + content: "embedding update test", + embedding: None, + }) + .await + .expect("failed to insert chunk"); + + let before = backend + .get_chunks_without_embeddings("default", None, 10) + .await + .expect("failed to list chunks before embedding update"); + assert!( + before.iter().any(|c| c.id == chunk_id), + "chunk without embedding must appear before update" + ); + + backend + .update_chunk_embedding(chunk_id, &[0.1, 0.2, 0.3]) + .await + .expect("failed to update chunk embedding"); + + let after = backend + .get_chunks_without_embeddings("default", None, 10) + .await + .expect("failed to list chunks after embedding update"); + assert!( + after.iter().all(|c| c.id != chunk_id), + "chunk with embedding must not appear in get_chunks_without_embeddings" + ); +} + +#[tokio::test] +async fn get_or_create_document_by_path_is_idempotent() { + let backend = setup_backend().await; + + let first = backend + .get_or_create_document_by_path("default", None, "notes/idempotent.md") + .await + .expect("failed to create document on first call"); + let second = backend + .get_or_create_document_by_path("default", None, "notes/idempotent.md") + .await + .expect("failed to get document on second call"); + + assert_eq!(first.id, second.id, "get_or_create must return the same id"); +} + +#[tokio::test] +async fn update_document_changes_content() { + let backend = setup_backend().await; + + let document = backend + .get_or_create_document_by_path("default", None, "notes/update.md") + .await + .expect("failed to create document"); + backend + .update_document(document.id, "updated content") + .await + .expect("failed to update document content"); + + let fetched = backend + .get_document_by_id(document.id) + .await + .expect("failed to fetch updated document"); + assert_eq!( + fetched.content, "updated content", + "document content must reflect update" + ); +} + +#[tokio::test] +async fn delete_document_by_path_removes_document_and_chunks() { + let backend = setup_backend().await; + + let document = backend + .get_or_create_document_by_path("default", None, "notes/delete-me.md") + .await + .expect("failed to create document"); + backend + .insert_chunk(InsertChunkParams { + document_id: document.id, + chunk_index: 0, + content: "to be deleted", + embedding: None, + }) + .await + .expect("failed to insert chunk"); + + backend + .delete_document_by_path("default", None, "notes/delete-me.md") + .await + .expect("failed to delete document"); + + let result = backend + .get_document_by_path("default", None, "notes/delete-me.md") + .await; + assert_document_not_found(result); + + let chunks = backend + .get_chunks_without_embeddings("default", None, 10) + .await + .expect("failed to list chunks after document deletion"); + assert!( + chunks.iter().all(|c| c.document_id != document.id), + "chunks belonging to deleted document must be removed" + ); +} + +#[tokio::test] +async fn list_all_paths_returns_inserted_document_path() { + let backend = setup_backend().await; + + create_test_document(&backend, "notes/listed.md").await; + + let paths = backend + .list_all_paths("default", None) + .await + .expect("failed to list all paths"); + + assert!( + paths.contains(&"notes/listed.md".to_string()), + "list_all_paths must include inserted document path" + ); +} + +#[tokio::test] +async fn list_documents_returns_inserted_document() { + let backend = setup_backend().await; + + let document = create_test_document(&backend, "notes/listed-doc.md").await; + + let docs = backend + .list_documents("default", None) + .await + .expect("failed to list documents"); + + assert!( + docs.iter().any(|d| d.id == document.id), + "list_documents must include inserted document" + ); +} + +#[tokio::test] +async fn list_directory_returns_immediate_children_only() { + let backend = setup_backend().await; + + backend + .get_or_create_document_by_path("default", None, "notes/dir/child.md") + .await + .expect("failed to create child document"); + backend + .get_or_create_document_by_path("default", None, "notes/dir/sub/deep.md") + .await + .expect("failed to create deeply nested document"); + + let entries = backend + .list_directory("default", None, "notes/dir") + .await + .expect("failed to list directory"); + + assert!( + entries + .iter() + .any(|e| !e.is_directory && e.path.ends_with("child.md")), + "list_directory must include the direct file child" + ); + assert!( + entries + .iter() + .any(|e| e.is_directory && e.path.ends_with("sub")), + "list_directory must include the sub-directory child" + ); + assert!( + entries.iter().all(|e| !e.path.ends_with("deep.md")), + "list_directory must not include deeply nested files" + ); +} diff --git a/src/db/libsql/workspace/vector_search.rs b/src/db/libsql/workspace/vector_search.rs new file mode 100644 index 000000000..35d08996c --- /dev/null +++ b/src/db/libsql/workspace/vector_search.rs @@ -0,0 +1,318 @@ +//! Vector-search helpers for libSQL workspace retrieval. + +#[cfg(test)] +#[path = "vector_search_tests.rs"] +mod tests; + +use libsql::params; +use uuid::Uuid; + +use super::super::{LibSqlBackend, get_text}; +use crate::error::WorkspaceError; +use crate::workspace::{RankedResult, cosine_similarity}; + +struct Candidate { + chunk_id: Uuid, + document_id: Uuid, + document_path: String, + content: String, + similarity: f32, +} + +pub(super) enum VectorSearchOutcome { + Indexed(Vec), + IndexUnavailable, +} + +/// Scoped query parameters shared by vector-search helpers. +/// +/// Bundles the user/agent scope with the query embedding so callers +/// pass a single cohesive object rather than three separate arguments. +pub(super) struct VectorSearchQuery<'a> { + pub(super) user_id: &'a str, + pub(super) agent_id: Option, + pub(super) embedding: &'a [f32], +} + +fn is_missing_vector_index_error(error: &libsql::Error) -> bool { + let sqlite_message = match error { + libsql::Error::SqliteFailure(_, message) + | libsql::Error::RemoteSqliteFailure(_, _, message) => message, + _ => return false, + }; + + let error_message = sqlite_message.to_ascii_lowercase(); + + error_message.contains("vector_top_k") + || error_message.contains("no such function") + || error_message.contains("idx_memory_chunks_embedding") + || error_message.contains("failed to parse vector index parameters") +} + +fn rank_candidates(mut candidates: Vec, limit: usize) -> Vec { + candidates.sort_by(|a, b| { + b.similarity + .partial_cmp(&a.similarity) + .unwrap_or(std::cmp::Ordering::Equal) + .then_with(|| a.chunk_id.cmp(&b.chunk_id)) + }); + + let total_candidates = candidates.len(); + let results: Vec<_> = candidates + .into_iter() + .take(limit) + .enumerate() + .map(|(idx, c)| RankedResult { + chunk_id: c.chunk_id, + document_id: c.document_id, + document_path: c.document_path, + content: c.content, + rank: (idx + 1) as u32, + }) + .collect(); + + tracing::debug!( + "Brute-force vector search scanned {} candidates, returned {} results", + total_candidates, + results.len() + ); + + results +} + +/// Deserialize an embedding from a BLOB (4-byte little-endian f32 values). +/// +/// Returns an empty vector if the blob length is not a multiple of 4. +pub(super) fn deserialize_embedding(blob: &[u8]) -> Vec { + if !blob.len().is_multiple_of(4) { + tracing::warn!( + "Embedding blob length {} is not a multiple of 4; skipping", + blob.len() + ); + return Vec::new(); + } + + blob.chunks_exact(4) + .map(|chunk| { + let bytes = [chunk[0], chunk[1], chunk[2], chunk[3]]; + f32::from_le_bytes(bytes) + }) + .collect() +} + +impl LibSqlBackend { + async fn collect_candidates( + &self, + rows: &mut libsql::Rows, + query_embedding: &[f32], + ) -> Result, WorkspaceError> { + let mut candidates = Vec::new(); + let mut skipped_mismatched_dims = 0usize; + while let Some(row) = rows + .next() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Row fetch failed: {}", e), + })? + { + let chunk_id: Uuid = match get_text(&row, 0).parse() { + Ok(id) => id, + Err(e) => { + tracing::warn!("Invalid chunk_id UUID in memory_chunks: {e}"); + continue; + } + }; + let document_id: Uuid = match get_text(&row, 1).parse() { + Ok(id) => id, + Err(e) => { + tracing::warn!("Invalid document_id UUID in memory_chunks: {e}"); + continue; + } + }; + let document_path = get_text(&row, 2); + let content = get_text(&row, 3); + let embedding_blob = match row.get_value(4) { + Ok(libsql::Value::Blob(bytes)) => bytes, + _ => continue, + }; + let chunk_embedding = deserialize_embedding(&embedding_blob); + if chunk_embedding.is_empty() { + continue; + } + if chunk_embedding.len() != query_embedding.len() { + skipped_mismatched_dims += 1; + continue; + } + + let similarity = cosine_similarity(query_embedding, &chunk_embedding); + candidates.push(Candidate { + chunk_id, + document_id, + document_path, + content, + similarity, + }); + } + + if skipped_mismatched_dims > 0 { + tracing::debug!( + "Brute-force vector search skipped {} candidates with embedding dimension mismatches (query dimension: {})", + skipped_mismatched_dims, + query_embedding.len() + ); + } + + Ok(candidates) + } + + /// Brute-force vector search using cosine similarity in Rust. + /// + /// Loads all chunks with embeddings for the given user/agent, computes + /// cosine similarity against the query embedding, and returns the top + /// matches. This is used as a fallback when the vector index is not + /// available (post-V9 migration). + pub(super) async fn brute_force_vector_search( + &self, + query: VectorSearchQuery<'_>, + limit: usize, + ) -> Result, WorkspaceError> { + let conn = self + .connect() + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: e.to_string(), + })?; + let agent_id_str = query.agent_id.map(|id| id.to_string()); + let mut rows = conn + .query( + r#" + SELECT c.id, c.document_id, d.path, c.content, c.embedding + FROM memory_chunks c + JOIN memory_documents d ON d.id = c.document_id + WHERE d.user_id = ?1 AND d.agent_id IS ?2 + AND c.embedding IS NOT NULL + "#, + params![query.user_id, agent_id_str.as_deref()], + ) + .await + .map_err(|e| WorkspaceError::SearchFailed { + reason: format!("Query failed: {}", e), + })?; + + let candidates = self.collect_candidates(&mut rows, query.embedding).await?; + Ok(rank_candidates(candidates, limit)) + } +} + +pub(super) fn embedding_to_vector_json(embedding: &[f32]) -> String { + format!( + "[{}]", + embedding + .iter() + .map(|f| f.to_string()) + .collect::>() + .join(",") + ) +} + +async fn collect_vector_index_rows( + mut rows: libsql::Rows, + limit: i64, +) -> Result { + let mut results = Vec::new(); + while let Some(row) = match rows.next().await { + Ok(row) => row, + Err(e) => { + if is_missing_vector_index_error(&e) { + tracing::debug!( + "Vector index row fetch failed, brute-force fallback required: {e}" + ); + return Ok(VectorSearchOutcome::IndexUnavailable); + } + + return Err(WorkspaceError::SearchFailed { + reason: format!("Vector index row fetch failed: {e}"), + }); + } + } { + let chunk_id: Uuid = match get_text(&row, 0).parse() { + Ok(id) => id, + Err(e) => { + tracing::warn!("Invalid chunk_id UUID in memory_chunks: {e}"); + continue; + } + }; + let document_id: Uuid = match get_text(&row, 1).parse() { + Ok(id) => id, + Err(e) => { + tracing::warn!("Invalid document_id UUID in memory_documents: {e}"); + continue; + } + }; + results.push(RankedResult { + chunk_id, + document_id, + document_path: get_text(&row, 2), + content: get_text(&row, 3), + rank: results.len() as u32 + 1, + }); + } + tracing::debug!( + "libSQL vector index search returned {} results (pre-fusion limit: {})", + results.len(), + limit + ); + Ok(VectorSearchOutcome::Indexed(results)) +} + +/// Parameters for a vector-index similarity query. +/// +/// Groups the search-intent arguments for [`vector_ranked_results`] to keep +/// its arity within the project limit of four. +pub(super) struct VectorIndexQuery<'a> { + pub(super) user_id: &'a str, + pub(super) agent_id: Option<&'a str>, + pub(super) embedding: &'a [f32], + pub(super) limit: i64, +} + +/// Execute vector similarity search via libSQL's vector index. +/// +/// Returns [`VectorSearchOutcome::IndexUnavailable`] when `vector_top_k(...)` +/// cannot run because the fixed-dimension vector index is missing, which is +/// the expected state after the V9 flexible-dimension migration. +pub(super) async fn vector_ranked_results( + conn: &libsql::Connection, + query: &VectorIndexQuery<'_>, +) -> Result { + let vector_json = embedding_to_vector_json(query.embedding); + + match conn + .query( + r#" + SELECT c.id, c.document_id, d.path, c.content + FROM vector_top_k('idx_memory_chunks_embedding', vector(?1), ?2) AS top_k + JOIN memory_chunks c ON c._rowid = top_k.id + JOIN memory_documents d ON d.id = c.document_id + WHERE d.user_id = ?3 AND d.agent_id IS ?4 + "#, + params![vector_json, query.limit, query.user_id, query.agent_id], + ) + .await + { + Ok(rows) => collect_vector_index_rows(rows, query.limit).await, + Err(e) => { + if is_missing_vector_index_error(&e) { + tracing::debug!( + "Vector index query failed (expected after V9 migration), \ + brute-force fallback required: {e}" + ); + Ok(VectorSearchOutcome::IndexUnavailable) + } else { + Err(WorkspaceError::SearchFailed { + reason: format!("Vector index query failed: {e}"), + }) + } + } + } +} diff --git a/src/db/libsql/workspace/vector_search_tests.rs b/src/db/libsql/workspace/vector_search_tests.rs new file mode 100644 index 000000000..03f7ef79c --- /dev/null +++ b/src/db/libsql/workspace/vector_search_tests.rs @@ -0,0 +1,83 @@ +//! Tests for libSQL workspace vector-search helpers. + +use libsql::params; +use uuid::Uuid; + +use super::super::LibSqlBackend; +use super::*; + +#[test] +fn embedding_to_vector_json_serialises_embeddings_in_index_format() { + assert_eq!( + embedding_to_vector_json(&[1.0, -2.5, 0.25]), + "[1,-2.5,0.25]" + ); +} + +#[test] +fn rank_candidates_breaks_similarity_ties_by_chunk_id() { + let earlier_chunk = Uuid::from_u128(1); + let later_chunk = Uuid::from_u128(2); + + let results = rank_candidates( + vec![ + Candidate { + chunk_id: later_chunk, + document_id: Uuid::new_v4(), + document_path: "notes/later.md".to_string(), + content: "later".to_string(), + similarity: 0.9, + }, + Candidate { + chunk_id: earlier_chunk, + document_id: Uuid::new_v4(), + document_path: "notes/earlier.md".to_string(), + content: "earlier".to_string(), + similarity: 0.9, + }, + ], + 2, + ); + + assert_eq!(results.len(), 2); + assert_eq!(results[0].chunk_id, earlier_chunk); + assert_eq!(results[0].rank, 1); + assert_eq!(results[1].chunk_id, later_chunk); + assert_eq!(results[1].rank, 2); +} + +#[tokio::test] +async fn collect_vector_index_rows_skips_rows_with_invalid_uuids() { + let backend = LibSqlBackend::new_memory() + .await + .expect("failed to create temp-file-backed backend"); + let conn = backend + .connect() + .await + .expect("failed to open libsql connection"); + let valid_chunk_id = Uuid::new_v4(); + let valid_document_id = Uuid::new_v4(); + let rows = conn + .query( + "SELECT ?1, ?2, 'notes/good.md', 'good chunk' UNION ALL SELECT 'bad-uuid', ?3, 'notes/bad.md', 'bad chunk'", + params![ + valid_chunk_id.to_string(), + valid_document_id.to_string(), + valid_document_id.to_string() + ], + ) + .await + .expect("failed to query synthetic vector rows"); + + let outcome = collect_vector_index_rows(rows, 5) + .await + .expect("vector row collection should succeed"); + + let VectorSearchOutcome::Indexed(results) = outcome else { + panic!("expected indexed vector-search outcome"); + }; + assert_eq!(results.len(), 1); + assert_eq!(results[0].chunk_id, valid_chunk_id); + assert_eq!(results[0].document_id, valid_document_id); + assert_eq!(results[0].rank, 1); +} diff --git a/src/db/mod.rs b/src/db/mod.rs index d1933761c..5cb87d253 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -88,7 +88,7 @@ pub struct DatabaseHandles { #[cfg(feature = "postgres")] pub pg_pool: Option, #[cfg(feature = "libsql")] - pub libsql_db: Option>, + pub libsql_db: Option>, } /// Connect to the database, run migrations, and return both the generic diff --git a/src/orchestrator/api/tests/catalogue_fidelity.rs b/src/orchestrator/api/tests/catalogue_fidelity.rs index 8ce1b47c4..c5ae1480c 100644 --- a/src/orchestrator/api/tests/catalogue_fidelity.rs +++ b/src/orchestrator/api/tests/catalogue_fidelity.rs @@ -76,7 +76,9 @@ async fn remote_tool_catalog_version_is_deterministic_and_sensitive_to_content() let registry_c = Arc::new(ToolRegistry::new()); registry_c - .register(build_tool_fixture(ToolFixture::CatalogBeta)) + .register(build_tool_fixture( + ToolFixture::CatalogAlphaWithDifferentPayload, + )) .await; let (_tools_a, _instructions_a, version_a) = hosted_remote_tool_catalog(®istry_a).await; @@ -89,7 +91,7 @@ async fn remote_tool_catalog_version_is_deterministic_and_sensitive_to_content() ); assert_ne!( version_a, version_c, - "different tool sets must produce different catalog versions" + "different catalogue payloads must produce different catalog versions" ); } diff --git a/src/orchestrator/api/tests/fixtures/remote_tool_mocks.rs b/src/orchestrator/api/tests/fixtures/remote_tool_mocks.rs index a7019ee6e..7c4c98517 100644 --- a/src/orchestrator/api/tests/fixtures/remote_tool_mocks.rs +++ b/src/orchestrator/api/tests/fixtures/remote_tool_mocks.rs @@ -113,11 +113,12 @@ impl NativeTool for StubTool { #[derive(Clone, Copy, Debug)] /// Shared hosted-remote-tool fixture presets for catalogue and execute tests. /// -/// `CatalogAlpha`, `CatalogBeta`, and `CatalogWasm` model hosted-safe -/// catalogue entries. `ApprovalGated` models a hosted tool that must never -/// execute without approval. +/// `CatalogAlpha`, `CatalogAlphaWithDifferentPayload`, `CatalogBeta`, and +/// `CatalogWasm` model hosted-safe catalogue entries. `ApprovalGated` models +/// a hosted tool that must never execute without approval. pub(crate) enum ToolFixture { CatalogAlpha, + CatalogAlphaWithDifferentPayload, CatalogBeta, CatalogWasm, ApprovalGated, @@ -139,6 +140,18 @@ pub(crate) fn build_tool_fixture(kind: ToolFixture) -> Arc { "required":["query"] }), )) as Arc, + ToolFixture::CatalogAlphaWithDifferentPayload => Arc::new(StubTool::hosted( + "remote_tool_catalog_fixture", + "Hosted-safe tool for catalog tests with updated payload", + serde_json::json!({ + "type":"object", + "properties":{ + "query":{"type":"string","description":"search query"}, + "limit":{"type":"integer","minimum":1} + }, + "required":["query", "limit"] + }), + )) as Arc, ToolFixture::CatalogBeta => Arc::new(StubTool::hosted( "remote_tool_catalog_fixture_beta", "Second hosted-safe tool for catalog tests", diff --git a/src/reload/manager.rs b/src/reload/manager.rs index c384f6eee..24ecbc29f 100644 --- a/src/reload/manager.rs +++ b/src/reload/manager.rs @@ -175,7 +175,7 @@ impl HotReloadManager { let is_running = controller.is_running().await; let old_addr = controller.current_addr().await; - if is_running && resolved_addrs.contains(&old_addr) { + if is_running && Self::listener_matches_resolved_addr(old_addr, &resolved_addrs) { tracing::debug!("HTTP listener address unchanged, skipping restart"); return Ok(()); } @@ -220,6 +220,16 @@ impl HotReloadManager { Err(last_error.expect("at least one candidate address").into()) } + fn listener_matches_resolved_addr( + current_addr: SocketAddr, + resolved_addrs: &[SocketAddr], + ) -> bool { + resolved_addrs.iter().any(|resolved_addr| { + *resolved_addr == current_addr + || (resolved_addr.port() == 0 && resolved_addr.ip() == current_addr.ip()) + }) + } + async fn update_channel_secrets(&self, http: &crate::config::HttpConfig) { let new_secret = http.webhook_secret.clone(); diff --git a/src/reload/manager/tests/restart_tests.rs b/src/reload/manager/tests/restart_tests.rs index 163372ec3..71267b169 100644 --- a/src/reload/manager/tests/restart_tests.rs +++ b/src/reload/manager/tests/restart_tests.rs @@ -281,6 +281,38 @@ async fn maybe_restart_listener_skips_restart_when_current_addr_matches_non_firs Ok(()) } +#[tokio::test] +async fn maybe_restart_listener_skips_restart_for_ephemeral_bind_on_same_host() +-> Result<(), Box> { + let current_addr: SocketAddr = "127.0.0.1:43123".parse().expect("valid socket address"); + let controller = Arc::new(StubListenerController::new(current_addr)); + let controller_clone = Arc::clone(&controller); + + let http_cfg = http_config("127.0.0.1", 0, None); + + let (_temp_dir, config) = test_config_with_http(None).await?; + let loader = Arc::new(StubConfigLoader::new_success(config)); + + let manager = HotReloadManager::new( + loader as Arc, + Some(controller as Arc), + None, + Vec::new(), + ); + + manager + .maybe_restart_listener(&http_cfg) + .await + .expect("ephemeral binds on the same host should not force a restart"); + + assert_eq!( + controller_clone.restart_calls().await.len(), + 0, + "listener should not restart when config still requests the same host on port 0" + ); + Ok(()) +} + #[tokio::test] async fn maybe_restart_listener_retries_multiple_candidates_until_success() -> Result<(), Box> { diff --git a/src/secrets/store.rs b/src/secrets/store.rs index 5d1fcf378..4d8962f9a 100644 --- a/src/secrets/store.rs +++ b/src/secrets/store.rs @@ -472,14 +472,14 @@ fn row_to_secret(row: &tokio_postgres::Row) -> Secret { /// matching the connection-per-request pattern used by the main `LibSqlBackend`. #[cfg(feature = "libsql")] pub struct LibSqlSecretsStore { - db: Arc, + db: Arc, crypto: Arc, } #[cfg(feature = "libsql")] impl LibSqlSecretsStore { /// Create a new store with the given shared libsql database handle and crypto instance. - pub fn new(db: Arc, crypto: Arc) -> Self { + pub fn new(db: Arc, crypto: Arc) -> Self { Self { db, crypto } } @@ -487,10 +487,8 @@ impl LibSqlSecretsStore { let conn = self .db .connect() - .map_err(|e| SecretError::Database(format!("Connection failed: {}", e)))?; - conn.query("PRAGMA busy_timeout = 5000", ()) .await - .map_err(|e| SecretError::Database(format!("Failed to set busy_timeout: {}", e)))?; + .map_err(|e| SecretError::Database(format!("Connection failed: {}", e)))?; Ok(conn) } } diff --git a/src/test_support.rs b/src/test_support.rs index 86711c46b..37db41365 100644 --- a/src/test_support.rs +++ b/src/test_support.rs @@ -9,7 +9,7 @@ use crate::llm::ToolDefinition; /// Returns the canonical complex parameters JSON schema used for fidelity testing. /// /// This schema exercises nested objects, arrays, enums, constraints, and various -/// JSON Schema features to validate that tool definitions survive serialization, +/// JSON Schema features to validate that tool definitions survive serialisation, /// transport, and reconstruction without data loss. pub fn complex_tool_definition_parameters() -> serde_json::Value { serde_json::json!({ diff --git a/src/tools/wasm/storage.rs b/src/tools/wasm/storage.rs index 33c7d5bce..0495eb014 100644 --- a/src/tools/wasm/storage.rs +++ b/src/tools/wasm/storage.rs @@ -703,12 +703,12 @@ fn row_to_tool(row: &tokio_postgres::Row) -> Result, + db: std::sync::Arc, } #[cfg(feature = "libsql")] impl LibSqlWasmToolStore { - pub fn new(db: std::sync::Arc) -> Self { + pub fn new(db: std::sync::Arc) -> Self { Self { db } } @@ -716,12 +716,8 @@ impl LibSqlWasmToolStore { let conn = self .db .connect() - .map_err(|e| WasmStorageError::Database(format!("Connection failed: {}", e)))?; - conn.query("PRAGMA busy_timeout = 5000", ()) .await - .map_err(|e| { - WasmStorageError::Database(format!("Failed to set busy_timeout: {}", e)) - })?; + .map_err(|e| WasmStorageError::Database(format!("Connection failed: {}", e)))?; Ok(conn) } } diff --git a/src/workspace/README.md b/src/workspace/README.md index 2b3ee5b48..0fe1a07c0 100644 --- a/src/workspace/README.md +++ b/src/workspace/README.md @@ -1,17 +1,21 @@ # Workspace & Memory System -Inspired by [OpenClaw](https://github.com/openclaw/openclaw), the workspace provides persistent memory for agents with a flexible filesystem-like structure. +Inspired by [OpenClaw](https://github.com/openclaw/openclaw), the workspace +provides persistent memory for agents with a flexible filesystem-like +structure. ## Key Principles -1. **"Memory is database, not RAM"** - If you want to remember something, write it explicitly +1. **"Memory is database, not RAM"** - If you want to remember something, + write it explicitly 2. **Flexible structure** - Create any directory/file hierarchy you need 3. **Self-documenting** - Use README.md files to describe directory structure -4. **Hybrid search** - Combines FTS (keyword) + vector (semantic) via Reciprocal Rank Fusion +4. **Hybrid search** - Combines FTS (keyword) + vector (semantic) via + Reciprocal Rank Fusion ## Filesystem Structure -``` +```text workspace/ ├── README.md <- Root runbook/index ├── MEMORY.md <- Long-term curated memory @@ -67,24 +71,31 @@ let prompt = workspace.system_prompt().await?; Four tools for LLM use: -- **`memory_search`** - Hybrid search, MUST be called before answering questions about prior work +- **`memory_search`** - Hybrid search, MUST be called before answering + questions about prior work - **`memory_write`** - Write to any path (memory, daily_log, or custom paths) - **`memory_read`** - Read any file by path -- **`memory_tree`** - View workspace structure as a tree (depth parameter, default 1) +- **`memory_tree`** - View workspace structure as a tree (depth parameter, + default 1) ## Hybrid Search (RRF) Combines full-text search and vector similarity using Reciprocal Rank Fusion: -``` +```text score(d) = Σ 1/(k + rank(d)) for each method where d appears ``` -Default k=60. Results from both methods are combined, with documents appearing in both getting boosted scores. +Default k=60. Results from both methods are combined, with documents +appearing in both getting boosted scores. **Backend differences:** -- **PostgreSQL:** `ts_rank_cd` for FTS, pgvector cosine distance for vectors, full RRF -- **libSQL:** FTS5 for keyword search only (vector search via `libsql_vector_idx` not yet wired) + +- **PostgreSQL:** `ts_rank_cd` for FTS, pgvector cosine distance for vectors, + full RRF +- **libSQL:** FTS5 plus vector search; uses `vector_top_k(...)` when a + compatible fixed-dimension index exists, otherwise brute-force cosine + similarity in Rust ## Heartbeat System @@ -108,6 +119,7 @@ spawn_heartbeat(config, workspace, llm, response_tx); ## Chunking Strategy Documents are chunked for search indexing: + - Default: 800 words per chunk (roughly 800 tokens for English) - 15% overlap between chunks for context preservation - Minimum chunk size: 50 words (tiny trailing chunks merge with previous)