refactor(mesh): extract NamespaceCrdtEngine + LwwEngine + transitional EpochMaxWins#1539
refactor(mesh): extract NamespaceCrdtEngine + LwwEngine + transitional EpochMaxWins#1539CatherineSue wants to merge 1 commit into
Conversation
…l EpochMaxWins Step 2 of the per-strategy engine refactor (.claude/docs/mesh/crdt-kv-namespace-engine-design.md). No behavior change; preserves the full public surface of `CrdtOrMap` and all 149 mesh lib tests. Why this shape The old `CrdtOrMap` was one shared store - one `KvStore`, one `ValueMetadata` map, one `key_locks`, one `LamportClock`, one `OperationLog` - plus a per-prefix `MergeStrategy` table that branched at every entry point. Every strategy-specific invariant had to be threaded through every shared call site, and every bug in PR #1469 traced back to that seam. What changed - `crdt_kv::engine::NamespaceCrdtEngine` is the new boundary: a single trait that owns local writes, reads, replication, GC, and the in-engine state machine for one namespace's CRDT strategy. State (store, metadata, locks, clock, log) is engine-internal; the trait surface stays byte-oriented at the boundary. - `LwwEngine` implements LWW with its own self-contained state. The old LWW code (`record_insert_metadata`, `record_remove_metadata`, `compact_key_metadata`, `gc_tombstones`, LWW branches of `apply_insert_locked` / `apply_remove`) moves verbatim into this engine. - `EpochMaxWinsLegacyEngine` is a transitional wrapper that lifts the current inline EpochMaxWins code into the trait shape with its own state. PR #3 will replace it with a real `RateLimitEngine` that holds typed `RateLimitShard` state directly without the LWW-shaped `ValueMetadata` layer. - `CrdtOrMap` becomes a thin router: a sorted prefix table maps each key to its engine via longest-prefix-match. Unregistered keys fall through to a built-in default LWW engine so callers that never call `register_merge_strategy` (notably the in-crate tests) keep working. `crdt.rs` shrinks from 808 lines to ~240. What remains for follow-ups - PR #3: replace `EpochMaxWinsLegacyEngine` with `RateLimitEngine` (typed shard state, no LWW-shaped metadata, drops the `_with_strategy` siblings on `OperationLog`). - PR #4 (cosmetic): split `replica.rs` into `replica.rs` + `clock.rs`; introduce associated types on the trait if needed; promote `OperationLog::{merge,compact}_with_strategy` to per-engine ops. - Lifecycle hooks (drain/shutdown) intentionally deferred - CRDT state is purely in-memory today, no need for them yet. Verification - `cargo test -p smg-mesh --lib` - 149 tests pass. - `cargo clippy --workspace --all-targets --all-features -- -D warnings` - clean. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
📝 WalkthroughWalkthroughCrdtOrMap is refactored from a monolithic OR-Map implementation into a thin router that delegates operations to per-namespace CRDT engines. Two concrete engines provide merge semantics: LwwEngine (default, Last-Writer-Wins) and EpochMaxWinsLegacyEngine (backwards-compatible). The router uses longest-prefix-match over registered namespaces to select the appropriate engine. ChangesEngine Router Architecture
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 77327d171c
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let engine: EngineHandle = match strategy { | ||
| MergeStrategy::LastWriterWins => Arc::new(LwwEngine::new(self.replica_id)), | ||
| MergeStrategy::EpochMaxWins => Arc::new(EpochMaxWinsLegacyEngine::new(self.replica_id)), |
There was a problem hiding this comment.
Preserve key state when registering a new prefix engine
Creating a brand-new engine on every register_merge_strategy call immediately changes routing for matching keys but does not migrate any existing entries from the default/older engine. If a prefix is registered after data has already been written (or after a more-general prefix has handled writes), subsequent get/remove/merge calls for those keys hit an empty engine and the previously stored values/tombstones become unreachable, which is a behavioral regression from the prior single-store design. Consider migrating matching keys/log state during registration or rejecting late/overlapping registrations once data exists.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Code Review
This pull request refactors CrdtOrMap into a router that delegates operations to prefix-specific engines via a new NamespaceCrdtEngine trait, isolating strategy-specific logic to prevent regression bugs. Review feedback identifies an issue with engine prioritization in all_engines that could lead to data duplication in all() and keys(), and suggests using HashSet for unique key collection. Reviewers also recommended performance optimizations, such as zero-copy operation handling and more efficient state retrieval methods. Additionally, a discrepancy between the put_local documentation and its implementation was noted, along with a potential violation of the monotonic property in the generation counter.
| fn all_engines(&self) -> Vec<EngineHandle> { | ||
| let engines = self.engines_snapshot(); | ||
| let mut out = Vec::with_capacity(engines.len() + 1); | ||
| for (_, engine) in engines.iter() { | ||
| out.push(Arc::clone(engine)); | ||
| } | ||
|
|
||
| let _ = self.key_locks.remove_if(key, |_, stored_lock| { | ||
| Arc::ptr_eq(stored_lock, key_lock) | ||
| && Arc::strong_count(stored_lock) <= 2 | ||
| && stored_lock.try_lock().is_some() | ||
| }); | ||
| out.push(Arc::clone(&self.default_engine)); | ||
| out | ||
| } |
There was a problem hiding this comment.
The all_engines method returns engines in decreasing order of priority (longest prefix first, followed by the default engine). This order causes issues in methods like all() and keys() because lower-priority engines are processed last, allowing them to overwrite or duplicate data from higher-priority engines. Reversing this order ensures that higher-priority engines (longest prefix matches) have the final say in deduplicated collections.
| fn all_engines(&self) -> Vec<EngineHandle> { | |
| let engines = self.engines_snapshot(); | |
| let mut out = Vec::with_capacity(engines.len() + 1); | |
| for (_, engine) in engines.iter() { | |
| out.push(Arc::clone(engine)); | |
| } | |
| let _ = self.key_locks.remove_if(key, |_, stored_lock| { | |
| Arc::ptr_eq(stored_lock, key_lock) | |
| && Arc::strong_count(stored_lock) <= 2 | |
| && stored_lock.try_lock().is_some() | |
| }); | |
| out.push(Arc::clone(&self.default_engine)); | |
| out | |
| } | |
| fn all_engines(&self) -> Vec<EngineHandle> { | |
| let engines = self.engines_snapshot(); | |
| let mut out = Vec::with_capacity(engines.len() + 1); | |
| // Default engine has the lowest priority | |
| out.push(Arc::clone(&self.default_engine)); | |
| // Registered engines in increasing order of prefix length (lowest priority first) | |
| for (_, engine) in engines.iter().rev() { | |
| out.push(Arc::clone(engine)); | |
| } | |
| out | |
| } |
| pub fn generation(&self) -> u64 { | ||
| self.store.generation() | ||
| self.all_engines().iter().map(|e| e.generation()).sum() | ||
| } |
There was a problem hiding this comment.
The generation() method sums the generations of all engines. However, if an engine is replaced via register_merge_strategy, its internal generation resets to 0. This can cause the total generation sum to decrease, violating the "monotonically increasing" property typically expected of such counters (and explicitly mentioned in the NamespaceCrdtEngine::generation docstring). While this might be acceptable for startup-only configuration, it's worth noting as a potential side effect for callers relying on monotonic progress.
| pub fn keys(&self) -> Vec<String> { | ||
| self.store.keys() | ||
| let mut all = Vec::new(); | ||
| for engine in self.all_engines() { | ||
| all.extend(engine.keys()); | ||
| } | ||
| all | ||
| } |
There was a problem hiding this comment.
The keys() method now returns a concatenation of all keys from all engines. If namespaces overlap (e.g., a: and ab:), a key matching both prefixes could appear multiple times in the result. To prevent vulnerabilities from duplicate entries, use a data structure that inherently enforces uniqueness, such as HashSet, instead of manually deduplicating the Vec to maintain backward compatibility.
References
- To prevent vulnerabilities from duplicate entries, use data structures that inherently enforce uniqueness, such as
HashSet, instead of manually deduplicating collections likeVec.
| pub fn all(&self) -> BTreeMap<String, Vec<u8>> { | ||
| let mut all = BTreeMap::new(); | ||
| for engine in self.all_engines() { | ||
| for key in engine.keys() { | ||
| if let Some(value) = engine.get(&key) { | ||
| all.insert(key, value); | ||
| } | ||
| } | ||
| } | ||
| all | ||
| } |
There was a problem hiding this comment.
The implementation of all() is inefficient as it performs a full scan of keys via engine.keys() followed by individual engine.get() calls for each key. This results in all() method to the NamespaceCrdtEngine trait to allow engines to return their full state more efficiently (e.g., by iterating over their internal KvStore directly).
| /// Apply a local put. Returns the previous live bytes when the write was | ||
| /// accepted, or `None` when the write was rejected or did not displace a | ||
| /// well-defined previous value (e.g. per-point shard updates). | ||
| fn put_local(&self, key: &str, value: Vec<u8>) -> Option<Vec<u8>>; |
There was a problem hiding this comment.
The docstring for put_local states it returns None when the write is rejected. However, the implementations in LwwEngine and EpochMaxWinsLegacyEngine return the current value from the store when a write is rejected (e.g., due to an older timestamp). Please update the docstring to accurately reflect the behavior or adjust the implementations to return None on rejection.
| /// same-op-id folding), and applies only the post-canonicalisation result | ||
| /// to live state. This is where the "post-compaction-replay footgun" (PR | ||
| /// #1469) gets sealed inside the engine. | ||
| fn apply_remote_ops(&self, ops: &[Operation]); |
There was a problem hiding this comment.
To enable zero-copy operations and improve performance, design functions to take ownership of data (e.g., Vec<Operation>) rather than borrowing. The router already creates owned buckets in merge(), so passing them by value would allow engines to move the data into their OperationLog without an extra to_vec() call.
| fn apply_remote_ops(&self, ops: &[Operation]); | |
| fn apply_remote_ops(&self, ops: Vec<Operation>); |
References
- To enable zero-copy operations and improve performance, design functions to take ownership of data rather than borrowing, especially when the caller no longer needs the data.
| // hosts LWW keys. | ||
| { | ||
| let mut log = self.log.write(); | ||
| let incoming = OperationLog::from_operations(ops.to_vec()); |
There was a problem hiding this comment.
ops.to_vec() creates a redundant deep clone of the entire operation batch. By updating the NamespaceCrdtEngine trait to take ownership, this allocation can be eliminated by moving the bucket directly into OperationLog::from_operations, enabling zero-copy operations.
References
- To enable zero-copy operations and improve performance, design functions to take ownership of data rather than borrowing, especially when the caller no longer needs the data.
- Avoid cloning data structures when extending a buffer; instead, extend directly from the guarded slice to enable zero-copy operations.
| previous | ||
| } else { | ||
| self.store.get(key).map(|bytes| bytes.to_vec()) | ||
| }; |
There was a problem hiding this comment.
🟡 Nit: The trait doc on put_local says it returns None when the write is rejected, but this branch returns Some(current_value). Same pattern in EpochMaxWinsLegacyEngine::put_local at epoch_max_wins.rs:316.
In practice this is dead code for local writes — the Lamport clock always advances past existing timestamps under the key lock, so record_insert_metadata never returns false here. And the sole external caller (kv.rs:264) discards the return value. So no behavioral impact today. But worth aligning with the documented contract so a future consumer that pattern-matches on None to detect rejection isn't surprised.
| previous | |
| } else { | |
| self.store.get(key).map(|bytes| bytes.to_vec()) | |
| }; | |
| } else { | |
| None | |
| }; |
| previous | ||
| } else { | ||
| self.store.get(key).map(|bytes| bytes.to_vec()) | ||
| }; |
There was a problem hiding this comment.
🟡 Nit: Same trait-contract mismatch as LwwEngine — should return None on rejection per the put_local doc. Same reasoning applies: dead code for local writes, no current caller checks the return, but worth aligning.
| previous | |
| } else { | |
| self.store.get(key).map(|bytes| bytes.to_vec()) | |
| }; | |
| } else { | |
| None | |
| }; |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/mesh/src/crdt_kv/engine/mod.rs`:
- Around line 35-37: The put_local trait docs currently state rejected writes
return None but actual engines return the current live value on rejection;
update the doc for the put_local method in the engine trait to reflect the real
contract: specify that put_local returns Some(previous_live_bytes) when the
write is accepted, returns Some(current_live_bytes) when the write is rejected
(i.e., the value that prevented the write), and only returns None for cases with
no well-defined previous value (e.g., per-point shard updates); reference the
put_local method in the engine trait and ensure wording matches existing engine
implementations or else adjust implementations to match the updated contract.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: c6e8f953-4b70-44cd-a112-e33ab3d025a3
📒 Files selected for processing (7)
crates/mesh/src/crdt_kv/crdt.rscrates/mesh/src/crdt_kv/engine/epoch_max_wins.rscrates/mesh/src/crdt_kv/engine/lww.rscrates/mesh/src/crdt_kv/engine/mod.rscrates/mesh/src/crdt_kv/kv_store.rscrates/mesh/src/crdt_kv/mod.rscrates/mesh/src/crdt_kv/operation.rs
💤 Files with no reviewable changes (1)
- crates/mesh/src/crdt_kv/kv_store.rs
| /// Apply a local put. Returns the previous live bytes when the write was | ||
| /// accepted, or `None` when the write was rejected or did not displace a | ||
| /// well-defined previous value (e.g. per-point shard updates). |
There was a problem hiding this comment.
Fix put_local contract text to match actual engine behavior.
The trait docs say rejected writes return None, but both current engines can return the current live value on rejection. Please align the doc contract (or impls) to avoid ambiguous semantics for future engines.
Suggested doc adjustment
- /// Apply a local put. Returns the previous live bytes when the write was
- /// accepted, or `None` when the write was rejected or did not displace a
- /// well-defined previous value (e.g. per-point shard updates).
+ /// Apply a local put. Returns the previous live bytes when the write is
+ /// accepted. If rejected by strategy ordering, returns the current live
+ /// bytes (if any). Returns `None` when no live value is available.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Apply a local put. Returns the previous live bytes when the write was | |
| /// accepted, or `None` when the write was rejected or did not displace a | |
| /// well-defined previous value (e.g. per-point shard updates). | |
| /// Apply a local put. Returns the previous live bytes when the write is | |
| /// accepted. If rejected by strategy ordering, returns the current live | |
| /// bytes (if any). Returns `None` when no live value is available. |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/mesh/src/crdt_kv/engine/mod.rs` around lines 35 - 37, The put_local
trait docs currently state rejected writes return None but actual engines return
the current live value on rejection; update the doc for the put_local method in
the engine trait to reflect the real contract: specify that put_local returns
Some(previous_live_bytes) when the write is accepted, returns
Some(current_live_bytes) when the write is rejected (i.e., the value that
prevented the write), and only returns None for cases with no well-defined
previous value (e.g., per-point shard updates); reference the put_local method
in the engine trait and ensure wording matches existing engine implementations
or else adjust implementations to match the updated contract.
Description
Problem
The old
CrdtOrMapwas one shared store — oneKvStore, oneValueMetadatamap, onekey_locks, oneLamportClock, oneOperationLog— plus a per-prefixMergeStrategytable that branched at every entry point (apply_insert_locked,apply_remove,merge,compact_with_strategy,merge_with_strategy,snapshot_and_truncate). Every strategy-specific invariant had to be threaded through every shared call site, and every bug fixed in #1469 traced back to that seam.Solution
Extract
NamespaceCrdtEngineas the architectural boundary. Each registered prefix gets its own engine that owns its full CRDT state machine (live store, metadata, key locks, clock, operation log, conflict-resolution rules).CrdtOrMapbecomes a thin router that matches each key to the right engine by longest-prefix-match and delegates.This PR implements step 2 of the migration plan: extract the trait, implement
LwwEngine, and lift the existing EpochMaxWins logic into a transitionalEpochMaxWinsLegacyEngineso the trait surface is exercised end-to-end. A follow-up PR will replace the legacy wrapper with a realRateLimitEnginethat holds typedRateLimitShardstate without LWW-shaped metadata.Design tracking issue: #1540 has the full design — problem, alternatives considered, what each layer owns, wire-format compatibility, the remote-apply footgun the engine API seals, and the full migration plan.
No behavior change in this PR. All 149 mesh lib tests pass without modification. Public surface of
CrdtOrMapis preserved.Changes
New module
crates/mesh/src/crdt_kv/engine/mod.rs—NamespaceCrdtEnginetrait. Byte-oriented at the boundary (per design): the engine can use typed state internally (e.g.RateLimitShard) butput_local/get/export_opsdeal inVec<u8>andOperation. Methods cover local writes, reads, replication (export_ops/apply_remote_ops), and GC. State (store, metadata, locks, clock, log) is engine-internal — the trait deliberately doesn't exposeValueMetadata.lww.rs—LwwEngine. Self-contained state: ownKvStore, ownValueMetadatamap, ownkey_locks, ownLamportClock, ownOperationLog. The old LWW code (record_insert_metadata,record_remove_metadata,compact_key_metadata,gc_tombstones, LWW branches ofapply_insert_locked/apply_remove) moves verbatim into this engine.epoch_max_wins.rs—EpochMaxWinsLegacyEngine. Transitional wrapper that lifts the current inline EpochMaxWins code into the trait shape with its own state. A follow-up will replace it with a realRateLimitEnginethat holds typedRateLimitShardstate directly without the LWW-shapedValueMetadatalayer.Router rewrite
crates/mesh/src/crdt_kv/crdt.rs—CrdtOrMaprewritten as a router. Sorted prefix table (Arc<RwLock<Arc<[(String, EngineHandle)]>>>) maps each key to its engine via longest-prefix-match. Unregistered keys fall through to a built-in defaultLwwEngineso callers that never callregister_merge_strategy(notably the in-crate tests) keep working unchanged. File shrinks from 808 lines to ~240.Small supporting changes
OperationLog::from_operations(Vec<Operation>) -> Self— crate-private constructor used by the router to concatenate per-engine logs back into a singleOperationLogfor gossip export.KvStore::allremoved — dead code after the router stopped using it; engines aggregate viakeys()+get().Out of scope (deliberate follow-ups)
EpochMaxWinsLegacyEnginewith a realRateLimitEngine(typed shard state, no LWW-shaped metadata, drop the_with_strategysiblings onOperationLog).replica.rsintoreplica.rs+clock.rs; introduce associated types on the trait if needed.Test Plan
cargo test -p smg-mesh --lib— 149 tests pass (same as pre-refactor).cargo clippy --workspace --all-targets --all-features -- -D warnings— clean.cargo build -p smg— clean.Checklist
cargo +nightly fmtpassescargo clippy --all-targets --all-features -- -D warningspasses