Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 167 additions & 13 deletions crates/events/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,23 @@ impl Default for RetentionPolicy {
///
/// Concurrent writes to the same task file are serialized via a per-task
/// mutex to prevent byte interleaving.
///
/// ## Type index
///
/// An in-memory secondary index maps event type strings to events, avoiding
/// the O(n*m) full scan in [`query_by_type_prefix`](EventStore::query_by_type_prefix).
/// The index is lazily populated from disk on first query and kept in sync
/// on every [`append`](EventStore::append). Compaction invalidates the index
/// so it is rebuilt on the next query.
pub struct EventStore {
    root: PathBuf,
    /// Per-task write locks to serialize concurrent appends to the same file.
    write_locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
    /// Retention policy for compaction.
    retention: RetentionPolicy,
    /// In-memory index: event_type string → events in insertion order.
    /// NOTE: the per-type `Vec`s are NOT kept sorted — `query_by_type_prefix`
    /// sorts by timestamp before returning results.
    /// `None` means the index hasn't been built yet (lazy initialization);
    /// compaction resets it to `None` to force a rebuild.
    type_index: Mutex<Option<HashMap<String, Vec<Event>>>>,
}

impl EventStore {
Expand All @@ -97,6 +108,7 @@ impl EventStore {
root: root.as_ref().to_path_buf(),
write_locks: Mutex::new(HashMap::new()),
retention,
type_index: Mutex::new(None),
}
}

Expand Down Expand Up @@ -178,6 +190,17 @@ impl EventStore {
file.write_all(line.as_bytes()).await?;
file.flush().await?;

drop(_guard);

// Update the in-memory type index if it has been initialized.
{
let mut idx = self.type_index.lock().await;
if let Some(ref mut index) = *idx {
let type_key = event.event_type.as_str().to_string();
index.entry(type_key).or_default().push(event.clone());
}
}

Ok(())
}

Expand Down Expand Up @@ -260,37 +283,64 @@ impl EventStore {

/// Query events across all tasks, filtered by an event-type prefix.
///
/// Scans every task's event log and returns events whose type starts with
/// `type_prefix` (e.g. `"orchestrator:"`). Results are sorted by timestamp
/// ascending and truncated to `limit`.
/// Uses an in-memory type index for O(k) lookups where k is the number of
/// matching events, instead of scanning every event file on disk.
/// The index is lazily built on first call and kept in sync by `append()`.
///
/// Results are sorted by timestamp ascending and truncated to `limit`
/// (keeping the most recent).
pub async fn query_by_type_prefix(
&self,
type_prefix: &str,
limit: usize,
) -> Result<Vec<Event>, StoreError> {
let task_ids = self.list_tasks().await?;
let mut matching = Vec::new();
// Ensure the type index is populated.
self.ensure_type_index().await?;

for task_id in &task_ids {
let events = self.read_task(task_id).await?;
for event in events {
if event.event_type.as_str().starts_with(type_prefix) {
matching.push(event);
}
let idx = self.type_index.lock().await;
let index = idx.as_ref().expect("index was just initialized");

// Collect events whose type key starts with the prefix.
let mut matching = Vec::new();
for (type_key, events) in index {
if type_key.starts_with(type_prefix) {
matching.extend(events.iter().cloned());
}
}

// Sort by timestamp ascending
// Sort by timestamp ascending.
matching.sort_by(|a, b| a.ts.cmp(&b.ts));

// Apply limit (take from the end to get the most recent)
// Apply limit (take from the end to get the most recent).
if matching.len() > limit {
matching = matching.split_off(matching.len() - limit);
}

Ok(matching)
}

/// Lazily build the in-memory type index from all events on disk.
async fn ensure_type_index(&self) -> Result<(), StoreError> {
let mut idx = self.type_index.lock().await;
if idx.is_some() {
return Ok(());
}

let mut index: HashMap<String, Vec<Event>> = HashMap::new();
let task_ids = self.list_tasks().await?;

for task_id in &task_ids {
let events = self.read_task(task_id).await?;
for event in events {
let type_key = event.event_type.as_str().to_string();
index.entry(type_key).or_default().push(event);
}
}

*idx = Some(index);
Ok(())
}

/// List all task IDs that have event logs.
pub async fn list_tasks(&self) -> Result<Vec<String>, StoreError> {
let mut tasks = Vec::new();
Expand Down Expand Up @@ -329,6 +379,9 @@ impl EventStore {
}

if total_removed > 0 {
// Invalidate the type index so it gets rebuilt on next query.
*self.type_index.lock().await = None;

tracing::info!(
removed = total_removed,
tasks = task_ids.len(),
Expand Down Expand Up @@ -445,6 +498,9 @@ impl EventStore {
}

if removed > 0 {
// Invalidate the type index since tasks were removed.
*self.type_index.lock().await = None;

// Clean up write locks for removed tasks.
let mut locks = self.write_locks.lock().await;
locks.retain(|_, lock| Arc::strong_count(lock) > 1);
Expand Down Expand Up @@ -678,6 +734,104 @@ mod tests {
assert_eq!(results.len(), 3);
}

#[tokio::test]
async fn query_by_type_prefix_index_stays_in_sync() {
let dir = tempdir().unwrap();
let store = EventStore::new(dir.path());

// Append an event and trigger index build via first query.
store
.append(&Event::new(
EventType::OrchestratorDecision,
"task-1",
Actor::Orchestrator,
serde_json::json!({}),
))
.await
.unwrap();

let results = store
.query_by_type_prefix("orchestrator:", 100)
.await
.unwrap();
assert_eq!(results.len(), 1);

// Append more events after the index is built — they should appear.
store
.append(&Event::new(
EventType::OrchestratorMessage,
"task-2",
Actor::Orchestrator,
serde_json::json!({}),
))
.await
.unwrap();

store
.append(&Event::new(
EventType::TaskCreated,
"task-3",
Actor::System,
serde_json::json!({}),
))
.await
.unwrap();

let results = store
.query_by_type_prefix("orchestrator:", 100)
.await
.unwrap();
assert_eq!(results.len(), 2);

// Non-matching prefix should still return 0.
let results = store
.query_by_type_prefix("agent:", 100)
.await
.unwrap();
assert_eq!(results.len(), 0);
}

#[tokio::test]
async fn query_by_type_prefix_rebuilds_after_compaction() {
let dir = tempdir().unwrap();
let policy = RetentionPolicy {
max_events: Some(1),
max_age: None,
};
let store = EventStore::with_retention(dir.path(), policy);

// Write 3 orchestrator events to the same task.
for i in 0..3 {
store
.append(&Event::new(
EventType::OrchestratorDecision,
"task-1",
Actor::Orchestrator,
serde_json::json!({"i": i}),
))
.await
.unwrap();
}

// Build the index.
let results = store
.query_by_type_prefix("orchestrator:", 100)
.await
.unwrap();
assert_eq!(results.len(), 3);

// Compact (keeps only 1 event per task).
let removed = store.compact_all().await.unwrap();
assert_eq!(removed, 2);

// Index should be rebuilt and reflect compacted state.
let results = store
.query_by_type_prefix("orchestrator:", 100)
.await
.unwrap();
assert_eq!(results.len(), 1);
}

#[tokio::test]
async fn read_task_skips_corrupted_lines() {
let dir = tempdir().unwrap();
Expand Down
Loading