diff --git a/colgrep/src/index/mod.rs b/colgrep/src/index/mod.rs index d4ea4e9..891c4b3 100644 --- a/colgrep/src/index/mod.rs +++ b/colgrep/src/index/mod.rs @@ -86,6 +86,18 @@ fn delete_from_index_counted(ids: &[i64], index_path: &str) -> Result { /// surviving documents, so interleaving reads and deletes would invalidate the IDs that haven't /// been deleted yet. Returns the number of documents removed. fn delete_files_from_index(index_path: &str, files: &[PathBuf]) -> Result { + delete_files_from_index_inner(index_path, files, true) +} + +fn delete_files_from_index_no_fts_rebuild(index_path: &str, files: &[PathBuf]) -> Result { + delete_files_from_index_inner(index_path, files, false) +} + +fn delete_files_from_index_inner( + index_path: &str, + files: &[PathBuf], + rebuild_fts: bool, +) -> Result { if files.is_empty() { return Ok(0); } @@ -136,10 +148,14 @@ fn delete_files_from_index(index_path: &str, files: &[PathBuf]) -> Result }; filtering::delete(index_path, &ids)?; + + if !rebuild_fts { + return Ok(ids.len()); + } + if is_suffix_delete { next_plaid::text_search::delete(index_path, &valid)?; } else { - // Survivor IDs shifted; FTS5 rowids no longer match METADATA. next_plaid::text_search::rebuild(index_path)?; } Ok(ids.len()) @@ -717,7 +733,13 @@ fn flush_update_batch( } let _guard = CriticalSectionGuard::new(); - let (_, doc_ids) = MmapIndex::update_or_create(embeddings, index_path, config, update_config)?; + let index_exists = Path::new(index_path).join("metadata.json").exists(); + let doc_ids = if index_exists { + MmapIndex::update_append(embeddings, index_path, update_config)? + } else { + let (_, ids) = MmapIndex::update_or_create(embeddings, index_path, config, update_config)?; + ids + }; embeddings.clear(); sender @@ -1909,7 +1931,9 @@ impl IndexBuilder { // Deferred from earlier to minimize the window where data is missing // from the index (for concurrent readers and interrupt safety). 
Batched // into a single index rewrite — see delete_files_from_index / issue #116. - delete_files_from_index(index_path, &files_changed)?; + // Skip FTS5 rebuild: we're about to re-add these files immediately via + // the encoding pipeline, which rebuilds their FTS5 entries. + delete_files_from_index_no_fts_rebuild(index_path, &files_changed)?; let sorted_units = prepare_units_for_encoding(&new_units, index_chunk_size); let was_interrupted = self.run_encoding_pipeline( @@ -2288,7 +2312,8 @@ impl IndexBuilder { } // Idempotent resume: clear any partial docs a prior interrupted run wrote for these // files, in a single batched index rewrite (issue #116). - delete_files_from_index(index_path, batch_files)?; + // Skip FTS5 rebuild: we're about to re-add these files immediately. + delete_files_from_index_no_fts_rebuild(index_path, batch_files)?; self.ensure_model_created(batch_units.len())?; let pool_factor = self.resolve_pool_factor(batch_units.len()); let sorted_units = prepare_units_for_encoding(batch_units, index_chunk_size); @@ -2501,7 +2526,14 @@ impl IndexBuilder { // 1. Delete chunks for deleted files (safe — not re-adding these). Batched into a // single index rewrite — see delete_files_from_index / issue #116. - delete_files_from_index(index_path, &plan.deleted)?; + // Skip FTS5 rebuild here when changed files will trigger encoding: the + // final rebuild after encoding covers both deletions and re-adds in one pass. + let has_changes_to_encode = !plan.added.is_empty() || !plan.changed.is_empty(); + if has_changes_to_encode { + delete_files_from_index_no_fts_rebuild(index_path, &plan.deleted)?; + } else { + delete_files_from_index(index_path, &plan.deleted)?; + } // Remove deleted files from state for path in &plan.deleted { @@ -2576,12 +2608,13 @@ impl IndexBuilder { // Delete stale index entries for skipped files that were previously indexed // (e.g., files that became unreadable due to invalid UTF-8). Batched into one rewrite. 
+ // Skip FTS rebuild: covered by the final rebuild after encoding. let stale_skipped: Vec = skipped_files .iter() .filter(|p| plan.changed.contains(p)) .cloned() .collect(); - let _ = delete_files_from_index(index_path, &stale_skipped); + let _ = delete_files_from_index_no_fts_rebuild(index_path, &stale_skipped); // 3. Add new units to index let mut was_interrupted = false; @@ -2624,7 +2657,9 @@ impl IndexBuilder { // Deferred from earlier to minimize the window where data is missing // from the index (for concurrent readers and interrupt safety). Batched // into a single index rewrite — see delete_files_from_index / issue #116. - delete_files_from_index(index_path, &plan.changed)?; + // Skip FTS5 rebuild: the encoding pipeline re-adds FTS entries for the + // new versions; we do a single rebuild after encoding completes. + delete_files_from_index_no_fts_rebuild(index_path, &plan.changed)?; let sorted_units = prepare_units_for_encoding(&new_units, index_chunk_size); let pipeline_interrupted = self.run_encoding_pipeline( diff --git a/next-plaid/src/delete.rs b/next-plaid/src/delete.rs index 71bdf68..fa0b7a5 100644 --- a/next-plaid/src/delete.rs +++ b/next-plaid/src/delete.rs @@ -6,7 +6,7 @@ //! - IVF full rebuild after deletion //! 
- Metadata synchronization -use std::collections::{BTreeMap, HashSet}; +use std::collections::HashSet; use std::fs::File; use std::io::{BufReader, BufWriter}; use std::path::Path; @@ -184,63 +184,52 @@ fn delete_from_index_impl(doc_ids: &[i64], index_path: &str, clean_buffer: bool) current_doc_offset += doclens.len() as i64; } - // Rebuild IVF from all remaining codes - // First, collect all codes from all chunks - let mut all_codes: Vec = Vec::with_capacity(total_embeddings); - - for chunk_idx in 0..num_chunks { - let codes_path = index_dir.join(format!("{}.codes.npy", chunk_idx)); - let chunk_codes: Array1 = - Array1::read_npy(File::open(&codes_path).map_err(|e| { - Error::Delete(format!("Failed to read codes for IVF rebuild: {}", e)) - })?)?; - all_codes.extend(chunk_codes.iter()); - } - - // Build IVF: map centroid -> list of document IDs - // We need to re-read doclens to map embeddings back to documents - let mut code_to_docs: BTreeMap> = BTreeMap::new(); - let mut emb_idx = 0; - let mut doc_id: i64 = 0; - - for chunk_idx in 0..num_chunks { - let doclens_path = index_dir.join(format!("doclens.{}.json", chunk_idx)); - let doclens: Vec = - serde_json::from_reader(BufReader::new(File::open(&doclens_path)?))?; - - for &len in &doclens { - for _ in 0..len { - if emb_idx < all_codes.len() { - let code = all_codes[emb_idx] as usize; - code_to_docs.entry(code).or_default().push(doc_id); + // Patch IVF in-place: remove deleted doc IDs and renumber survivors. + // This is O(IVF_size) instead of O(total_embeddings) since we avoid re-reading + // all chunk codes files. 
+ { + let ivf_path = index_dir.join("ivf.npy"); + let ivf_lengths_path = index_dir.join("ivf_lengths.npy"); + + let old_ivf: Array1 = Array1::read_npy( + File::open(&ivf_path) + .map_err(|e| Error::Delete(format!("Failed to open IVF: {}", e)))?, + )?; + let old_ivf_lengths: Array1 = Array1::read_npy( + File::open(&ivf_lengths_path) + .map_err(|e| Error::Delete(format!("Failed to open IVF lengths: {}", e)))?, + )?; + + // Build a sorted list of deleted IDs for binary search renumbering. + let mut sorted_deleted: Vec = ids_to_delete.iter().copied().collect(); + sorted_deleted.sort_unstable(); + + let mut new_ivf_data: Vec = Vec::with_capacity(old_ivf.len()); + let mut new_ivf_lengths: Vec = Vec::with_capacity(num_partitions); + + let mut offset: usize = 0; + for &len in old_ivf_lengths.iter() { + let end = offset + len as usize; + let mut centroid_len: i32 = 0; + for &doc_id in old_ivf.as_slice().unwrap()[offset..end].iter() { + if ids_to_delete.contains(&doc_id) { + continue; } - emb_idx += 1; + // Renumber: subtract the count of deleted IDs below this one. 
+ let shift = sorted_deleted.partition_point(|&d| d < doc_id) as i64; + new_ivf_data.push(doc_id - shift); + centroid_len += 1; } - doc_id += 1; + new_ivf_lengths.push(centroid_len); + offset = end; } - } - // Build optimized IVF (deduplicated, sorted) - let mut ivf_data: Vec = Vec::new(); - let mut ivf_lengths: Vec = vec![0; num_partitions]; - - for (centroid_id, ivf_len) in ivf_lengths.iter_mut().enumerate() { - if let Some(docs) = code_to_docs.get(&centroid_id) { - let mut unique_docs: Vec = docs.clone(); - unique_docs.sort_unstable(); - unique_docs.dedup(); - *ivf_len = unique_docs.len() as i32; - ivf_data.extend(unique_docs); - } + let new_ivf = Array1::from_vec(new_ivf_data); + let new_lengths = Array1::from_vec(new_ivf_lengths); + new_ivf.write_npy(File::create(&ivf_path)?)?; + new_lengths.write_npy(File::create(&ivf_lengths_path)?)?; } - // Save IVF - let ivf = Array1::from_vec(ivf_data); - let ivf_lengths = Array1::from_vec(ivf_lengths); - - ivf.write_npy(File::create(index_dir.join("ivf.npy"))?)?; - ivf_lengths.write_npy(File::create(index_dir.join("ivf_lengths.npy"))?)?; - // Update global metadata let final_avg_doclen = if final_num_documents > 0 { total_embeddings as f64 / final_num_documents as f64 diff --git a/next-plaid/src/filtering.rs b/next-plaid/src/filtering.rs index bf8aed6..668e82b 100644 --- a/next-plaid/src/filtering.rs +++ b/next-plaid/src/filtering.rs @@ -1152,47 +1152,65 @@ pub fn delete(index_path: &str, subset: &[i64]) -> Result { crate::text_search::drop_temp_table(&conn, name); } - // Get column names (excluding _subset_) - let mut columns: Vec = Vec::new(); - { - let mut stmt = conn.prepare("PRAGMA table_info(METADATA)")?; - let rows = stmt.query_map([], |row| row.get::<_, String>(1))?; - for row in rows { - let col = row?; - if col != SUBSET_COLUMN { - columns.push(col); + // Re-sequence _subset_ IDs to be contiguous 0-based. 
+ // Instead of copying the entire table (expensive for large tables with TEXT/BLOB + // columns), use range-based UPDATEs that only touch the integer primary key. + // Process gaps in ascending order so decremented values never collide. + let mut sorted_ids: Vec = subset.to_vec(); + sorted_ids.sort_unstable(); + sorted_ids.dedup(); + // Highest ID currently in METADATA (-1 if empty or the query fails). NOTE(review): sorted_ids is not filtered to IDs that actually existed; an absent ID would over-shift later rows. + let max_id: i64 = conn + .query_row( + &format!( + "SELECT COALESCE(MAX(\"{}\"), -1) FROM METADATA", + SUBSET_COLUMN + ), + [], + |row| row.get(0), + ) + .unwrap_or(-1); + + if max_id >= 0 && deleted > 0 { + // For each gap left by deleted rows, shift all subsequent rows down. + // Merge into contiguous ranges: if IDs 5,6,7 were deleted, rows >= 8 + // shift down by 3. We process from lowest gap upward. + // + // Build (range_start, range_end, shift) tuples. Consecutive deleted IDs + // form a single gap; the rows between two gaps all need the same shift + // (equal to the number of deleted IDs to their left). + let mut updates: Vec<(i64, i64, i64)> = Vec::new(); + let mut i = 0; + while i < sorted_ids.len() { + // Advance past consecutive deleted IDs + let mut j = i + 1; + while j < sorted_ids.len() && sorted_ids[j] == sorted_ids[j - 1] + 1 { + j += 1; + } + // Rows from sorted_ids[j-1]+1 up to (but not including) the next + // deleted ID need to shift down by j (total deletions so far). 
+ let range_start = sorted_ids[j - 1] + 1; + let range_end = if j < sorted_ids.len() { + sorted_ids[j] + } else { + max_id + sorted_ids.len() as i64 + 1 + }; + if range_start < range_end { + updates.push((range_start, range_end, j as i64)); } + i = j; } - } - - let col_str = columns - .iter() - .map(|c| format!("\"{}\"", c)) - .collect::>() - .join(", "); - - // Create temp table with re-indexed _subset_ values - let create_temp_sql = format!( - "CREATE TEMP TABLE METADATA_TEMP AS \ - SELECT (ROW_NUMBER() OVER (ORDER BY \"{}\")) - 1 AS new_subset_id, {} \ - FROM METADATA", - SUBSET_COLUMN, col_str - ); - conn.execute(&create_temp_sql, [])?; - // Clear original table - conn.execute("DELETE FROM METADATA", [])?; - - // Copy back with new IDs - let insert_back_sql = format!( - "INSERT INTO METADATA (\"{}\", {}) \ - SELECT new_subset_id, {} FROM METADATA_TEMP", - SUBSET_COLUMN, col_str, col_str - ); - conn.execute(&insert_back_sql, [])?; - - // Drop temp table - conn.execute("DROP TABLE METADATA_TEMP", [])?; + for (from, to_excl, shift) in &updates { + conn.execute( + &format!( + "UPDATE METADATA SET \"{}\" = \"{}\" - ?1 WHERE \"{}\" >= ?2 AND \"{}\" < ?3", + SUBSET_COLUMN, SUBSET_COLUMN, SUBSET_COLUMN, SUBSET_COLUMN + ), + rusqlite::params![shift, from, to_excl], + )?; + } + } // Commit transaction conn.execute("COMMIT", [])?; diff --git a/next-plaid/src/index.rs b/next-plaid/src/index.rs index a261548..07f4196 100644 --- a/next-plaid/src/index.rs +++ b/next-plaid/src/index.rs @@ -1664,6 +1664,37 @@ impl MmapIndex { } } + /// Append embeddings to an existing index without loading the full MmapIndex. + /// + /// This is significantly faster than `update_or_create` for incremental updates + /// because it skips merged-file generation (628MB+ on large indices). Use this + /// when you only need the assigned doc IDs and won't search immediately after. 
+ pub fn update_append( + embeddings: &[Array2], + index_path: &str, + update_config: &crate::update::UpdateConfig, + ) -> Result> { + use crate::codec::ResidualCodec; + use crate::update::update_index; + + let index_dir = std::path::Path::new(index_path); + let metadata = Metadata::load_from_path(index_dir)?; + let codec = ResidualCodec::load_from_dir(index_dir)?; + let start_doc_id = metadata.num_documents as i64; + let num_new_docs = embeddings.len(); + + update_index( + embeddings, + index_path, + &codec, + Some(update_config.batch_size), + false, + update_config.force_cpu, + )?; + + Ok((start_doc_id..start_doc_id + num_new_docs as i64).collect()) + } + /// Update an existing index or create a new one, with metadata and automatic /// FTS5 full-text indexing. ///