Add in-memory type index for event prefix queries #716
Conversation
`query_by_type_prefix` previously scanned every task file on every call — O(tasks × events_per_task). For 1000 tasks with 10k events each that's 10M events scanned per query. Add a lightweight `TypeIndex` that maps `event_type → [(task_id, ts)]` and is populated lazily on first query, then kept current by `append()`. Queries now only read task files that actually contain matching events.

Key design decisions addressing prior review feedback:

- **No race condition:** The `type_index` write lock is held across both the file write and the index update in `append()`. This prevents the race where lazy population reads a just-written event from disk and then `append()` duplicates it in the index.
- **Memory-bounded:** Index stores only `(task_id, timestamp)` per entry, not full events. Capped at 2000 entries per event type (~50 types × 2000 × ~50 bytes ≈ 5 MB), which covers typical query limits (100–500).
- **Compaction-safe:** Type index is invalidated when `compact_task` removes events, so the next query re-populates from current disk state.

Closes #507

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
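To make the description concrete, here is a minimal sketch of the data structure the PR describes. The field and method names (`by_type`, `populated`, `insert`, `task_ids_for_prefix`) and the eviction policy are assumptions for illustration; the PR's actual definitions may differ.

```rust
use std::collections::HashMap;

// Assumed cap from the PR description (2000 entries per event type).
const MAX_ENTRIES_PER_TYPE: usize = 2000;

/// Sketch of the in-memory index: event_type -> [(task_id, timestamp)].
struct TypeIndex {
    by_type: HashMap<String, Vec<(String, u64)>>,
    /// Set once lazy population has completed.
    populated: bool,
}

impl TypeIndex {
    fn insert(&mut self, event_type: &str, task_id: &str, ts: u64) {
        let entries = self.by_type.entry(event_type.to_string()).or_default();
        entries.push((task_id.to_string(), ts));
        // Bound memory: evict the oldest entry once the per-type cap is hit.
        if entries.len() > MAX_ENTRIES_PER_TYPE {
            entries.remove(0);
        }
    }

    /// Deduplicated task IDs across every event type matching `prefix`,
    /// so a stray duplicate (task_id, ts) entry is harmless.
    fn task_ids_for_prefix(&self, prefix: &str) -> Vec<String> {
        let mut ids: Vec<String> = self
            .by_type
            .iter()
            .filter(|(ty, _)| ty.starts_with(prefix))
            .flat_map(|(_, entries)| entries.iter().map(|(id, _)| id.clone()))
            .collect();
        ids.sort();
        ids.dedup();
        ids
    }
}
```

Because only `(task_id, timestamp)` pairs are stored, the query path uses the index purely to decide which task files to read; the events themselves still come from disk.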
This PR correctly solves the O(tasks × events) full-scan bottleneck in query_by_type_prefix. The TypeIndex design is clean: lazy population, cap-per-type to bound memory, and compaction invalidation are all handled correctly. All 55 tests pass and clippy is clean.
Two suggestions around lock granularity — neither is a correctness bug, but both are worth considering before this ships in a high-throughput environment.
Lock scope in append() (line 243): The type_index write lock is held across the full file I/O (dir create, open, write, flush). The reason given is to prevent a duplicate index entry if ensure_type_index_populated() races in between the file write and the index update. But task_ids_for_prefix() already deduplicates task IDs, so a duplicate (task_id, ts) entry is harmless. Narrowing the lock to just the index.insert() call would eliminate write-path contention for reads.
Cold-start population holds write lock for full scan (line 393): ensure_type_index_populated() holds the write lock while reading every task file. This serializes all concurrent append() calls for the duration — a potentially multi-second stall on first query with a large store. Reading into a local buffer without the lock (then briefly locking to merge) would avoid this at the cost of the same benign duplicate.
The implementation is otherwise solid. The double-check pattern in ensure_type_index_populated() is correct, compact_task() properly invalidates on compaction, and the new tests cover the key behavioral invariants (post-population appends, unrelated task file skipping).
```diff
@@ -178,6 +255,10 @@ impl EventStore {
     file.write_all(line.as_bytes()).await?;
     file.flush().await?;
```
[SUGGESTION] Priority: Code Quality
The type_index write lock is held across create_dir_all, open, write_all, and flush (lines 243–256). The stated goal is to prevent ensure_type_index_populated() from reading the newly-written event off disk before append() has a chance to add it to the index, which would result in a duplicate (task_id, timestamp) entry.
But `task_ids_for_prefix()` already calls `.dedup()` on the task ID list, so a duplicate entry in the vec is harmless — it just costs one extra element before the cap evicts it. The only real invariant that needs protection is that an append to a populated index must not be silently lost. That invariant is already satisfied by `if index.populated { index.insert(...) }` as long as the check and the insert are inside the same lock scope.
Consider narrowing the lock scope to just the `index.insert()` call:

```rust
let _guard = lock.lock().await;

// File I/O (no index lock held).
fs::create_dir_all(parent).await?;
let mut file = OpenOptions::new().create(true).append(true).open(&path).await?;
file.write_all(line.as_bytes()).await?;
file.flush().await?;

// Brief write lock only for the index update.
let mut index = self.type_index.write().await;
if index.populated {
    index.insert(event.event_type.as_str(), &event.task, event.ts);
}
```

As written, every append serializes all concurrent `query_by_type_prefix` read-lock acquisitions while doing disk I/O. In an agent workload where events are appended frequently, this adds latency to orchestrator queries.
```rust
let mut index = self.type_index.write().await;
// Double-check after acquiring the write lock.
if index.populated {
    return Ok(());
}

let task_ids = self.list_tasks().await?;
for task_id in &task_ids {
    let events = self.read_task(task_id).await?;
    for event in &events {
        index.insert(event.event_type.as_str(), &event.task, event.ts);
    }
}
```
[SUGGESTION] Priority: Correctness / Performance
The write lock is held for the entire cold-start scan — list_tasks() plus one read_task() per task (lines 393–405). While this is documented as a "one-time cost at startup," in practice it blocks every concurrent append() call for the full scan duration, because append() acquires the per-task lock first and then waits for the type_index write lock (line 243). With 1000+ tasks that each have thousands of events, this could cause a multi-second write stall on the first orchestrator query.
A narrower approach: read all tasks into a local buffer without holding the lock, then briefly acquire the write lock to merge and set populated = true. Any events appended during the scan will either be caught by the merge or will be added by append() immediately after (since they hold the write lock before writing). The worst case is a duplicate entry — benign as noted above.
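A self-contained sketch of that buffer-then-merge pattern follows. It uses `std::sync::RwLock` and stand-in types to keep the example runnable; the real code would use the store's async lock and `list_tasks()`/`read_task()` calls, so all names here are assumptions, not the PR's actual API.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Minimal stand-ins for the store's index (assumed shape, not the PR's real types).
struct Index {
    by_type: HashMap<String, Vec<(String, u64)>>,
    populated: bool,
}

struct Store {
    type_index: RwLock<Index>,
}

impl Store {
    /// Buffer-then-merge population: run the expensive scan outside the
    /// write lock, then briefly lock to merge and flip `populated`.
    fn ensure_populated(&self, scan: impl Fn() -> Vec<(String, String, u64)>) {
        if self.type_index.read().unwrap().populated {
            return;
        }
        // Expensive scan runs without the write lock, so appends are not blocked.
        let staged = scan();

        // Brief write lock; double-check in case another caller finished first.
        let mut index = self.type_index.write().unwrap();
        if !index.populated {
            for (ty, task, ts) in staged {
                index.by_type.entry(ty).or_default().push((task, ts));
            }
            index.populated = true;
        }
    }
}
```

Events appended during the scan may land in both the staged buffer and the live index, producing the benign duplicate discussed above; the `.dedup()` in the query path absorbs it.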
If changing the locking is out of scope for this PR, at minimum consider logging a warning (rather than just debug!) when the scan takes longer than a threshold, so operators can see cold-start stalls in production.
Orchestrator Evaluation: Rejected
Feedback for agent:
Summary
- Adds `TypeIndex` to `EventStore` that maps event type strings to `(task_id, timestamp)` pairs
- `query_by_type_prefix` now reads only task files that contain matching events instead of scanning all files — reducing complexity from O(tasks × events) to O(matching_tasks × events_per_task)
- Index is populated lazily and kept current by `append()`

Addressing prior review feedback
- **Race condition fix:** The `type_index` write lock is held across both the file write and the index update in `append()`. This makes the file write and index update atomic — lazy population in `query_by_type_prefix` cannot observe the on-disk event without the corresponding index entry (or vice versa).
- **Memory bounded:** Index stores only `(task_id, timestamp)` per entry — no full event/payload cloning. Capped at 2000 entries per event type (~50 types × 2000 × ~50 bytes ≈ 5 MB max), covering typical query limits (100–500).
- **Compaction safe:** Type index is invalidated when `compact_task` removes events, so the next query re-populates from current disk state.

Test plan
- `cargo test --package events` passes
- `cargo check` passes

Closes #507
🤖 Generated with Claude Code