Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions colgrep/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ fn delete_from_index_counted(ids: &[i64], index_path: &str) -> Result<usize> {
/// surviving documents, so interleaving reads and deletes would invalidate the IDs that haven't
/// been deleted yet. Returns the number of documents removed.
fn delete_files_from_index(index_path: &str, files: &[PathBuf]) -> Result<usize> {
delete_files_from_index_inner(index_path, files, true)
}

fn delete_files_from_index_no_fts_rebuild(index_path: &str, files: &[PathBuf]) -> Result<usize> {
delete_files_from_index_inner(index_path, files, false)
}

fn delete_files_from_index_inner(
index_path: &str,
files: &[PathBuf],
rebuild_fts: bool,
) -> Result<usize> {
if files.is_empty() {
return Ok(0);
}
Expand Down Expand Up @@ -136,10 +148,14 @@ fn delete_files_from_index(index_path: &str, files: &[PathBuf]) -> Result<usize>
};

filtering::delete(index_path, &ids)?;

if !rebuild_fts {
return Ok(ids.len());
}

if is_suffix_delete {
next_plaid::text_search::delete(index_path, &valid)?;
} else {
// Survivor IDs shifted; FTS5 rowids no longer match METADATA.
next_plaid::text_search::rebuild(index_path)?;
}
Ok(ids.len())
Expand Down Expand Up @@ -717,7 +733,13 @@ fn flush_update_batch(
}

let _guard = CriticalSectionGuard::new();
let (_, doc_ids) = MmapIndex::update_or_create(embeddings, index_path, config, update_config)?;
let index_exists = Path::new(index_path).join("metadata.json").exists();
let doc_ids = if index_exists {
MmapIndex::update_append(embeddings, index_path, update_config)?
} else {
let (_, ids) = MmapIndex::update_or_create(embeddings, index_path, config, update_config)?;
ids
};
embeddings.clear();

sender
Expand Down Expand Up @@ -1909,7 +1931,9 @@ impl IndexBuilder {
// Deferred from earlier to minimize the window where data is missing
// from the index (for concurrent readers and interrupt safety). Batched
// into a single index rewrite — see delete_files_from_index / issue #116.
delete_files_from_index(index_path, &files_changed)?;
// Skip FTS5 rebuild: we're about to re-add these files immediately via
// the encoding pipeline, which rebuilds their FTS5 entries.
delete_files_from_index_no_fts_rebuild(index_path, &files_changed)?;

let sorted_units = prepare_units_for_encoding(&new_units, index_chunk_size);
let was_interrupted = self.run_encoding_pipeline(
Expand Down Expand Up @@ -2288,7 +2312,8 @@ impl IndexBuilder {
}
// Idempotent resume: clear any partial docs a prior interrupted run wrote for these
// files, in a single batched index rewrite (issue #116).
delete_files_from_index(index_path, batch_files)?;
// Skip FTS5 rebuild: we're about to re-add these files immediately.
delete_files_from_index_no_fts_rebuild(index_path, batch_files)?;
self.ensure_model_created(batch_units.len())?;
let pool_factor = self.resolve_pool_factor(batch_units.len());
let sorted_units = prepare_units_for_encoding(batch_units, index_chunk_size);
Expand Down Expand Up @@ -2501,7 +2526,14 @@ impl IndexBuilder {

// 1. Delete chunks for deleted files (safe — not re-adding these). Batched into a
// single index rewrite — see delete_files_from_index / issue #116.
delete_files_from_index(index_path, &plan.deleted)?;
// Skip FTS5 rebuild here when changed files will trigger encoding: the
// final rebuild after encoding covers both deletions and re-adds in one pass.
let has_changes_to_encode = !plan.added.is_empty() || !plan.changed.is_empty();
if has_changes_to_encode {
delete_files_from_index_no_fts_rebuild(index_path, &plan.deleted)?;
} else {
delete_files_from_index(index_path, &plan.deleted)?;
}

// Remove deleted files from state
for path in &plan.deleted {
Expand Down Expand Up @@ -2576,12 +2608,13 @@ impl IndexBuilder {

// Delete stale index entries for skipped files that were previously indexed
// (e.g., files that became unreadable due to invalid UTF-8). Batched into one rewrite.
// Skip FTS rebuild: covered by the final rebuild after encoding.
let stale_skipped: Vec<PathBuf> = skipped_files
.iter()
.filter(|p| plan.changed.contains(p))
.cloned()
.collect();
let _ = delete_files_from_index(index_path, &stale_skipped);
let _ = delete_files_from_index_no_fts_rebuild(index_path, &stale_skipped);

// 3. Add new units to index
let mut was_interrupted = false;
Expand Down Expand Up @@ -2624,7 +2657,9 @@ impl IndexBuilder {
// Deferred from earlier to minimize the window where data is missing
// from the index (for concurrent readers and interrupt safety). Batched
// into a single index rewrite — see delete_files_from_index / issue #116.
delete_files_from_index(index_path, &plan.changed)?;
// Skip FTS5 rebuild: the encoding pipeline re-adds FTS entries for the
// new versions; we do a single rebuild after encoding completes.
delete_files_from_index_no_fts_rebuild(index_path, &plan.changed)?;

let sorted_units = prepare_units_for_encoding(&new_units, index_chunk_size);
let pipeline_interrupted = self.run_encoding_pipeline(
Expand Down
93 changes: 41 additions & 52 deletions next-plaid/src/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! - IVF full rebuild after deletion
//! - Metadata synchronization

use std::collections::{BTreeMap, HashSet};
use std::collections::HashSet;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::path::Path;
Expand Down Expand Up @@ -184,63 +184,52 @@ fn delete_from_index_impl(doc_ids: &[i64], index_path: &str, clean_buffer: bool)
current_doc_offset += doclens.len() as i64;
}

// Rebuild IVF from all remaining codes
// First, collect all codes from all chunks
let mut all_codes: Vec<i64> = Vec::with_capacity(total_embeddings);

for chunk_idx in 0..num_chunks {
let codes_path = index_dir.join(format!("{}.codes.npy", chunk_idx));
let chunk_codes: Array1<i64> =
Array1::read_npy(File::open(&codes_path).map_err(|e| {
Error::Delete(format!("Failed to read codes for IVF rebuild: {}", e))
})?)?;
all_codes.extend(chunk_codes.iter());
}

// Build IVF: map centroid -> list of document IDs
// We need to re-read doclens to map embeddings back to documents
let mut code_to_docs: BTreeMap<usize, Vec<i64>> = BTreeMap::new();
let mut emb_idx = 0;
let mut doc_id: i64 = 0;

for chunk_idx in 0..num_chunks {
let doclens_path = index_dir.join(format!("doclens.{}.json", chunk_idx));
let doclens: Vec<i64> =
serde_json::from_reader(BufReader::new(File::open(&doclens_path)?))?;

for &len in &doclens {
for _ in 0..len {
if emb_idx < all_codes.len() {
let code = all_codes[emb_idx] as usize;
code_to_docs.entry(code).or_default().push(doc_id);
// Patch IVF in-place: remove deleted doc IDs and renumber survivors.
// This is O(IVF_size) instead of O(total_embeddings) since we avoid re-reading
// all chunk codes files.
{
let ivf_path = index_dir.join("ivf.npy");
let ivf_lengths_path = index_dir.join("ivf_lengths.npy");

let old_ivf: Array1<i64> = Array1::read_npy(
File::open(&ivf_path)
.map_err(|e| Error::Delete(format!("Failed to open IVF: {}", e)))?,
)?;
let old_ivf_lengths: Array1<i32> = Array1::read_npy(
File::open(&ivf_lengths_path)
.map_err(|e| Error::Delete(format!("Failed to open IVF lengths: {}", e)))?,
)?;

// Build a sorted list of deleted IDs for binary search renumbering.
let mut sorted_deleted: Vec<i64> = ids_to_delete.iter().copied().collect();
sorted_deleted.sort_unstable();

let mut new_ivf_data: Vec<i64> = Vec::with_capacity(old_ivf.len());
let mut new_ivf_lengths: Vec<i32> = Vec::with_capacity(num_partitions);

let mut offset: usize = 0;
for &len in old_ivf_lengths.iter() {
let end = offset + len as usize;
let mut centroid_len: i32 = 0;
for &doc_id in old_ivf.as_slice().unwrap()[offset..end].iter() {
if ids_to_delete.contains(&doc_id) {
continue;
}
emb_idx += 1;
// Renumber: subtract the count of deleted IDs below this one.
let shift = sorted_deleted.partition_point(|&d| d < doc_id) as i64;
new_ivf_data.push(doc_id - shift);
centroid_len += 1;
}
doc_id += 1;
new_ivf_lengths.push(centroid_len);
offset = end;
}
}

// Build optimized IVF (deduplicated, sorted)
let mut ivf_data: Vec<i64> = Vec::new();
let mut ivf_lengths: Vec<i32> = vec![0; num_partitions];

for (centroid_id, ivf_len) in ivf_lengths.iter_mut().enumerate() {
if let Some(docs) = code_to_docs.get(&centroid_id) {
let mut unique_docs: Vec<i64> = docs.clone();
unique_docs.sort_unstable();
unique_docs.dedup();
*ivf_len = unique_docs.len() as i32;
ivf_data.extend(unique_docs);
}
let new_ivf = Array1::from_vec(new_ivf_data);
let new_lengths = Array1::from_vec(new_ivf_lengths);
new_ivf.write_npy(File::create(&ivf_path)?)?;
new_lengths.write_npy(File::create(&ivf_lengths_path)?)?;
}

// Save IVF
let ivf = Array1::from_vec(ivf_data);
let ivf_lengths = Array1::from_vec(ivf_lengths);

ivf.write_npy(File::create(index_dir.join("ivf.npy"))?)?;
ivf_lengths.write_npy(File::create(index_dir.join("ivf_lengths.npy"))?)?;

// Update global metadata
let final_avg_doclen = if final_num_documents > 0 {
total_embeddings as f64 / final_num_documents as f64
Expand Down
94 changes: 56 additions & 38 deletions next-plaid/src/filtering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,47 +1152,65 @@ pub fn delete(index_path: &str, subset: &[i64]) -> Result<usize> {
crate::text_search::drop_temp_table(&conn, name);
}

// Get column names (excluding _subset_)
let mut columns: Vec<String> = Vec::new();
{
let mut stmt = conn.prepare("PRAGMA table_info(METADATA)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
for row in rows {
let col = row?;
if col != SUBSET_COLUMN {
columns.push(col);
// Re-sequence _subset_ IDs to be contiguous 0-based.
// Instead of copying the entire table (expensive for large tables with TEXT/BLOB
// columns), use range-based UPDATEs that only touch the integer primary key.
// Process gaps in ascending order so decremented values never collide.
let mut sorted_ids: Vec<i64> = subset.to_vec();
sorted_ids.sort_unstable();
sorted_ids.dedup();
// Only keep IDs that were actually in range (deleted rows).
let max_id: i64 = conn
.query_row(
&format!(
"SELECT COALESCE(MAX(\"{}\"), -1) FROM METADATA",
SUBSET_COLUMN
),
[],
|row| row.get(0),
)
.unwrap_or(-1);

if max_id >= 0 && deleted > 0 {
// For each gap left by deleted rows, shift all subsequent rows down.
// Merge into contiguous ranges: if IDs 5,6,7 were deleted, rows >= 8
// shift down by 3. We process from lowest gap upward.
//
// Build (range_start, range_end, shift) tuples. Consecutive deleted IDs
// form a single gap; the rows between two gaps all need the same shift
// (equal to the number of deleted IDs to their left).
let mut updates: Vec<(i64, i64, i64)> = Vec::new();
let mut i = 0;
while i < sorted_ids.len() {
// Advance past consecutive deleted IDs
let mut j = i + 1;
while j < sorted_ids.len() && sorted_ids[j] == sorted_ids[j - 1] + 1 {
j += 1;
}
// Rows from sorted_ids[j-1]+1 up to (but not including) the next
// deleted ID need to shift down by j (total deletions so far).
let range_start = sorted_ids[j - 1] + 1;
let range_end = if j < sorted_ids.len() {
sorted_ids[j]
} else {
max_id + sorted_ids.len() as i64 + 1
};
if range_start < range_end {
updates.push((range_start, range_end, j as i64));
}
i = j;
}
}

let col_str = columns
.iter()
.map(|c| format!("\"{}\"", c))
.collect::<Vec<_>>()
.join(", ");

// Create temp table with re-indexed _subset_ values
let create_temp_sql = format!(
"CREATE TEMP TABLE METADATA_TEMP AS \
SELECT (ROW_NUMBER() OVER (ORDER BY \"{}\")) - 1 AS new_subset_id, {} \
FROM METADATA",
SUBSET_COLUMN, col_str
);
conn.execute(&create_temp_sql, [])?;

// Clear original table
conn.execute("DELETE FROM METADATA", [])?;

// Copy back with new IDs
let insert_back_sql = format!(
"INSERT INTO METADATA (\"{}\", {}) \
SELECT new_subset_id, {} FROM METADATA_TEMP",
SUBSET_COLUMN, col_str, col_str
);
conn.execute(&insert_back_sql, [])?;

// Drop temp table
conn.execute("DROP TABLE METADATA_TEMP", [])?;
for (from, to_excl, shift) in &updates {
conn.execute(
&format!(
"UPDATE METADATA SET \"{}\" = \"{}\" - ?1 WHERE \"{}\" >= ?2 AND \"{}\" < ?3",
SUBSET_COLUMN, SUBSET_COLUMN, SUBSET_COLUMN, SUBSET_COLUMN
),
rusqlite::params![shift, from, to_excl],
)?;
}
}

// Commit transaction
conn.execute("COMMIT", [])?;
Expand Down
31 changes: 31 additions & 0 deletions next-plaid/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1664,6 +1664,37 @@ impl MmapIndex {
}
}

/// Append embeddings to an existing index without loading the full MmapIndex.
///
/// This is significantly faster than `update_or_create` for incremental updates
/// because it skips merged-file generation (628MB+ on large indices). Use this
/// when you only need the assigned doc IDs and won't search immediately after.
pub fn update_append(
embeddings: &[Array2<f32>],
index_path: &str,
update_config: &crate::update::UpdateConfig,
) -> Result<Vec<i64>> {
use crate::codec::ResidualCodec;
use crate::update::update_index;

let index_dir = std::path::Path::new(index_path);
let metadata = Metadata::load_from_path(index_dir)?;
let codec = ResidualCodec::load_from_dir(index_dir)?;
let start_doc_id = metadata.num_documents as i64;
let num_new_docs = embeddings.len();

update_index(
embeddings,
index_path,
&codec,
Some(update_config.batch_size),
false,
update_config.force_cpu,
)?;

Ok((start_doc_id..start_doc_id + num_new_docs as i64).collect())
}

/// Update an existing index or create a new one, with metadata and automatic
/// FTS5 full-text indexing.
///
Expand Down
Loading