diff --git a/crates/mesh/src/crdt_kv/crdt.rs b/crates/mesh/src/crdt_kv/crdt.rs index 74318748d..5366a2c12 100644 --- a/crates/mesh/src/crdt_kv/crdt.rs +++ b/crates/mesh/src/crdt_kv/crdt.rs @@ -1,401 +1,199 @@ -use std::{ - cmp::Reverse, - collections::HashSet, - sync::Arc, - time::{Duration, Instant}, -}; - -use dashmap::{mapref::entry::Entry as MapEntry, DashMap}; -use parking_lot::{Mutex, RwLock}; -use tracing::{debug, info}; +//! CRDT OR-Map router. +//! +//! [`CrdtOrMap`] used to host both LWW and EpochMaxWins logic inline, with a +//! per-prefix strategy table that branched at every entry point. The bug +//! pattern in PR #1469 traced back to that shared shape: any strategy-specific +//! invariant had to be threaded through every shared call site, and missing +//! one site was how the same bug would resurface in a different form. +//! +//! `CrdtOrMap` is now a thin router over per-namespace engines (see +//! [`engine`](super::engine)). Each registered prefix gets its own engine +//! with its own state, log, clock, and metadata; the router just matches +//! keys to engines by longest-prefix-match and delegates. Unregistered keys +//! fall through to a built-in default LWW engine, preserving today's +//! "default LWW" semantics for callers that never call +//! `register_merge_strategy`. + +use std::{cmp::Reverse, collections::BTreeMap, sync::Arc, time::Duration}; + +use parking_lot::RwLock; +use tracing::info; use super::{ - epoch_max_wins, - kv_store::KvStore, + engine::{EngineHandle, EpochMaxWinsLegacyEngine, LwwEngine}, merge_strategy::MergeStrategy, operation::{Operation, OperationLog}, - replica::{LamportClock, ReplicaId}, + replica::ReplicaId, }; -// ============================================================================ -// CRDT OR-Map - Observed-Remove Map Implementation -// ============================================================================ - -/// Default tombstone grace period. Tombstones younger than this are not -/// garbage collected, preventing data resurrection from stale peers. -/// Gossip converges in seconds for small clusters, so 5 minutes is very -/// conservative. +/// Default tombstone grace period for [`CrdtOrMap::gc_tombstones`]. Forwarded +/// to each engine's GC. pub const DEFAULT_TOMBSTONE_GRACE: Duration = Duration::from_secs(300); -/// Value metadata for CRDT OR-Map -#[derive(Debug, Clone)] -struct ValueMetadata { - timestamp: u64, - replica_id: ReplicaId, - is_tombstone: bool, // Marks if this version is a tombstone (deletion) - /// Monotonic timestamp for tombstone GC. Tombstones younger than - /// `tombstone_grace` are not garbage collected to prevent data resurrection. - created_at: Instant, -} - -impl PartialEq for ValueMetadata { - fn eq(&self, other: &Self) -> bool { - self.timestamp == other.timestamp - && self.replica_id == other.replica_id - && self.is_tombstone == other.is_tombstone - } -} - -impl Eq for ValueMetadata {} - -impl ValueMetadata { - fn new(timestamp: u64, replica_id: ReplicaId) -> Self { - Self { - timestamp, - replica_id, - is_tombstone: false, - created_at: Instant::now(), - } - } - - fn from_rate_limit_live_version(version: epoch_max_wins::RateLimitVersion) -> Self { - Self::new(version.timestamp, version.replica_id) - } +type EngineTable = Arc<[(String, EngineHandle)]>; - fn tombstone(timestamp: u64, replica_id: ReplicaId) -> Self { - Self { - timestamp, - replica_id, - is_tombstone: true, - created_at: Instant::now(), - } - } - - fn version_key(&self) -> (u64, ReplicaId) { - (self.timestamp, self.replica_id) - } - - fn as_rate_limit_version(&self) -> epoch_max_wins::RateLimitVersion { - epoch_max_wins::RateLimitVersion::new(self.timestamp, self.replica_id) - } - - fn matches_version(&self, timestamp: u64, replica_id: ReplicaId) -> bool { - self.timestamp == timestamp && self.replica_id == replica_id - } - - fn is_newer_than(&self, timestamp: u64, replica_id: ReplicaId) -> bool { - self.version_key() > (timestamp, replica_id) - } -} - -/// Immutable snapshot of registered prefix→strategy mappings. -/// `register_merge_strategy` builds a new snapshot copy-on-write; readers -/// take a cheap `Arc::clone` and traverse it without holding any lock. -type StrategyTable = Arc<[(String, MergeStrategy)]>; - -/// CRDT OR-Map +/// CRDT OR-Map. Routes operations to per-namespace engines by prefix. #[derive(Clone)] pub struct CrdtOrMap { - store: KvStore, - metadata: Arc>>, // Key to list of versions - key_locks: Arc>>>, // Per-key critical section lock - merge_strategies: Arc>, + /// Engines explicitly registered via `register_merge_strategy`, sorted by + /// `Reverse(prefix.len())` so longest-prefix-match wins. + engines: Arc>, + /// Catch-all engine for keys not matching any registered prefix. LWW for + /// backward compatibility with callers that never register a prefix + /// (notably the in-crate tests). + default_engine: EngineHandle, replica_id: ReplicaId, - clock: LamportClock, - operation_log: Arc>, } impl CrdtOrMap { - /// Create new CRDT OR-Map pub fn new() -> Self { Self::with_replica_id(ReplicaId::new()) } - /// Create new CRDT OR-Map with specified replica ID pub fn with_replica_id(replica_id: ReplicaId) -> Self { info!("Creating CRDT OR-Map, Replica ID: {}", replica_id); Self { - store: KvStore::new(), - metadata: Arc::new(DashMap::new()), - key_locks: Arc::new(DashMap::new()), - merge_strategies: Arc::new(RwLock::new(Arc::from(Vec::new()))), + engines: Arc::new(RwLock::new(Arc::from(Vec::new()))), + default_engine: Arc::new(LwwEngine::new(replica_id)), replica_id, - clock: LamportClock::new(), - operation_log: Arc::new(RwLock::new(OperationLog::new())), } } - /// Register the merge strategy for a key prefix. Copy-on-write: builds - /// a new immutable snapshot so readers (`compact`, `append`, `merge`) - /// can grab a cheap `Arc::clone` of the current snapshot instead of - /// cloning the underlying Vec on every gossip round. + /// Register the merge strategy for a key prefix. Each prefix gets its own + /// engine instance; re-registering replaces the existing engine for that + /// prefix (state is discarded — intended for startup configuration only). pub(crate) fn register_merge_strategy(&self, prefix: String, strategy: MergeStrategy) { - let mut guard = self.merge_strategies.write(); - let mut next: Vec<(String, MergeStrategy)> = guard.iter().cloned().collect(); - if let Some((_, existing)) = next - .iter_mut() - .find(|(registered_prefix, _)| registered_prefix == &prefix) - { - *existing = strategy; + let engine: EngineHandle = match strategy { + MergeStrategy::LastWriterWins => Arc::new(LwwEngine::new(self.replica_id)), + MergeStrategy::EpochMaxWins => Arc::new(EpochMaxWinsLegacyEngine::new(self.replica_id)), + }; + let mut guard = self.engines.write(); + let mut next: Vec<(String, EngineHandle)> = guard.iter().cloned().collect(); + if let Some((_, existing)) = next.iter_mut().find(|(p, _)| p == &prefix) { + *existing = engine; } else { - next.push((prefix, strategy)); + next.push((prefix, engine)); } next.sort_by_key(|(prefix, _)| Reverse(prefix.len())); *guard = Arc::from(next); } - fn merge_strategies_snapshot(&self) -> StrategyTable { - Arc::clone(&self.merge_strategies.read()) - } - - fn merge_strategy_for_key(&self, key: &str) -> MergeStrategy { - let strategies = self.merge_strategies_snapshot(); - Self::merge_strategy_for_key_from(&strategies, key) - } - - fn merge_strategy_for_key_from( - strategies: &[(String, MergeStrategy)], - key: &str, - ) -> MergeStrategy { - strategies - .iter() - .find_map(|(prefix, strategy)| key.starts_with(prefix).then_some(*strategy)) - .unwrap_or(MergeStrategy::LastWriterWins) - } - - fn compact_operation_log(&self, operation_log: &mut OperationLog) { - let strategies = self.merge_strategies_snapshot(); - operation_log - .compact_with_strategy(|key| Self::merge_strategy_for_key_from(&strategies, key)); + fn engines_snapshot(&self) -> EngineTable { + Arc::clone(&self.engines.read()) } - fn append_operation(&self, operation: Operation) { - let mut operation_log = self.operation_log.write(); - let strategies = self.merge_strategies_snapshot(); - operation_log.append_with_strategy(operation, |key| { - Self::merge_strategy_for_key_from(&strategies, key) - }); - } - - fn key_lock_for(&self, key: &str) -> Arc> { - self.key_locks - .entry(key.to_string()) - .or_insert_with(|| Arc::new(Mutex::new(()))) - .clone() - } - - fn key_is_tombstoned_or_unknown(&self, key: &str) -> bool { - self.metadata.get(key).is_none_or(|versions| { - versions - .iter() - .max_by_key(|version| version.version_key()) - .is_none_or(|winner| winner.is_tombstone) - }) + /// Return the engine handle a key routes to. Falls back to the default + /// LWW engine if no registered prefix matches. + fn engine_for_key(&self, key: &str) -> EngineHandle { + let engines = self.engines_snapshot(); + for (prefix, engine) in engines.iter() { + if key.starts_with(prefix.as_str()) { + return Arc::clone(engine); + } + } + Arc::clone(&self.default_engine) } - fn try_cleanup_key_lock(&self, key: &str, key_lock: &Arc>) { - if self.store.contains_key(key) || !self.key_is_tombstoned_or_unknown(key) { - return; + /// Collect every engine (registered + default) so callers can fan reads + /// across all of them. + fn all_engines(&self) -> Vec { + let engines = self.engines_snapshot(); + let mut out = Vec::with_capacity(engines.len() + 1); + for (_, engine) in engines.iter() { + out.push(Arc::clone(engine)); } - - let _ = self.key_locks.remove_if(key, |_, stored_lock| { - Arc::ptr_eq(stored_lock, key_lock) - && Arc::strong_count(stored_lock) <= 2 - && stored_lock.try_lock().is_some() - }); + out.push(Arc::clone(&self.default_engine)); + out } - /// Insert key-value pair (transparent operation) - pub fn insert(&self, key: String, value: Vec) -> Option> { - let key_lock = self.key_lock_for(&key); - let key_guard = key_lock.lock(); - - let previous = self.store.get(&key); - let timestamp = self.clock.tick(); - let operation = Operation::insert(key.clone(), value, timestamp, self.replica_id); - let result = if self.apply_insert_locked(&key, operation.clone()) { - self.append_operation(operation); + // ---- Local writes ---- - debug!( - "Insert: key={}, timestamp={}, replica={}", - key, timestamp, self.replica_id - ); - - previous - } else { - self.store.get(&key).map(|bytes| bytes.to_vec()) - }; - - drop(key_guard); - self.try_cleanup_key_lock(&key, &key_lock); - result + pub fn insert(&self, key: String, value: Vec) -> Option> { + self.engine_for_key(&key).put_local(&key, value) } - /// Remove key (transparent operation) pub fn remove(&self, key: &str) -> Option> { - let key_lock = self.key_lock_for(key); - let key_guard = key_lock.lock(); - - let timestamp = self.clock.tick(); - - debug!( - "Remove: key={}, timestamp={}, replica={}", - key, timestamp, self.replica_id - ); - - let removed = match self.merge_strategy_for_key(key) { - MergeStrategy::EpochMaxWins => { - if self.apply_epoch_remove_locked(key, timestamp, self.replica_id) { - let operation = Operation::remove(key.to_string(), timestamp, self.replica_id); - self.append_operation(operation); - } - None - } - MergeStrategy::LastWriterWins => { - if self.record_remove_metadata(key, timestamp, self.replica_id) { - let operation = Operation::remove(key.to_string(), timestamp, self.replica_id); - self.append_operation(operation); - self.store.remove(key) - } else { - None - } - } - }; - - drop(key_guard); - self.try_cleanup_key_lock(key, &key_lock); - removed + self.engine_for_key(key).delete_local(key) } - /// Get value by key + // ---- Reads ---- + pub fn get(&self, key: &str) -> Option> { - self.store.get(key) + self.engine_for_key(key).get(key) } - /// Check if key exists pub fn contains_key(&self, key: &str) -> bool { - self.store.contains_key(key) + self.engine_for_key(key).contains_key(key) } - /// Mutation generation counter. Increments on every insert/remove. + /// Mutation generation counter. Sums per-engine generations: any change + /// in any engine increments the sum monotonically, so callers that key + /// off `generation()` to detect "anything changed" still work. pub fn generation(&self) -> u64 { - self.store.generation() + self.all_engines().iter().map(|e| e.generation()).sum() } - /// Get all keys without cloning values. pub fn keys(&self) -> Vec { - self.store.keys() + let mut all = Vec::new(); + for engine in self.all_engines() { + all.extend(engine.keys()); + } + all } - /// Get all key-value pairs - pub fn all(&self) -> std::collections::BTreeMap> { - self.store.all() + pub fn all(&self) -> BTreeMap> { + let mut all = BTreeMap::new(); + for engine in self.all_engines() { + for key in engine.keys() { + if let Some(value) = engine.get(&key) { + all.insert(key, value); + } + } + } + all } - /// Get number of live keys in the local store. pub fn len(&self) -> usize { - self.store.len() + self.all_engines().iter().map(|e| e.len()).sum() } pub fn is_empty(&self) -> bool { self.len() == 0 } - /// Get the replica ID pub fn replica_id(&self) -> ReplicaId { self.replica_id } - /// Remove tombstoned keys from the metadata and key_locks maps. - /// Keys that are not in the live store and whose latest metadata entry - /// is a tombstone are cleaned up to prevent unbounded memory growth. - /// - /// Tombstones younger than `grace` are NOT removed, preventing data - /// resurrection from stale peers that haven't received the tombstone yet - /// Default grace period: 5 minutes. - /// - /// Returns the number of entries removed. + // ---- Tombstone GC ---- + pub fn gc_tombstones(&self) -> usize { self.gc_tombstones_with_grace(DEFAULT_TOMBSTONE_GRACE) } - /// Like `gc_tombstones` but with a custom grace period. - /// Useful for testing with shorter durations. pub fn gc_tombstones_with_grace(&self, grace: Duration) -> usize { - let now = Instant::now(); - let mut removed = 0; - // Collect-then-remove: collect keys to check first (read-only iteration), - // then remove individually. This avoids locking all DashMap shards - // simultaneously, which would stall concurrent writers. - let keys_to_check: Vec = self - .metadata + self.all_engines() .iter() - .filter(|entry| !self.store.contains_key(entry.key())) - .map(|entry| entry.key().clone()) - .collect(); - - for key in keys_to_check { - if !self.key_is_tombstoned_or_unknown(&key) { - continue; - } - // Only remove key_locks if no other task holds the lock. - // Uses the same safety pattern as try_cleanup_key_lock: - // check strong_count and try_lock before removing. - self.key_locks.remove_if(&key, |_, lock| { - Arc::strong_count(lock) <= 2 && lock.try_lock().is_some() - }); - // Atomically remove metadata only if the key is still not in the - // live store AND still tombstoned AND the tombstone is older than - // the grace period. The remove_if closure runs under the DashMap - // shard lock, preventing a concurrent insert from racing between - // check and remove. - let was_removed = self.metadata.remove_if(&key, |_, versions| { - !self.store.contains_key(&key) - && versions - .iter() - .max_by_key(|v| v.version_key()) - .is_none_or(|winner| { - winner.is_tombstone - && now.saturating_duration_since(winner.created_at) >= grace - }) - }); - if was_removed.is_some() { - removed += 1; - } - } - removed + .map(|e| e.gc_tombstones(grace)) + .sum() } - /// Get the operation log - pub fn get_operation_log(&self) -> OperationLog { - self.operation_log.read().clone() - } + // ---- Replication ---- - /// Apply a single operation - fn apply_operation(&self, operation: &Operation) { - match operation { - Operation::Insert { - key, - value, - timestamp, - replica_id, - } => { - self.clock.update(*timestamp); - self.apply_insert(key, value.clone(), *timestamp, *replica_id); - } - Operation::Remove { - key, - timestamp, - replica_id, - } => { - self.clock.update(*timestamp); - let _ = self.apply_remove(key, *timestamp, *replica_id); - } + /// Snapshot the operation log seen by gossip. Concatenates each engine's + /// log into a single [`OperationLog`]. + pub fn get_operation_log(&self) -> OperationLog { + let mut ops = Vec::new(); + for engine in self.all_engines() { + ops.extend(engine.export_ops()); } + OperationLog::from_operations(ops) } - /// Merge operation log from another replica - /// This is the core CRDT merge operation - state is derived from log + /// Merge an incoming operation log. Groups ops by destination engine + /// (longest-prefix-match) and hands each engine its slice. Engines + /// canonicalise (merge → compact → apply) internally, so dominated ops + /// in the incoming batch never reach live state. pub fn merge(&self, log: &OperationLog) { info!( "Merging {} operations into replica {}", @@ -403,360 +201,38 @@ impl CrdtOrMap { self.replica_id ); - let strategies = self.merge_strategies_snapshot(); - let seen_operations: HashSet<(ReplicaId, u64)> = { - let local_log = self.operation_log.read(); - local_log - .operations() - .iter() - .map(|operation| (operation.replica_id(), operation.timestamp())) - .collect() - }; + let engines = self.engines_snapshot(); + // Bucket index: 0..engines.len() for registered prefixes, + // engines.len() for the default engine. + let default_idx = engines.len(); + let mut buckets: Vec> = (0..=default_idx).map(|_| Vec::new()).collect(); - // EpochMaxWins re-applies same-op-id operations because a compacted - // payload may carry an embedded tombstone_version that the receiver's - // raw-payload version is missing (`merge_live_value.changed` gates the - // store update so identical bytes are still a no-op). - let mut unseen_operations: Vec = - log.operations() + for op in log.operations() { + let idx = engines .iter() - .filter(|operation| { - match Self::merge_strategy_for_key_from(&strategies, operation.key()) { - MergeStrategy::LastWriterWins => !seen_operations - .contains(&(operation.replica_id(), operation.timestamp())), - MergeStrategy::EpochMaxWins => true, - } - }) - .cloned() - .collect(); - unseen_operations.sort_by_key(|operation| (operation.timestamp(), operation.replica_id())); - - { - let mut local_log = self.operation_log.write(); - local_log.merge_with_strategy(log, |key| { - Self::merge_strategy_for_key_from(&strategies, key) - }); - self.compact_operation_log(&mut local_log); + .position(|(prefix, _)| op.key().starts_with(prefix.as_str())) + .unwrap_or(default_idx); + buckets[idx].push(op.clone()); } - // Apply only new operations in deterministic order. - for operation in &unseen_operations { - self.apply_operation(operation); + for (idx, ops) in buckets.into_iter().enumerate() { + if ops.is_empty() { + continue; + } + let engine = if idx == default_idx { + Arc::clone(&self.default_engine) + } else { + Arc::clone(&engines[idx].1) + }; + engine.apply_remote_ops(&ops); } } - /// Convenience method: merge from another replica instance - /// In distributed systems, prefer using merge(&log) with serialized logs + /// Convenience: merge another replica's full log. pub fn merge_replica(&self, other: &CrdtOrMap) { let other_log = other.get_operation_log(); self.merge(&other_log); } - - // ======================================================================== - // Internal methods for applying operations - // ======================================================================== - - /// Apply insert (LWW semantic; newer tombstones can suppress older inserts). - fn apply_insert(&self, key: &str, value: Vec, timestamp: u64, replica_id: ReplicaId) { - let key_lock = self.key_lock_for(key); - let key_guard = key_lock.lock(); - let operation = Operation::insert(key.to_string(), value, timestamp, replica_id); - - self.apply_insert_locked(key, operation); - - drop(key_guard); - self.try_cleanup_key_lock(key, &key_lock); - } - - fn apply_insert_locked(&self, key: &str, operation: Operation) -> bool { - let Operation::Insert { - value, - timestamp, - replica_id, - .. - } = operation - else { - return false; - }; - - match self.merge_strategy_for_key(key) { - MergeStrategy::EpochMaxWins => { - if let Some(merged) = - self.record_epoch_insert_metadata(key, &value, timestamp, replica_id) - { - self.store.insert(key.to_string(), merged); - true - } else { - false - } - } - MergeStrategy::LastWriterWins => { - if self.record_insert_metadata(key, timestamp, replica_id) { - self.store.insert(key.to_string(), value); - true - } else { - false - } - } - } - } - - fn compact_key_metadata(versions: &mut Vec) { - if versions.len() <= 1 { - return; - } - - if let Some(winner) = versions.iter().max_by_key(|v| v.version_key()).cloned() { - versions.clear(); - versions.push(winner); - } - } - - fn newest_rate_limit_tombstone_version( - versions: &[ValueMetadata], - ) -> Option { - versions - .iter() - .filter(|version| version.is_tombstone) - .max_by_key(|version| version.version_key()) - .map(ValueMetadata::as_rate_limit_version) - } - - fn record_insert_metadata(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> bool { - let new_metadata = ValueMetadata::new(timestamp, replica_id); - - match self.metadata.entry(key.to_string()) { - MapEntry::Occupied(mut entry) => { - let versions = entry.get_mut(); - - let has_existing_entry = versions - .iter() - .any(|v| v.matches_version(timestamp, replica_id)); - if has_existing_entry { - Self::compact_key_metadata(versions); - return false; - } - - let current_winner = versions.iter().max_by_key(|v| v.version_key()); - - if current_winner.is_some_and(|winner| winner.is_newer_than(timestamp, replica_id)) - { - Self::compact_key_metadata(versions); - return false; - } - - versions.push(new_metadata); - Self::compact_key_metadata(versions); - true - } - MapEntry::Vacant(entry) => { - entry.insert(vec![new_metadata]); - true - } - } - } - - // Tombstones for EpochMaxWins keys are tracked in two places: - // (a) `ValueMetadata { is_tombstone: true, .. }` in `metadata` - // — used locally for LWW ordering + tombstone GC. - // (b) `tombstone_version` embedded in the stored shard payload - // — propagates across replicas via snapshot/compaction so - // a peer that receives only the post-tombstone Insert - // (the Remove op gone after compaction) still filters - // pre-tombstone inserts. See - // `test_epoch_max_wins_snapshot_only_propagation_preserves_tombstone_boundary`. - fn record_epoch_insert_metadata( - &self, - key: &str, - value: &[u8], - timestamp: u64, - replica_id: ReplicaId, - ) -> Option> { - let incoming_version = epoch_max_wins::RateLimitVersion::new(timestamp, replica_id); - let current = self.store.get(key); - - match self.metadata.entry(key.to_string()) { - MapEntry::Occupied(mut entry) => { - let versions = entry.get_mut(); - // No op-id short-circuit: a same-(timestamp, replica_id) op - // may carry a richer payload (e.g. a compacted shard with an - // embedded tombstone_version). `merge_live_value.changed` - // gates the store update so identical bytes are still a no-op. - let current_tombstone = Self::newest_rate_limit_tombstone_version(versions); - let Some(merged) = epoch_max_wins::merge_live_value( - current.as_deref(), - current_tombstone, - value, - incoming_version, - ) else { - Self::compact_key_metadata(versions); - return None; - }; - - if !merged.changed { - Self::compact_key_metadata(versions); - return None; - } - versions.clear(); - versions.push(ValueMetadata::from_rate_limit_live_version( - merged.live_version, - )); - Some(merged.value) - } - MapEntry::Vacant(entry) => { - let merged = epoch_max_wins::merge_live_value(None, None, value, incoming_version)?; - entry.insert(vec![ValueMetadata::from_rate_limit_live_version( - merged.live_version, - )]); - Some(merged.value) - } - } - } - - /// Apply remove - fn apply_remove(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> Option> { - let key_lock = self.key_lock_for(key); - let key_guard = key_lock.lock(); - - let removed = match self.merge_strategy_for_key(key) { - MergeStrategy::EpochMaxWins => { - self.apply_epoch_remove_locked(key, timestamp, replica_id); - None - } - MergeStrategy::LastWriterWins => { - if self.record_remove_metadata(key, timestamp, replica_id) { - self.store.remove(key) - } else { - None - } - } - }; - - drop(key_guard); - self.try_cleanup_key_lock(key, &key_lock); - removed - } - - // Per-point tombstone application for EpochMaxWins keys. The stored shard - // is filtered against the merged (existing ∪ incoming) tombstone so live - // path and `compact_operations` agree (spec §2.5). Returns whether the - // tombstone was newly accepted (used by `remove` to decide whether to - // append an operation). - fn apply_epoch_remove_locked(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> bool { - let incoming_tombstone = epoch_max_wins::RateLimitVersion::new(timestamp, replica_id); - let current = self.store.get(key); - - match self.metadata.entry(key.to_string()) { - MapEntry::Occupied(mut entry) => { - let versions = entry.get_mut(); - let already_recorded = versions - .iter() - .any(|v| v.is_tombstone && v.matches_version(timestamp, replica_id)); - if already_recorded { - Self::compact_key_metadata(versions); - return false; - } - let current_tombstone = Self::newest_rate_limit_tombstone_version(versions); - let result = epoch_max_wins::apply_tombstone( - current.as_deref(), - current_tombstone, - incoming_tombstone, - ); - match result { - epoch_max_wins::TombstoneApply::Surviving { - value, - live_version, - } => { - versions.clear(); - versions.push(ValueMetadata::from_rate_limit_live_version(live_version)); - self.store.insert(key.to_string(), value); - } - epoch_max_wins::TombstoneApply::Empty { tombstone_version } => { - // Preserve `created_at` on an existing tombstone whose - // (timestamp, replica_id) already matches the merged - // result - older delayed Removes from a lagging peer - // must not refresh the GC clock. - let already_matches = versions.iter().any(|v| { - v.is_tombstone - && v.matches_version( - tombstone_version.timestamp, - tombstone_version.replica_id, - ) - }); - if !already_matches { - versions.clear(); - versions.push(ValueMetadata::tombstone( - tombstone_version.timestamp, - tombstone_version.replica_id, - )); - } - self.store.remove(key); - } - } - true - } - MapEntry::Vacant(entry) => { - // A tombstone for a never-seen key still records ordering - // information so a delayed pre-tombstone insert is suppressed. - let result = - epoch_max_wins::apply_tombstone(current.as_deref(), None, incoming_tombstone); - let mut versions = Vec::new(); - match result { - epoch_max_wins::TombstoneApply::Surviving { - value, - live_version, - } => { - versions.push(ValueMetadata::from_rate_limit_live_version(live_version)); - self.store.insert(key.to_string(), value); - } - epoch_max_wins::TombstoneApply::Empty { tombstone_version } => { - versions.push(ValueMetadata::tombstone( - tombstone_version.timestamp, - tombstone_version.replica_id, - )); - self.store.remove(key); - } - } - entry.insert(versions); - true - } - } - } - - fn record_remove_metadata(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> bool { - let tombstone = ValueMetadata::tombstone(timestamp, replica_id); - - match self.metadata.entry(key.to_string()) { - MapEntry::Occupied(mut entry) => { - let versions = entry.get_mut(); - let has_existing_entry = versions - .iter() - .any(|v| v.is_tombstone && v.matches_version(timestamp, replica_id)); - if has_existing_entry { - Self::compact_key_metadata(versions); - return false; - } - - let has_newer_version = versions - .iter() - .any(|v| v.is_newer_than(timestamp, replica_id)); - if has_newer_version { - Self::compact_key_metadata(versions); - return false; - } - - versions.push(tombstone); - Self::compact_key_metadata(versions); - true - } - MapEntry::Vacant(entry) => { - // Record the tombstone even for a never-seen key so a delayed - // older insert cannot resurrect it (CRDT OR-map semantics). - entry.insert(vec![tombstone]); - true - } - } - } } impl Default for CrdtOrMap { @@ -764,45 +240,3 @@ impl Default for CrdtOrMap { Self::new() } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn epoch_equal_value_insert_does_not_rewind_metadata() { - let replica = CrdtOrMap::new(); - replica.register_merge_strategy("rl:".to_string(), MergeStrategy::EpochMaxWins); - - let key = "rl:global:node-a"; - let newer_insert_replica = ReplicaId::new(); - let older_insert_replica = ReplicaId::new(); - let tombstone_replica = ReplicaId::new(); - - assert!(replica.apply_insert_locked( - key, - Operation::insert( - key.to_string(), - epoch_max_wins::encode(6, 0).to_vec(), - 100, - newer_insert_replica - ), - )); - assert!(!replica.apply_insert_locked( - key, - Operation::insert( - key.to_string(), - epoch_max_wins::encode(6, 0).to_vec(), - 10, - older_insert_replica - ), - )); - - assert_eq!(replica.apply_remove(key, 50, tombstone_replica), None); - assert_eq!( - replica.get(key).and_then(|value| epoch_max_wins::decode(&value)), - Some(epoch_max_wins::EpochCount { epoch: 6, count: 0 }), - "older equal-value insert must not let an intermediate tombstone delete the newer live value", - ); - } -} diff --git a/crates/mesh/src/crdt_kv/engine/epoch_max_wins.rs b/crates/mesh/src/crdt_kv/engine/epoch_max_wins.rs new file mode 100644 index 000000000..38f57f708 --- /dev/null +++ b/crates/mesh/src/crdt_kv/engine/epoch_max_wins.rs @@ -0,0 +1,442 @@ +//! Transitional EpochMaxWins engine. +//! +//! This is a behavior-preserving wrapper around the legacy EpochMaxWins logic +//! that lived inline in `CrdtOrMap`. It owns its own state (so the engine +//! trait surface is exercised end-to-end), but its internal shape still +//! mirrors the LWW layout (one shared `ValueMetadata` vec per key, etc.). +//! PR #3 will replace this with a real `RateLimitEngine` that holds a typed +//! `RateLimitShard` per key without the LWW-shaped metadata layer. + +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use dashmap::{mapref::entry::Entry as MapEntry, DashMap}; +use parking_lot::{Mutex, RwLock}; +use tracing::debug; + +use super::NamespaceCrdtEngine; +use crate::crdt_kv::{ + epoch_max_wins as ratelimit, + kv_store::KvStore, + operation::{Operation, OperationLog}, + replica::{LamportClock, ReplicaId}, +}; + +#[derive(Debug, Clone)] +struct ValueMetadata { + timestamp: u64, + replica_id: ReplicaId, + is_tombstone: bool, + created_at: Instant, +} + +impl PartialEq for ValueMetadata { + fn eq(&self, other: &Self) -> bool { + self.timestamp == other.timestamp + && self.replica_id == other.replica_id + && self.is_tombstone == other.is_tombstone + } +} + +impl Eq for ValueMetadata {} + +impl ValueMetadata { + fn from_rate_limit_live_version(version: ratelimit::RateLimitVersion) -> Self { + Self { + timestamp: version.timestamp, + replica_id: version.replica_id, + is_tombstone: false, + created_at: Instant::now(), + } + } + + fn tombstone(timestamp: u64, replica_id: ReplicaId) -> Self { + Self { + timestamp, + replica_id, + is_tombstone: true, + created_at: Instant::now(), + } + } + + fn version_key(&self) -> (u64, ReplicaId) { + (self.timestamp, self.replica_id) + } + + fn as_rate_limit_version(&self) -> ratelimit::RateLimitVersion { + ratelimit::RateLimitVersion::new(self.timestamp, self.replica_id) + } + + fn matches_version(&self, timestamp: u64, replica_id: ReplicaId) -> bool { + self.timestamp == timestamp && self.replica_id == replica_id + } +} + +pub(crate) struct EpochMaxWinsLegacyEngine { + store: KvStore, + metadata: Arc>>, + key_locks: Arc>>>, + log: Arc>, + clock: LamportClock, + replica_id: ReplicaId, +} + +impl EpochMaxWinsLegacyEngine { + pub(crate) fn new(replica_id: ReplicaId) -> Self { + Self { + store: KvStore::new(), + metadata: Arc::new(DashMap::new()), + key_locks: Arc::new(DashMap::new()), + log: Arc::new(RwLock::new(OperationLog::new())), + clock: LamportClock::new(), + replica_id, + } + } + + fn key_lock_for(&self, key: &str) -> Arc> { + self.key_locks + .entry(key.to_string()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + } + + fn key_is_tombstoned_or_unknown(&self, key: &str) -> bool { + self.metadata.get(key).is_none_or(|versions| { + versions + .iter() + .max_by_key(|version| version.version_key()) + .is_none_or(|winner| winner.is_tombstone) + }) + } + + fn try_cleanup_key_lock(&self, key: &str, key_lock: &Arc>) { + if self.store.contains_key(key) || !self.key_is_tombstoned_or_unknown(key) { + return; + } + let _ = self.key_locks.remove_if(key, |_, stored_lock| { + Arc::ptr_eq(stored_lock, key_lock) + && Arc::strong_count(stored_lock) <= 2 + && stored_lock.try_lock().is_some() + }); + } + + fn compact_key_metadata(versions: &mut Vec) { + if versions.len() <= 1 { + return; + } + if let Some(winner) = versions.iter().max_by_key(|v| v.version_key()).cloned() { + versions.clear(); + versions.push(winner); + } + } + + fn newest_tombstone_version(versions: &[ValueMetadata]) -> Option { + versions + .iter() + .filter(|version| version.is_tombstone) + .max_by_key(|version| version.version_key()) + .map(ValueMetadata::as_rate_limit_version) + } + + fn record_epoch_insert_metadata( + &self, + key: &str, + value: &[u8], + timestamp: u64, + replica_id: ReplicaId, + ) -> Option> { + let incoming_version = ratelimit::RateLimitVersion::new(timestamp, replica_id); + let current = self.store.get(key); + + match self.metadata.entry(key.to_string()) { + MapEntry::Occupied(mut entry) => { + let versions = entry.get_mut(); + let current_tombstone = Self::newest_tombstone_version(versions); + let Some(merged) = ratelimit::merge_live_value( + current.as_deref(), + current_tombstone, + value, + incoming_version, + ) else { + Self::compact_key_metadata(versions); + return None; + }; + if !merged.changed { + Self::compact_key_metadata(versions); + return None; + } + versions.clear(); + versions.push(ValueMetadata::from_rate_limit_live_version( + merged.live_version, + )); + Some(merged.value) + } + MapEntry::Vacant(entry) => { + let merged = ratelimit::merge_live_value(None, None, value, incoming_version)?; + entry.insert(vec![ValueMetadata::from_rate_limit_live_version( + merged.live_version, + )]); + Some(merged.value) + } + } + } + + fn apply_epoch_remove_locked(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> bool { + let incoming_tombstone = ratelimit::RateLimitVersion::new(timestamp, replica_id); + let current = self.store.get(key); + + match self.metadata.entry(key.to_string()) { + MapEntry::Occupied(mut entry) => { + let versions = entry.get_mut(); + let already_recorded = versions + .iter() + .any(|v| v.is_tombstone && v.matches_version(timestamp, replica_id)); + if already_recorded { + Self::compact_key_metadata(versions); + return false; + } + let current_tombstone = Self::newest_tombstone_version(versions); + let result = ratelimit::apply_tombstone( + current.as_deref(), + current_tombstone, + incoming_tombstone, + ); + match result { + ratelimit::TombstoneApply::Surviving { + value, + live_version, + } => { + versions.clear(); + versions.push(ValueMetadata::from_rate_limit_live_version(live_version)); + self.store.insert(key.to_string(), value); + } + ratelimit::TombstoneApply::Empty { tombstone_version } => { + // Preserve `created_at` on a dominated tombstone (PR #1469 + // codex P2: older delayed Removes must not refresh GC). + let already_matches = versions.iter().any(|v| { + v.is_tombstone + && v.matches_version( + tombstone_version.timestamp, + tombstone_version.replica_id, + ) + }); + if !already_matches { + versions.clear(); + versions.push(ValueMetadata::tombstone( + tombstone_version.timestamp, + tombstone_version.replica_id, + )); + } + self.store.remove(key); + } + } + true + } + MapEntry::Vacant(entry) => { + // Tombstone for a never-seen key still records ordering so a + // delayed pre-tombstone insert is suppressed (PR #1469). + let result = + ratelimit::apply_tombstone(current.as_deref(), None, incoming_tombstone); + let mut versions = Vec::new(); + match result { + ratelimit::TombstoneApply::Surviving { + value, + live_version, + } => { + versions.push(ValueMetadata::from_rate_limit_live_version(live_version)); + self.store.insert(key.to_string(), value); + } + ratelimit::TombstoneApply::Empty { tombstone_version } => { + versions.push(ValueMetadata::tombstone( + tombstone_version.timestamp, + tombstone_version.replica_id, + )); + self.store.remove(key); + } + } + entry.insert(versions); + true + } + } + } + + fn apply_remote_insert( + &self, + key: &str, + value: Vec, + timestamp: u64, + replica_id: ReplicaId, + ) { + let key_lock = self.key_lock_for(key); + let key_guard = key_lock.lock(); + if let Some(stored) = self.record_epoch_insert_metadata(key, &value, timestamp, replica_id) + { + self.store.insert(key.to_string(), stored); + } + drop(key_guard); + self.try_cleanup_key_lock(key, &key_lock); + } + + fn apply_remote_remove(&self, key: &str, timestamp: u64, replica_id: ReplicaId) { + let key_lock = self.key_lock_for(key); + let key_guard = key_lock.lock(); + self.apply_epoch_remove_locked(key, timestamp, replica_id); + drop(key_guard); + self.try_cleanup_key_lock(key, &key_lock); + } + + fn append_op(&self, op: Operation) { + self.log + .write() + .append_with_strategy(op, |_| crate::crdt_kv::MergeStrategy::EpochMaxWins); + } +} + +impl NamespaceCrdtEngine for EpochMaxWinsLegacyEngine { + fn put_local(&self, key: &str, value: Vec) -> Option> { + let key_lock = self.key_lock_for(key); + let key_guard = key_lock.lock(); + + let previous = self.store.get(key); + let timestamp = self.clock.tick(); + let result = if let Some(stored) = + self.record_epoch_insert_metadata(key, &value, timestamp, self.replica_id) + { + let op = Operation::insert(key.to_string(), value, timestamp, self.replica_id); + self.store.insert(key.to_string(), stored); + self.append_op(op); + debug!( + "EpochMaxWinsLegacyEngine insert: key={}, timestamp={}, replica={}", + key, timestamp, self.replica_id + ); + previous + } else { + self.store.get(key).map(|bytes| bytes.to_vec()) + }; + + drop(key_guard); + self.try_cleanup_key_lock(key, &key_lock); + result + } + + fn delete_local(&self, key: &str) -> Option> { + let key_lock = self.key_lock_for(key); + let key_guard = key_lock.lock(); + + let timestamp = self.clock.tick(); + debug!( + "EpochMaxWinsLegacyEngine remove: key={}, timestamp={}, replica={}", + key, timestamp, self.replica_id + ); + if self.apply_epoch_remove_locked(key, timestamp, self.replica_id) { + let op = Operation::remove(key.to_string(), timestamp, self.replica_id); + self.append_op(op); + } + + drop(key_guard); + self.try_cleanup_key_lock(key, &key_lock); + // EpochMaxWins removes are per-point; no clean "what was removed" answer. + None + } + + fn get(&self, key: &str) -> Option> { + self.store.get(key) + } + + fn contains_key(&self, key: &str) -> bool { + self.store.contains_key(key) + } + + fn keys(&self) -> Vec { + self.store.keys() + } + + fn len(&self) -> usize { + self.store.len() + } + + fn generation(&self) -> u64 { + self.store.generation() + } + + fn export_ops(&self) -> Vec { + self.log.read().operations().to_vec() + } + + fn apply_remote_ops(&self, ops: &[Operation]) { + if ops.is_empty() { + return; + } + + // EpochMaxWins always replays incoming ops because a compacted snapshot + // can carry an embedded tombstone_version at the same op-id as a + // previously-seen raw payload. `merge_live_value.changed` gates the + // store update so identical bytes are still a no-op (PR #1469). + let mut to_apply: Vec = ops.to_vec(); + to_apply.sort_by_key(|op| (op.timestamp(), op.replica_id())); + + { + let mut log = self.log.write(); + let incoming = OperationLog::from_operations(ops.to_vec()); + log.merge_with_strategy(&incoming, |_| crate::crdt_kv::MergeStrategy::EpochMaxWins); + log.compact_with_strategy(|_| crate::crdt_kv::MergeStrategy::EpochMaxWins); + } + + for op in &to_apply { + self.clock.update(op.timestamp()); + match op { + Operation::Insert { + key, + value, + timestamp, + replica_id, + } => { + self.apply_remote_insert(key, value.clone(), *timestamp, *replica_id); + } + Operation::Remove { + key, + timestamp, + replica_id, + } => { + self.apply_remote_remove(key, *timestamp, *replica_id); + } + } + } + } + + fn gc_tombstones(&self, grace: Duration) -> usize { + let now = Instant::now(); + let mut removed = 0; + let keys_to_check: Vec = self + .metadata + .iter() + .filter(|entry| !self.store.contains_key(entry.key())) + .map(|entry| entry.key().clone()) + .collect(); + + for key in keys_to_check { + if !self.key_is_tombstoned_or_unknown(&key) { + continue; + } + self.key_locks.remove_if(&key, |_, lock| { + Arc::strong_count(lock) <= 2 && lock.try_lock().is_some() + }); + let was_removed = self.metadata.remove_if(&key, |_, versions| { + !self.store.contains_key(&key) + && versions + .iter() + .max_by_key(|v| v.version_key()) + .is_none_or(|winner| { + winner.is_tombstone + && now.saturating_duration_since(winner.created_at) >= grace + }) + }); + if was_removed.is_some() { + removed += 1; + } + } + removed + } +} diff --git a/crates/mesh/src/crdt_kv/engine/lww.rs b/crates/mesh/src/crdt_kv/engine/lww.rs new file mode 100644 index 000000000..c898456d1 --- /dev/null +++ b/crates/mesh/src/crdt_kv/engine/lww.rs @@ -0,0 +1,395 @@ +//! Last-writer-wins engine. +//! +//! Conflicts are resolved by `(timestamp, replica_id)` strictly. Tombstones +//! and live writes follow the same ordering; the newer wins. +//! +//! State owned by this engine: +//! - [`KvStore`] for live bytes +//! - per-key metadata vec ([`ValueMetadata`]) carrying timestamp / replica / +//! tombstone flag / GC clock +//! - per-key locks (so same-key writes serialise with metadata updates) +//! - a [`LamportClock`] for stamping local writes +//! - an [`OperationLog`] for replication + +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use dashmap::{mapref::entry::Entry as MapEntry, DashMap}; +use parking_lot::{Mutex, RwLock}; +use tracing::debug; + +use super::NamespaceCrdtEngine; +use crate::crdt_kv::{ + kv_store::KvStore, + operation::{Operation, OperationLog}, + replica::{LamportClock, ReplicaId}, +}; + +#[derive(Debug, Clone)] +struct ValueMetadata { + timestamp: u64, + replica_id: ReplicaId, + is_tombstone: bool, + created_at: Instant, +} + +impl PartialEq for ValueMetadata { + fn eq(&self, other: &Self) -> bool { + self.timestamp == other.timestamp + && self.replica_id == other.replica_id + && self.is_tombstone == other.is_tombstone + } +} + +impl Eq for ValueMetadata {} + +impl ValueMetadata { + fn new(timestamp: u64, replica_id: ReplicaId) -> Self { + Self { + timestamp, + replica_id, + is_tombstone: false, + created_at: Instant::now(), + } + } + + fn tombstone(timestamp: u64, replica_id: ReplicaId) -> Self { + Self { + timestamp, + replica_id, + is_tombstone: true, + created_at: Instant::now(), + } + } + + fn version_key(&self) -> (u64, ReplicaId) { + (self.timestamp, self.replica_id) + } + + fn matches_version(&self, timestamp: u64, replica_id: ReplicaId) -> bool { + self.timestamp == timestamp && self.replica_id == replica_id + } + + fn is_newer_than(&self, timestamp: u64, replica_id: ReplicaId) -> bool { + self.version_key() > (timestamp, replica_id) + } +} + +pub(crate) struct LwwEngine { + store: KvStore, + metadata: Arc>>, + key_locks: Arc>>>, + log: Arc>, + clock: LamportClock, + replica_id: ReplicaId, +} + +impl LwwEngine { + pub(crate) fn new(replica_id: ReplicaId) -> Self { + Self { + store: KvStore::new(), + metadata: Arc::new(DashMap::new()), + key_locks: Arc::new(DashMap::new()), + log: Arc::new(RwLock::new(OperationLog::new())), + clock: LamportClock::new(), + replica_id, + } + } + + fn key_lock_for(&self, key: &str) -> Arc> { + self.key_locks + .entry(key.to_string()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + } + + fn key_is_tombstoned_or_unknown(&self, key: &str) -> bool { + self.metadata.get(key).is_none_or(|versions| { + versions + .iter() + .max_by_key(|version| version.version_key()) + .is_none_or(|winner| winner.is_tombstone) + }) + } + + fn try_cleanup_key_lock(&self, key: &str, key_lock: &Arc>) { + if self.store.contains_key(key) || !self.key_is_tombstoned_or_unknown(key) { + return; + } + let _ = self.key_locks.remove_if(key, |_, stored_lock| { + Arc::ptr_eq(stored_lock, key_lock) + && Arc::strong_count(stored_lock) <= 2 + && stored_lock.try_lock().is_some() + }); + } + + fn compact_key_metadata(versions: &mut Vec) { + if versions.len() <= 1 { + return; + } + if let Some(winner) = versions.iter().max_by_key(|v| v.version_key()).cloned() { + versions.clear(); + versions.push(winner); + } + } + + fn record_insert_metadata(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> bool { + let new_metadata = ValueMetadata::new(timestamp, replica_id); + match self.metadata.entry(key.to_string()) { + MapEntry::Occupied(mut entry) => { + let versions = entry.get_mut(); + let has_existing_entry = versions + .iter() + .any(|v| v.matches_version(timestamp, replica_id)); + if has_existing_entry { + Self::compact_key_metadata(versions); + return false; + } + let current_winner = versions.iter().max_by_key(|v| v.version_key()); + if current_winner.is_some_and(|winner| winner.is_newer_than(timestamp, replica_id)) + { + Self::compact_key_metadata(versions); + return false; + } + versions.push(new_metadata); + Self::compact_key_metadata(versions); + true + } + MapEntry::Vacant(entry) => { + entry.insert(vec![new_metadata]); + true + } + } + } + + fn record_remove_metadata(&self, key: &str, timestamp: u64, replica_id: ReplicaId) -> bool { + let tombstone = ValueMetadata::tombstone(timestamp, replica_id); + match self.metadata.entry(key.to_string()) { + MapEntry::Occupied(mut entry) => { + let versions = entry.get_mut(); + let has_existing_entry = versions + .iter() + .any(|v| v.is_tombstone && v.matches_version(timestamp, replica_id)); + if has_existing_entry { + Self::compact_key_metadata(versions); + return false; + } + let has_newer_version = versions + .iter() + .any(|v| v.is_newer_than(timestamp, replica_id)); + if has_newer_version { + Self::compact_key_metadata(versions); + return false; + } + versions.push(tombstone); + Self::compact_key_metadata(versions); + true + } + MapEntry::Vacant(entry) => { + // Tombstone for a never-seen key still records ordering so a + // delayed older insert is suppressed (PR #1469). + entry.insert(vec![tombstone]); + true + } + } + } + + fn apply_insert(&self, key: &str, value: Vec, timestamp: u64, replica_id: ReplicaId) { + let key_lock = self.key_lock_for(key); + let key_guard = key_lock.lock(); + if self.record_insert_metadata(key, timestamp, replica_id) { + self.store.insert(key.to_string(), value); + } + drop(key_guard); + self.try_cleanup_key_lock(key, &key_lock); + } + + fn apply_remove_inner( + &self, + key: &str, + timestamp: u64, + replica_id: ReplicaId, + ) -> Option> { + let key_lock = self.key_lock_for(key); + let key_guard = key_lock.lock(); + let removed = if self.record_remove_metadata(key, timestamp, replica_id) { + self.store.remove(key) + } else { + None + }; + drop(key_guard); + self.try_cleanup_key_lock(key, &key_lock); + removed + } + + fn append_op(&self, op: Operation) { + self.log + .write() + .append_with_strategy(op, |_| crate::crdt_kv::MergeStrategy::LastWriterWins); + } +} + +impl NamespaceCrdtEngine for LwwEngine { + fn put_local(&self, key: &str, value: Vec) -> Option> { + let key_lock = self.key_lock_for(key); + let key_guard = key_lock.lock(); + + let previous = self.store.get(key); + let timestamp = self.clock.tick(); + let accepted = self.record_insert_metadata(key, timestamp, self.replica_id); + let result = if accepted { + let op = Operation::insert(key.to_string(), value.clone(), timestamp, self.replica_id); + self.store.insert(key.to_string(), value); + self.append_op(op); + debug!( + "LwwEngine insert: key={}, timestamp={}, replica={}", + key, timestamp, self.replica_id + ); + previous + } else { + self.store.get(key).map(|bytes| bytes.to_vec()) + }; + + drop(key_guard); + self.try_cleanup_key_lock(key, &key_lock); + result + } + + fn delete_local(&self, key: &str) -> Option> { + let key_lock = self.key_lock_for(key); + let key_guard = key_lock.lock(); + + let timestamp = self.clock.tick(); + debug!( + "LwwEngine remove: key={}, timestamp={}, replica={}", + key, timestamp, self.replica_id + ); + let removed = if self.record_remove_metadata(key, timestamp, self.replica_id) { + let op = Operation::remove(key.to_string(), timestamp, self.replica_id); + self.append_op(op); + self.store.remove(key) + } else { + None + }; + + drop(key_guard); + self.try_cleanup_key_lock(key, &key_lock); + removed + } + + fn get(&self, key: &str) -> Option> { + self.store.get(key) + } + + fn contains_key(&self, key: &str) -> bool { + self.store.contains_key(key) + } + + fn keys(&self) -> Vec { + self.store.keys() + } + + fn len(&self) -> usize { + self.store.len() + } + + fn generation(&self) -> u64 { + self.store.generation() + } + + fn export_ops(&self) -> Vec { + self.log.read().operations().to_vec() + } + + fn apply_remote_ops(&self, ops: &[Operation]) { + if ops.is_empty() { + return; + } + + // Determine which incoming ops the local log has not yet seen. LWW + // dedups by op-id; an op already in the log is a no-op. + let seen: std::collections::HashSet<(ReplicaId, u64)> = self + .log + .read() + .operations() + .iter() + .map(|op| (op.replica_id(), op.timestamp())) + .collect(); + + let mut unseen: Vec = ops + .iter() + .filter(|op| !seen.contains(&(op.replica_id(), op.timestamp()))) + .cloned() + .collect(); + unseen.sort_by_key(|op| (op.timestamp(), op.replica_id())); + + // Merge into the log first so subsequent compaction sees the full + // set, then compact. Strategy callback is LWW since this engine only + // hosts LWW keys. + { + let mut log = self.log.write(); + let incoming = OperationLog::from_operations(ops.to_vec()); + log.merge_with_strategy(&incoming, |_| crate::crdt_kv::MergeStrategy::LastWriterWins); + log.compact_with_strategy(|_| crate::crdt_kv::MergeStrategy::LastWriterWins); + } + + // Apply unseen ops to live state. Lamport clock observes each remote + // timestamp so subsequent local ticks beat it. + for op in &unseen { + self.clock.update(op.timestamp()); + match op { + Operation::Insert { + key, + value, + timestamp, + replica_id, + } => { + self.apply_insert(key, value.clone(), *timestamp, *replica_id); + } + Operation::Remove { + key, + timestamp, + replica_id, + } => { + let _ = self.apply_remove_inner(key, *timestamp, *replica_id); + } + } + } + } + + fn gc_tombstones(&self, grace: Duration) -> usize { + let now = Instant::now(); + let mut removed = 0; + let keys_to_check: Vec = self + .metadata + .iter() + .filter(|entry| !self.store.contains_key(entry.key())) + .map(|entry| entry.key().clone()) + .collect(); + + for key in keys_to_check { + if !self.key_is_tombstoned_or_unknown(&key) { + continue; + } + self.key_locks.remove_if(&key, |_, lock| { + Arc::strong_count(lock) <= 2 && lock.try_lock().is_some() + }); + let was_removed = self.metadata.remove_if(&key, |_, versions| { + !self.store.contains_key(&key) + && versions + .iter() + .max_by_key(|v| v.version_key()) + .is_none_or(|winner| { + winner.is_tombstone + && now.saturating_duration_since(winner.created_at) >= grace + }) + }); + if was_removed.is_some() { + removed += 1; + } + } + removed + } +} diff --git a/crates/mesh/src/crdt_kv/engine/mod.rs b/crates/mesh/src/crdt_kv/engine/mod.rs new file mode 100644 index 000000000..666c26501 --- /dev/null +++ b/crates/mesh/src/crdt_kv/engine/mod.rs @@ -0,0 +1,78 @@ +//! Namespace CRDT engines. +//! +//! Each namespace (`worker:`, `rl:`, `config:`, ...) is owned by exactly one +//! engine that implements [`NamespaceCrdtEngine`]. The engine owns its live +//! state, metadata, operation log, per-key locks, and logical clock - all the +//! invariants that make its CRDT strategy work. +//! +//! [`crdt::CrdtOrMap`](super::crdt::CrdtOrMap) above this layer is just a +//! router: it matches each key to the right engine by registered prefix and +//! delegates. The router does not know LWW vs EpochMaxWins. +//! +//! This split exists so a new strategy (EpochMaxWins today, future ones later) +//! does not require strategy branches inside every entry point of the shared +//! store - the seam that produced most of the bug class in PR #1469. + +use std::{sync::Arc, time::Duration}; + +use super::operation::Operation; + +mod epoch_max_wins; +mod lww; + +pub(super) use epoch_max_wins::EpochMaxWinsLegacyEngine; +pub(super) use lww::LwwEngine; + +/// The state machine a single namespace runs. +/// +/// All methods are byte-oriented at this boundary. Engines are free to use +/// typed internal representations (e.g. `RateLimitShard` inside +/// `EpochMaxWinsLegacyEngine`); the trait deliberately does not expose those +/// types so the dispatch layer stays strategy-agnostic. +pub(super) trait NamespaceCrdtEngine: Send + Sync { + // ---- Local writes ---- + + /// Apply a local put. Returns the previous live bytes when the write was + /// accepted, or `None` when the write was rejected or did not displace a + /// well-defined previous value (e.g. per-point shard updates). + fn put_local(&self, key: &str, value: Vec) -> Option>; + + /// Apply a local delete. Returns the previous live bytes when the delete + /// removed an existing value, or `None` otherwise. + fn delete_local(&self, key: &str) -> Option>; + + // ---- Reads ---- + + fn get(&self, key: &str) -> Option>; + fn contains_key(&self, key: &str) -> bool; + fn keys(&self) -> Vec; + fn len(&self) -> usize; + + /// Monotonically increasing mutation counter. Increments on every accepted + /// local or remote write that changes live state. + fn generation(&self) -> u64; + + // ---- Replication ---- + + /// Snapshot every operation this engine has retained, in deterministic + /// order. The router concatenates snapshots from all engines to build the + /// gossip-visible operation log. + fn export_ops(&self) -> Vec; + + /// Merge a batch of incoming operations into this engine. The engine + /// merges into its log, canonicalises (compaction, tombstone collapse, + /// same-op-id folding), and applies only the post-canonicalisation result + /// to live state. This is where the "post-compaction-replay footgun" (PR + /// #1469) gets sealed inside the engine. + fn apply_remote_ops(&self, ops: &[Operation]); + + // ---- Maintenance ---- + + /// Garbage-collect tombstones older than `grace`. Returns the number of + /// metadata entries removed. + fn gc_tombstones(&self, grace: Duration) -> usize; +} + +/// Strategy-agnostic engine handle. Routers hold `Arc` keyed by registered prefix. +pub(super) type EngineHandle = Arc; diff --git a/crates/mesh/src/crdt_kv/kv_store.rs b/crates/mesh/src/crdt_kv/kv_store.rs index 820a53213..204402fe1 100644 --- a/crates/mesh/src/crdt_kv/kv_store.rs +++ b/crates/mesh/src/crdt_kv/kv_store.rs @@ -66,14 +66,6 @@ impl KvStore { pub fn keys(&self) -> Vec { self.store.iter().map(|entry| entry.key().clone()).collect() } - - /// Get all key-value pairs as a BTreeMap - pub fn all(&self) -> std::collections::BTreeMap> { - self.store - .iter() - .map(|entry| (entry.key().clone(), entry.value().clone())) - .collect() - } } impl Default for KvStore { diff --git a/crates/mesh/src/crdt_kv/mod.rs b/crates/mesh/src/crdt_kv/mod.rs index ebc825622..cc98fe646 100644 --- a/crates/mesh/src/crdt_kv/mod.rs +++ b/crates/mesh/src/crdt_kv/mod.rs @@ -3,6 +3,7 @@ // ============================================================================ mod crdt; +mod engine; mod epoch_max_wins; mod kv_store; mod merge_strategy; diff --git a/crates/mesh/src/crdt_kv/operation.rs b/crates/mesh/src/crdt_kv/operation.rs index a50487c5f..30ce6b76e 100644 --- a/crates/mesh/src/crdt_kv/operation.rs +++ b/crates/mesh/src/crdt_kv/operation.rs @@ -101,6 +101,13 @@ impl OperationLog { } } + /// Build an operation log from a pre-collected vector. Used by the + /// engine router to concatenate per-engine ops back into a single log + /// for gossip export. + pub(super) fn from_operations(operations: Vec) -> Self { + Self { operations } + } + /// Threshold at which auto-compaction triggers. After compaction, the log /// shrinks to at most one entry per unique key, so the next compaction /// won't trigger until enough new operations accumulate again.