
[Design] Per-namespace CRDT engine architecture for crdt_kv #1540

@CatherineSue


This is the design note for refactoring crates/mesh/src/crdt_kv/ from one shared CrdtOrMap (with a per-prefix MergeStrategy table that branches at every entry point) into per-namespace CRDT engines that each own their own state and invariants. It is intentionally a design note, not a patch plan — the goal is to separate the generic mesh replication plumbing from the CRDT semantics of each namespace.

Short Version

The current CrdtOrMap is one shared store with a per-prefix MergeStrategy table. That worked while every key behaved like Last-Writer-Wins. It became fragile when EpochMaxWins needed different insert, remove, compaction, snapshot, merge-log, and decode behavior.

The proposed shape is:

trait NamespaceCrdtEngine: Send + Sync {
    fn put_local(&self, key: &str, value: Vec<u8>) -> ApplyResult;
    fn delete_local(&self, key: &str) -> ApplyResult;
    fn get(&self, key: &str) -> Option<Vec<u8>>;
    fn keys(&self) -> Vec<String>;

    fn export_batch(&self, since: Option<CrdtWatermark>) -> Vec<Operation>;
    fn apply_remote_batch(&self, operations: &[Operation]) -> Vec<CrdtNotification>;
    fn compact(&self);
    fn gc(&self, now: Instant);
}

enum CrdtNamespaceEngine {
    Lww(LwwEngine),
    RateLimit(RateLimitEngine),
}

MeshKV and gossip stay generic. The namespace engine owns CRDT semantics. There is no global MergeStrategy branch inside every CrdtOrMap entry point.
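Here is a minimal sketch of how the CrdtNamespaceEngine enum could itself implement the trait by forwarding to its variant. With this shape, the only strategy `match` sits at the dispatch boundary. The trait is trimmed to two methods. ApplyResult's variants and both engine bodies are hypothetical stubs, and std Mutex<HashMap> stands in for the crate's real storage.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for the note's ApplyResult.
#[derive(Debug, PartialEq)]
enum ApplyResult {
    Changed,
    Unchanged,
    Rejected,
}

/// Trimmed to two methods; the full trait also exports, merges, compacts and GCs.
trait NamespaceCrdtEngine: Send + Sync {
    fn put_local(&self, key: &str, value: Vec<u8>) -> ApplyResult;
    fn get(&self, key: &str) -> Option<Vec<u8>>;
}

/// Stub LWW engine: any bytes are a valid value.
#[derive(Default)]
struct LwwEngine {
    values: Mutex<HashMap<String, Vec<u8>>>,
}

/// Stub rate-limit engine: only accepts the 16-byte (epoch, count) write shape.
#[derive(Default)]
struct RateLimitEngine {
    values: Mutex<HashMap<String, Vec<u8>>>,
}

/// Shared stub storage step: store `value` and report whether anything changed.
fn store(map: &Mutex<HashMap<String, Vec<u8>>>, key: &str, value: Vec<u8>) -> ApplyResult {
    let mut map = map.lock().unwrap();
    if map.get(key) == Some(&value) {
        return ApplyResult::Unchanged;
    }
    map.insert(key.to_string(), value);
    ApplyResult::Changed
}

impl NamespaceCrdtEngine for LwwEngine {
    fn put_local(&self, key: &str, value: Vec<u8>) -> ApplyResult {
        store(&self.values, key, value)
    }
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.values.lock().unwrap().get(key).cloned()
    }
}

impl NamespaceCrdtEngine for RateLimitEngine {
    fn put_local(&self, key: &str, value: Vec<u8>) -> ApplyResult {
        // The namespace, not a global table, decides what a valid payload is.
        if value.len() != 16 {
            return ApplyResult::Rejected;
        }
        store(&self.values, key, value)
    }
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.values.lock().unwrap().get(key).cloned()
    }
}

enum CrdtNamespaceEngine {
    Lww(LwwEngine),
    RateLimit(RateLimitEngine),
}

/// The enum forwards to its variant: this is the only place a strategy is chosen.
impl NamespaceCrdtEngine for CrdtNamespaceEngine {
    fn put_local(&self, key: &str, value: Vec<u8>) -> ApplyResult {
        match self {
            CrdtNamespaceEngine::Lww(e) => e.put_local(key, value),
            CrdtNamespaceEngine::RateLimit(e) => e.put_local(key, value),
        }
    }
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        match self {
            CrdtNamespaceEngine::Lww(e) => e.get(key),
            CrdtNamespaceEngine::RateLimit(e) => e.get(key),
        }
    }
}
```

An Arc<dyn NamespaceCrdtEngine> registry would work just as well. The enum simply makes the closed set of engines explicit.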

In the simplest migration, the wire operation shape can stay as today's Operation::{Insert, Remove}. Each engine receives a list of operations for its namespace and interprets value: Vec<u8> according to that namespace. Typed internal operation logs can come later.

What "Store" Means Here

In this proposal, "store" does not mean only a DashMap<String, Vec<u8>>. It means the whole CRDT state machine for one namespace strategy:

| Part | LWW example | Rate-limit example |
| --- | --- | --- |
| Live state | key -> value + version | key -> RateLimitShard |
| Delete state | key -> tombstone version | shard tombstone frontier |
| Operation log | LWW insert/remove ops | rate-limit update/remove/snapshot ops |
| Merge rules | newest (timestamp, replica_id) wins | tombstone partitions history, then epoch/count wins |
| Compaction rules | keep latest op per key | fold full shard history into canonical shard or tombstone |
| Public value shape | raw app bytes | decoded EpochCount or canonical shard policy |

LwwStore and RateLimitStore would be concrete stores or engines. I prefer the name LwwEngine / RateLimitEngine because the object owns behavior, not just bytes.
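To make "store means the whole state machine" concrete, here is a small single-threaded sketch of an LWW engine in which live values, tombstones, and the op log change together in one apply step. It assumes plain HashMap/Vec in place of DashMap and RwLock. The Op type and method names are illustrative, not the crate's.

```rust
use std::collections::HashMap;

/// (timestamp, replica_id): the lexicographically newest pair wins.
type Version = (u64, u64);

#[derive(Clone, Debug, PartialEq)]
enum Op {
    Insert { key: String, value: Vec<u8>, version: Version },
    Remove { key: String, version: Version },
}

#[derive(Default)]
struct LwwEngine {
    values: HashMap<String, (Vec<u8>, Version)>,
    tombstones: HashMap<String, Version>,
    log: Vec<Op>,
}

impl LwwEngine {
    /// Newest version seen for `key`, whether it is live or deleted.
    fn latest(&self, key: &str) -> Option<Version> {
        let live = self.values.get(key).map(|(_, v)| *v);
        let dead = self.tombstones.get(key).copied();
        live.max(dead)
    }

    /// Applies one op; returns true if the op won and was recorded.
    fn apply(&mut self, op: Op) -> bool {
        let (key, version) = match &op {
            Op::Insert { key, version, .. } | Op::Remove { key, version } => (key.clone(), *version),
        };
        if self.latest(&key).map_or(false, |seen| seen >= version) {
            return false; // stale: an equal or newer write already exists
        }
        self.log.push(op.clone());
        match op {
            Op::Insert { key, value, version } => {
                self.tombstones.remove(&key);
                self.values.insert(key, (value, version));
            }
            Op::Remove { key, version } => {
                self.values.remove(&key);
                self.tombstones.insert(key, version);
            }
        }
        true
    }

    fn get(&self, key: &str) -> Option<&[u8]> {
        self.values.get(key).map(|(v, _)| v.as_slice())
    }
}
```

The tombstone map is what stops a late, older insert from resurrecting a deleted key. A RateLimitEngine would keep a different delete state here, not the same map with different bytes.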

Current Shape

Today the shape is approximately:

flowchart TD
    ns["CrdtNamespace"]
    map["CrdtOrMap"]
    strategies["merge_strategies: prefix -> MergeStrategy"]
    store["KvStore: key -> bytes"]
    metadata["ValueMetadata"]
    log["OperationLog"]
    epoch["epoch_max_wins.rs"]

    ns --> map
    map --> strategies
    map --> store
    map --> metadata
    map --> log
    map --> epoch
    log --> strategies
    log --> epoch

Detailed call flow (validated against main)

flowchart TD
  A["MeshKV::configure_crdt_prefix(prefix, strategy)"] --> B["CrdtOrMap::register_merge_strategy"]
  B --> C["Shared CrdtOrMap\nstore + metadata + operation_log + strategy side table"]

  P["CrdtNamespace::put"] --> I["CrdtOrMap::insert"]
  D["CrdtNamespace::delete"] --> R["CrdtOrMap::remove"]

  I --> IL["apply_insert_locked"]
  IL --> S1{"strategy for key?"}
  S1 -->|LWW| LWI["record_insert_metadata\nthen store.insert raw value"]
  S1 -->|Epoch| EWI["record_epoch_insert_metadata\nmerge_live_value\nthen store.insert normalized shard"]
  I --> AP["append_operation"]
  AP --> AWS["OperationLog::append_with_strategy"]
  AWS --> CWS["maybe compact_with_strategy"]

  R --> S2{"strategy for key?"}
  S2 -->|LWW| LWR["record_remove_metadata\nthen store.remove"]
  S2 -->|Epoch| EWR["apply_epoch_remove_locked\napply_tombstone per point"]
  R --> AP

  M["CrdtOrMap::merge(remote log)"] --> SEEN["read local op ids"]
  SEEN --> UNSEEN["build unseen_operations from incoming log\nLWW filters seen op ids\nEpoch replays all"]
  UNSEEN --> LOGMERGE["operation_log.merge_with_strategy(remote)"]
  LOGMERGE --> LOGCOMPACT["compact_operation_log"]
  LOGCOMPACT --> APPLY["apply each preselected unseen operation"]
  APPLY --> AO["apply_operation"]
  AO --> IL
  AO --> AR["apply_remove"]
  AR --> S3{"strategy for key?"}
  S3 -->|LWW| LWR
  S3 -->|Epoch| EWR

  CWS --> BYKEY["latest_operations_by_key_with_strategy"]
  LOGCOMPACT --> BYKEY
  BYKEY -->|LWW| MAXTS["latest_lww_operation"]
  BYKEY -->|Epoch| ECO["epoch_max_wins::compact_operations"]

The same CrdtOrMap handles LWW and rate-limit state. Strategy-specific logic is spread across:

| Location | What branches today |
| --- | --- |
| apply_insert_locked | raw LWW insert vs normalized RateLimitShard merge |
| apply_remove / remove | LWW tombstone vs rate-limit per-point tombstone |
| OperationLog::compact_with_strategy | latest LWW op vs holistic rate-limit fold |
| OperationLog::merge_with_strategy | op-id dedupe vs same-op-id shard merge |
| snapshot_and_truncate | strategy-specific snapshot semantics |
| public decoders | raw write payload vs normalized stored shard |

That is the main design problem: the invariants are not located in one place.

Proposed Shape

flowchart TD
    mesh["MeshKV / gossip / CRDT_BATCH"]
    registry["Namespace registry"]
    lww["LwwEngine"]
    rl["RateLimitEngine"]
    lww_state["LWW values, tombstones, log"]
    rl_state["RateLimitShard state, tombstone frontier, log"]

    mesh --> registry
    registry --> lww
    registry --> rl
    lww --> lww_state
    rl --> rl_state

Detailed engine-boundary dispatch

flowchart TD
  P["CrdtNamespace::put / delete"] --> ROUTER["CrdtOrMap router\nlongest-prefix-match"]
  G["CrdtOrMap::merge(incoming log)"] --> GROUP["group operations by destination engine"]
  GROUP --> ROUTER

  ROUTER -->|prefix=worker:| LE["LwwEngine"]
  ROUTER -->|prefix=config:| LE2["LwwEngine (config:)"]
  ROUTER -->|prefix=rl:| RLE["RateLimitEngine"]
  ROUTER -->|unregistered| DEF["default LwwEngine\nbuilt-in catch-all"]

  LE --> LE_API["put_local / delete_local /\napply_remote_batch / gc / export_batch"]
  RLE --> RLE_API["put_local / delete_local /\napply_remote_batch / gc / export_batch"]

  LE_API --> LE_STATE["LwwEngine state:\nKvStore (raw bytes) + LwwMetadata\n(ts,replica,tombstone,GC clock)\n+ key_locks + shared Arc<LamportClock>\n+ OperationLog"]
  RLE_API --> RLE_STATE["RateLimitEngine state:\nDashMap<String, RateLimitShard>\n+ DashMap<String, RateLimitVersion>\n(fully-killed-key tombstones)\n+ key_locks + shared Arc<LamportClock>\n+ OperationLog"]

What RateLimitEngine internally encapsulates

The shard math that lives in epoch_max_wins.rs today (and that the LWW-shaped ValueMetadata cannot represent cleanly) would live entirely inside one engine:

flowchart TD
  RAW["raw app payload\n16 bytes (epoch, count)"] --> SFIV["state_from_insert_value"]
  SHARD["serialized RateLimitShard\nbytes from peer or local store"] --> SFIV
  SFIV --> STATE["RateLimitState\n(Live shard or Tombstone)"]

  ML["merge_live_value (insert path)"] --> CUR["decode current store shard"]
  ML --> TOMBS["read current tombstone\nfrom engine-owned metadata"]
  CUR --> RLM["RateLimitState::merge\n(tombstone-aware live points)"]
  TOMBS --> RLM
  STATE --> RLM
  RLM --> ENC["encode RateLimitShard"]
  ENC --> STORE["store bytes + live metadata"]

  RM["apply_tombstone (remove path)"] --> CUR2["decode current store shard"]
  RM --> RLM2["RateLimitState::merge with Tombstone(version)"]
  RLM2 --> SURVIVE{"post-tombstone\nlive points?"}
  SURVIVE -->|yes| STORE2["TombstoneApply::Surviving\nstore filtered shard + live_version metadata"]
  SURVIVE -->|no| EMPTY["TombstoneApply::Empty\nremove store value + tombstone metadata"]

  ECO["compact_operations (log compaction)"] --> FOLD["fold all ops into RateLimitState\nvia repeated merge"]
  FOLD --> INTO["into_operation:\nInsert(newest live op-id, encoded shard)\nor Remove(tombstone op-id)"]

The generic layer should own:

| Generic layer owns | Reason |
| --- | --- |
| Namespace registration | route a key or namespace to an engine |
| Peer sync / batching | same transport for every CRDT type |
| Watermarks / delivery bookkeeping | not strategy-specific |
| Subscriber fanout | common notification plumbing |
| Metrics and tracing | common operational surface |
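The "Namespace registration" row can be sketched with the longest-prefix-match routing from the dispatch diagram, plus an explicit fallback engine for unregistered keys. Engines are reduced to a name so the example only exercises routing. Registry, register, route, and NamedEngine are hypothetical, and the nested rl:global: prefix is an invented example.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Stand-in for the full engine trait; only `name` is needed to show routing.
trait NamespaceCrdtEngine: Send + Sync {
    fn name(&self) -> &str;
}

struct NamedEngine(String);

impl NamespaceCrdtEngine for NamedEngine {
    fn name(&self) -> &str {
        &self.0
    }
}

struct Registry {
    by_prefix: HashMap<String, Arc<dyn NamespaceCrdtEngine>>,
    fallback: Arc<dyn NamespaceCrdtEngine>,
}

impl Registry {
    fn new(fallback: Arc<dyn NamespaceCrdtEngine>) -> Self {
        Registry { by_prefix: HashMap::new(), fallback }
    }

    fn register(&mut self, prefix: &str, engine: Arc<dyn NamespaceCrdtEngine>) {
        self.by_prefix.insert(prefix.to_string(), engine);
    }

    /// The longest registered prefix of `key` wins; unregistered keys use the fallback.
    fn route(&self, key: &str) -> &Arc<dyn NamespaceCrdtEngine> {
        self.by_prefix
            .iter()
            .filter(|(prefix, _)| key.starts_with(prefix.as_str()))
            .max_by_key(|(prefix, _)| prefix.len())
            .map(|(_, engine)| engine)
            .unwrap_or(&self.fallback)
    }
}
```

A linear scan is fine for a handful of prefixes. A trie would only matter if registrations grew large.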

Each engine should own:

| Engine owns | Reason |
| --- | --- |
| Stored value shape | LWW bytes and rate-limit shards are not the same type |
| Metadata shape | one ValueMetadata type should not serve every CRDT |
| Tombstone semantics | delete boundaries are strategy-specific |
| Operation compaction | compaction must preserve that strategy's invariants |
| Log merge semantics | op-id collision rules are strategy-specific |
| Remote replay semantics | apply only the canonical post-merge/post-compaction result |
| Public encode/decode policy | subscribers and get() should see a coherent value shape |
| Per-key locking | same-key writes must be serialized with the engine's state updates |
| Use of (but not ownership of) a shared Lamport clock | engines stamp local ops with timestamps from a node-shared clock; see Engine Context |

Engine Context

The engine should not discover every dependency globally. The registry should construct engines with a small context object:

struct NamespaceEngineContext {
    namespace: String,
    replica_id: ReplicaId,
    notifier: Arc<dyn CrdtNotifier>,
    clock: Arc<LamportClock>,   // shared across all engines on this node
}

The notifier is passed in by the namespace registry. The engine calls it after local writes and after remote writes that change observable state. That keeps notification timing inside the same critical section as CRDT application, which is safer than asking the registry to infer state changes after the fact.

Per-key locks should be engine-internal. LwwEngine and RateLimitEngine may both use the same helper type, but lock acquisition must live next to the state mutation it protects.
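The two rules above can be sketched together. The engine takes its own per-key lock, mutates state, and calls the notifier before releasing that lock, so a subscriber never sees a notification out of order with the state change. CrdtNotifier's method signature, KeyLocks, and RecordingNotifier are hypothetical. std Mutex stands in for whatever lock type the crate uses.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical notifier interface passed in through the engine context.
trait CrdtNotifier: Send + Sync {
    fn notify(&self, key: &str, value: Option<&[u8]>);
}

/// Test notifier that records every notification in order.
#[derive(Default)]
struct RecordingNotifier {
    seen: Mutex<Vec<(String, Option<Vec<u8>>)>>,
}

impl CrdtNotifier for RecordingNotifier {
    fn notify(&self, key: &str, value: Option<&[u8]>) {
        self.seen.lock().unwrap().push((key.to_string(), value.map(|v| v.to_vec())));
    }
}

/// One mutex per key, created lazily and owned by the engine.
#[derive(Default)]
struct KeyLocks {
    locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}

impl KeyLocks {
    fn for_key(&self, key: &str) -> Arc<Mutex<()>> {
        self.locks.lock().unwrap().entry(key.to_string()).or_default().clone()
    }
}

struct LwwEngine {
    values: Mutex<HashMap<String, Vec<u8>>>,
    key_locks: KeyLocks,
    notifier: Arc<dyn CrdtNotifier>,
}

impl LwwEngine {
    fn put_local(&self, key: &str, value: Vec<u8>) {
        let lock = self.key_locks.for_key(key);
        let _guard = lock.lock().unwrap(); // serialize same-key writes
        let previous = self.values.lock().unwrap().insert(key.to_string(), value.clone());
        if previous.as_deref() != Some(value.as_slice()) {
            // Notify while the per-key lock is still held.
            self.notifier.notify(key, Some(value.as_slice()));
        }
    }
}
```

Because the notify call sits inside the guard's scope, the registry never has to diff state after the fact to decide whether a write was observable.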

The Lamport clock should stay shared at the node level

Per the v2 docs (mesh-v2-qa.md), Lamport clocks and watermarks have different scopes:

| | Lamport clock | Watermark |
| --- | --- | --- |
| Scope | per-node (one counter per process), stamped on every op put on the wire | per-peer, never leaves the node |
| Purpose | merge correctness: higher (timestamp, replica_id) wins | delivery bookkeeping: "I've sent ops 0..N to this peer, resume from N+1 on retry" |
| Visible to peers? | yes (embedded in every Operation) | no (analogous to TCP sequence numbers: both ends use it, neither sends it) |

The whole point of Lamport is "give every op produced by this node a strictly monotonic timestamp so merges are deterministic across the cluster." That's a node-level invariant, not a namespace-level one.

A per-engine clock is technically possible — op IDs only need to be unique inside the log that consumes them, and the router can dispatch by namespace before any cross-engine collision would matter. But it has a transport-layer cost:

  • Today: one shared LamportClock → one logical op stream per node → one watermark per peer (HashMap<PeerId, u64>).
  • Per-engine clocks: each engine has its own op-id namespace → each engine has its own log → watermarks become per-(peer, engine) (HashMap<(PeerId, EngineId), u64>).

That's not catastrophic but it complicates the transport layer for no correctness gain. Engines don't actually need to own the clock to use it; an Arc<LamportClock> in the context lets them stamp ops without forfeiting node-level monotonicity or the per-peer watermark shape.

So: engines share a single per-node LamportClock via Arc in NamespaceEngineContext. The trait stays strategy-agnostic; the watermark model stays simple.
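Here is a sketch of that arrangement, assuming a LamportClock built on an AtomicU64 (the crate's real type may differ). Two engines stamp ops from one Arc'd clock, so their ops form a single strictly increasing stream. For simplicity, this sketch uses the Lamport timestamp itself as the per-peer watermark. The real watermark may be a separate sequence number. Engine and unsent are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct LamportClock(AtomicU64);

impl LamportClock {
    /// Local event: returns a strictly increasing timestamp.
    fn tick(&self) -> u64 {
        self.0.fetch_add(1, Ordering::SeqCst) + 1
    }

    /// Remote event: move the counter up to a timestamp seen on the wire,
    /// so the next local tick is strictly greater than it.
    fn observe(&self, remote: u64) {
        self.0.fetch_max(remote, Ordering::SeqCst);
    }
}

/// Minimal engine: it uses the node clock but does not own it.
struct Engine {
    namespace: &'static str,
    clock: Arc<LamportClock>,
}

impl Engine {
    fn stamp(&self, key: &str) -> (u64, String) {
        (self.clock.tick(), format!("{}{}", self.namespace, key))
    }
}

/// Delivery bookkeeping: one u64 per peer covers ops from every engine.
fn unsent<'a>(ops: &'a [(u64, String)], watermarks: &HashMap<&str, u64>, peer: &str) -> Vec<&'a (u64, String)> {
    let since = watermarks.get(peer).copied().unwrap_or(0);
    ops.iter().filter(|(ts, _)| *ts > since).collect()
}
```

With per-engine clocks, worker:a and rl:k could both get timestamp 1, and the watermark map would need an engine component in its key.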

Difference From CrdtValue

CrdtValue is a good idea when the entire CRDT can be represented as one mergeable value:

trait CrdtValue {
    fn merge(&mut self, other: Self);
}

That is too small for the current crdt_kv problem. The hard parts are not only "how do two values merge?" The hard parts are:

| Concern | Why value-level merge is insufficient |
| --- | --- |
| Removes | tombstones partition history, not just values |
| Operation logs | raw local writes and compacted snapshots may have different shapes |
| Same-op-id collisions | compacted payloads can carry more information than raw payloads |
| Compaction | needs all operations for a key, not just two values |
| Public API shape | local subscribers and remote readers must agree on what bytes mean |
| Metadata | rate-limit tombstone frontier should not be squeezed into LWW metadata |

CrdtValue can still exist inside an engine. For example, RateLimitShard can have a typed merge method. But the namespace boundary should be an engine, because the namespace needs to own operations, tombstones, compaction, and API shape.
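Here is a sketch of a value-level CrdtValue that could live inside the rate-limit engine. EpochCount is a simplified, hypothetical stand-in for RateLimitShard that carries only the "epoch wins first, count wins second" rule. The tombstone boundary is deliberately missing, because that is the part only the engine can own.

```rust
trait CrdtValue {
    fn merge(&mut self, other: Self);
}

/// Derived Ord compares `epoch` first, then `count`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct EpochCount {
    epoch: u64,
    count: u64,
}

impl CrdtValue for EpochCount {
    /// Keep the larger (epoch, count) pair: commutative, associative, idempotent.
    fn merge(&mut self, other: Self) {
        if other > *self {
            *self = other;
        }
    }
}

/// Fold any number of replicas' values into one.
fn merge_all<T: CrdtValue + Copy>(start: T, others: &[T]) -> T {
    let mut acc = start;
    for other in others {
        acc.merge(*other);
    }
    acc
}
```

Nothing in this trait can express "a remove at version V hides every earlier point," which is why the namespace boundary sits one level up.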

Difference From CrdtStrategy

CrdtStrategy can mean two different things.

If it is a stateless helper like this:

trait CrdtStrategy {
    type Metadata;

    fn apply_insert(...);
    fn apply_remove(...);
    fn compact_ops(...);
    fn merge_log(...);
}

then it is better than a MergeStrategy enum, but it can still leave the state split across CrdtOrMap, ValueMetadata, KvStore, and OperationLog. That would shrink the match-on-strategy branches but keep the same underlying seam.

If it is stateful and owns the store/log/metadata, then it is effectively the same design as NamespaceCrdtEngine. In that case the naming difference is:

| Name | Emphasis |
| --- | --- |
| CrdtStrategy | conflict-resolution policy |
| NamespaceCrdtEngine | complete namespace state machine |

I prefer NamespaceCrdtEngine because the object is doing more than choosing winners. It applies local writes, applies remote batches, stores live state, records tombstones, compacts logs, and defines the public value contract.

Difference From LwwStore

LwwStore is not an alternative to NamespaceCrdtEngine; it is one implementation of it.

struct LwwEngine {
    values: DashMap<String, LwwEntry>,
    tombstones: DashMap<String, LwwTombstone>,
    log: RwLock<LwwOperationLog>,
    clock: Arc<LamportClock>, // node-shared clock from NamespaceEngineContext
}

impl NamespaceCrdtEngine for LwwEngine { ... }

Likewise:

struct RateLimitEngine {
    shards: DashMap<String, RateLimitShard>,
    tombstones: DashMap<String, RateLimitVersion>,
    log: RwLock<RateLimitOperationLog>,
    clock: Arc<LamportClock>, // node-shared clock from NamespaceEngineContext
}

impl NamespaceCrdtEngine for RateLimitEngine { ... }

The important point is that RateLimitEngine does not have to pretend its state is LWW-shaped. It can keep a shard frontier directly instead of encoding that frontier into generic ValueMetadata plus normalized bytes.

Associated Types: Op, Stored, and Input

A more typed version could be:

trait TypedNamespaceCrdtEngine {
    type Input;
    type Stored;
    type Op;
    type Metadata;

    fn put_local(&self, key: &str, input: Self::Input) -> ApplyResult;
    fn apply_op(&self, op: Self::Op) -> ApplyResult;
    fn get_stored(&self, key: &str) -> Option<Self::Stored>;
    fn compact_ops(&self, key: &str, ops: &[Self::Op]) -> Option<Self::Op>;
}

The associated types mean:

| Type | Meaning | LWW | Rate-limit |
| --- | --- | --- | --- |
| Input | caller's local write shape | Vec<u8> | raw EpochCount update |
| Stored | canonical live state shape | Vec<u8> | RateLimitShard or decoded EpochCount policy |
| Op | replicated operation shape | insert/remove | update/remove/snapshot shard |
| Metadata | internal ordering/delete state | version/tombstone | tombstone frontier/live frontier |

This distinction matters because the current PR has at least two byte shapes for the rl: prefix:

| Shape | Meaning |
| --- | --- |
| raw 16-byte epoch/count | local write input |
| bincode RateLimitShard | stored and compacted CRDT state |

When these shapes are both represented as Vec<u8> in a generic map, the API has to compensate with permissive decoders and strategy branches. A typed engine can make the distinction explicit.

The public MeshKV API may still remain byte-oriented. The engine boundary can be typed internally even if the outer transport serializes everything into a common envelope.
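Here is a sketch of how associated types keep the two rl: shapes apart. The trait is trimmed to Input and Stored. The 16-byte layout is assumed to be two little-endian u64s, and RateLimitShard is a hypothetical one-field placeholder, not the real shard.

```rust
use std::collections::HashMap;

trait TypedNamespaceCrdtEngine {
    type Input;
    type Stored;
    fn put_local(&mut self, key: &str, input: Self::Input);
    fn get_stored(&self, key: &str) -> Option<Self::Stored>;
}

#[derive(Default)]
struct LwwEngine {
    values: HashMap<String, Vec<u8>>,
}

impl TypedNamespaceCrdtEngine for LwwEngine {
    type Input = Vec<u8>; // raw app bytes in...
    type Stored = Vec<u8>; // ...raw app bytes out
    fn put_local(&mut self, key: &str, input: Vec<u8>) {
        self.values.insert(key.to_string(), input);
    }
    fn get_stored(&self, key: &str) -> Option<Vec<u8>> {
        self.values.get(key).cloned()
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct EpochCount {
    epoch: u64,
    count: u64,
}

impl EpochCount {
    /// The one place raw 16-byte writes are decoded (little-endian assumed).
    fn from_wire(bytes: &[u8]) -> Option<Self> {
        if bytes.len() != 16 {
            return None;
        }
        let mut epoch = [0u8; 8];
        let mut count = [0u8; 8];
        epoch.copy_from_slice(&bytes[..8]);
        count.copy_from_slice(&bytes[8..]);
        Some(EpochCount { epoch: u64::from_le_bytes(epoch), count: u64::from_le_bytes(count) })
    }
}

/// Hypothetical canonical stored state, deliberately a different type from Input.
#[derive(Clone, Copy, Debug, PartialEq)]
struct RateLimitShard {
    winner: EpochCount,
}

#[derive(Default)]
struct RateLimitEngine {
    shards: HashMap<String, RateLimitShard>,
}

impl TypedNamespaceCrdtEngine for RateLimitEngine {
    type Input = EpochCount;
    type Stored = RateLimitShard;
    fn put_local(&mut self, key: &str, input: EpochCount) {
        let shard = self.shards.entry(key.to_string()).or_insert(RateLimitShard { winner: input });
        if input > shard.winner {
            shard.winner = input;
        }
    }
    fn get_stored(&self, key: &str) -> Option<RateLimitShard> {
        self.shards.get(key).copied()
    }
}
```

In this sketch, passing stored shard bytes to RateLimitEngine::put_local is a type error, not something a permissive decoder has to guess about.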

Wire Format

The first migration should keep today's Operation wire shape:

pub enum Operation {
    Insert {
        key: String,
        value: Vec<u8>,
        timestamp: u64,
        replica_id: ReplicaId,
    },
    Remove {
        key: String,
        timestamp: u64,
        replica_id: ReplicaId,
    },
}

The batch or envelope layer should add namespace routing outside the operation:

struct CrdtBatch {
    namespace: String,
    operations: Vec<Operation>,
}

That keeps the generic transport simple. It also avoids inventing a polymorphic wire protocol while the code is still being decomposed.
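Here is a sketch of the envelope side. Outbound operations are grouped into one CrdtBatch per namespace, and the Operation values themselves are unchanged. Two assumptions are for illustration only. First, namespace_of treats everything up to the first ':' as the namespace, whereas the note routes by registered prefixes. Second, ReplicaId is taken to be a u64.

```rust
use std::collections::BTreeMap;

type ReplicaId = u64; // assumption for this sketch

#[derive(Clone, Debug, PartialEq)]
enum Operation {
    Insert { key: String, value: Vec<u8>, timestamp: u64, replica_id: ReplicaId },
    Remove { key: String, timestamp: u64, replica_id: ReplicaId },
}

impl Operation {
    fn key(&self) -> &str {
        match self {
            Operation::Insert { key, .. } | Operation::Remove { key, .. } => key.as_str(),
        }
    }
}

#[derive(Debug, PartialEq)]
struct CrdtBatch {
    namespace: String,
    operations: Vec<Operation>,
}

/// Hypothetical namespace rule: everything up to and including the first ':'.
fn namespace_of(key: &str) -> &str {
    match key.find(':') {
        Some(i) => &key[..=i],
        None => "",
    }
}

/// Group a flat op stream into per-namespace batches, keeping op order within each.
fn into_batches(ops: Vec<Operation>) -> Vec<CrdtBatch> {
    let mut grouped: BTreeMap<String, Vec<Operation>> = BTreeMap::new();
    for op in ops {
        let ns = namespace_of(op.key()).to_string();
        grouped.entry(ns).or_default().push(op);
    }
    grouped
        .into_iter()
        .map(|(namespace, operations)| CrdtBatch { namespace, operations })
        .collect()
}
```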

Internally, each engine can still use a typed log:

struct LwwEngine {
    log: RwLock<LwwOperationLog>,
}

struct RateLimitEngine {
    log: RwLock<RateLimitOperationLog>,
}

The engine converts from the common wire operation into its internal operation or state representation at the boundary. If a future CRDT cannot fit into Insert / Remove, that is the point to introduce an engine-tagged wire op.
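Here is a sketch of that boundary conversion for the rate-limit engine, written as a TryFrom impl. It assumes, for illustration only, that a local write carries the raw 16-byte little-endian (epoch, count) payload and that any other length is rejected. RateLimitOp and DecodeError are hypothetical names. A real engine would also have to accept encoded RateLimitShard snapshots, which this sketch does not decode.

```rust
use std::convert::TryFrom;

type ReplicaId = u64; // assumption for this sketch

#[derive(Clone, Debug, PartialEq)]
enum Operation {
    Insert { key: String, value: Vec<u8>, timestamp: u64, replica_id: ReplicaId },
    Remove { key: String, timestamp: u64, replica_id: ReplicaId },
}

#[derive(Clone, Copy, Debug, PartialEq)]
struct EpochCount {
    epoch: u64,
    count: u64,
}

/// Hypothetical internal op: typed payload, with the wire version kept alongside.
#[derive(Debug, PartialEq)]
enum RateLimitOp {
    Update { key: String, point: EpochCount, version: (u64, ReplicaId) },
    Remove { key: String, version: (u64, ReplicaId) },
}

#[derive(Debug, PartialEq)]
enum DecodeError {
    UnexpectedPayloadLength(usize),
}

impl TryFrom<Operation> for RateLimitOp {
    type Error = DecodeError;

    fn try_from(op: Operation) -> Result<Self, Self::Error> {
        match op {
            Operation::Insert { key, value, timestamp, replica_id } => {
                if value.len() != 16 {
                    return Err(DecodeError::UnexpectedPayloadLength(value.len()));
                }
                let mut epoch = [0u8; 8];
                let mut count = [0u8; 8];
                epoch.copy_from_slice(&value[..8]);
                count.copy_from_slice(&value[8..]);
                Ok(RateLimitOp::Update {
                    key,
                    point: EpochCount { epoch: u64::from_le_bytes(epoch), count: u64::from_le_bytes(count) },
                    version: (timestamp, replica_id),
                })
            }
            Operation::Remove { key, timestamp, replica_id } => {
                Ok(RateLimitOp::Remove { key, version: (timestamp, replica_id) })
            }
        }
    }
}
```

Decoding failures surface once, at the boundary, rather than as permissive fallbacks deeper in merge or compaction.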

Remote Apply Footgun

The engine API must make one rule hard to violate:

Remote application must merge incoming operations into the engine log, compact or canonicalize as needed, and apply only the operations or state that survive that canonicalization.

This is the bug class behind replaying a pre-compaction unseen_operations list. If an incoming remove dominates an older insert, the live store must not replay the dominated insert just because it appeared in the incoming batch. The same principle applies to same-op-id compacted snapshots: the canonical merged log entry may carry more information than the raw entry.

A good engine API should therefore avoid returning "unseen incoming ops" as the thing to replay. Prefer one of these shapes:

fn apply_remote_batch(&self, operations: &[Operation]) -> Vec<CrdtNotification>;

or:

fn merge_remote_log(&self, operations: &[Operation]) -> CanonicalDelta;
fn apply_canonical_delta(&self, delta: CanonicalDelta) -> Vec<CrdtNotification>;

The important point is that canonicalization happens before observable state is updated.
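Here is a minimal LWW sketch of the safe ordering. The engine first merges the incoming batch into its log, compacts each key down to its winning op, and only then updates live state for the keys the batch touched. A naive replay is included only as the anti-pattern for contrast. Types are simplified, with version = (timestamp, replica_id), and apply_remote_batch returns (key, new value) pairs in place of CrdtNotification.

```rust
use std::collections::{BTreeMap, BTreeSet, HashMap};

type Version = (u64, u64); // (timestamp, replica_id)

#[derive(Clone, Debug, PartialEq)]
enum Op {
    Insert { key: String, value: Vec<u8>, version: Version },
    Remove { key: String, version: Version },
}

impl Op {
    fn key(&self) -> &str {
        match self {
            Op::Insert { key, .. } | Op::Remove { key, .. } => key.as_str(),
        }
    }
    fn version(&self) -> Version {
        match self {
            Op::Insert { version, .. } | Op::Remove { version, .. } => *version,
        }
    }
}

#[derive(Default)]
struct LwwEngine {
    log: Vec<Op>,
    values: HashMap<String, Vec<u8>>,
}

impl LwwEngine {
    /// Merge, canonicalize, then apply.
    fn apply_remote_batch(&mut self, incoming: &[Op]) -> Vec<(String, Option<Vec<u8>>)> {
        // 1. Merge incoming ops into the log, skipping exact repeats.
        for op in incoming {
            if !self.log.contains(op) {
                self.log.push(op.clone());
            }
        }
        // 2. Compact: keep only the newest op per key.
        let mut winners: BTreeMap<String, Op> = BTreeMap::new();
        for op in &self.log {
            let newer = winners.get(op.key()).map_or(true, |w| op.version() > w.version());
            if newer {
                winners.insert(op.key().to_string(), op.clone());
            }
        }
        self.log = winners.values().cloned().collect();
        // 3. Apply only the surviving winner for each key this batch touched.
        let touched: BTreeSet<&str> = incoming.iter().map(|op| op.key()).collect();
        let mut notifications = Vec::new();
        for key in touched {
            let new_value = match &winners[key] {
                Op::Insert { value, .. } => Some(value.clone()),
                Op::Remove { .. } => None,
            };
            if self.values.get(key) != new_value.as_ref() {
                match &new_value {
                    Some(v) => { self.values.insert(key.to_string(), v.clone()); }
                    None => { self.values.remove(key); }
                }
                notifications.push((key.to_string(), new_value));
            }
        }
        notifications
    }

    /// Anti-pattern for contrast: replays every incoming op in arrival order.
    fn replay_naively(&mut self, incoming: &[Op]) {
        for op in incoming {
            match op {
                Op::Insert { key, value, .. } => { self.values.insert(key.clone(), value.clone()); }
                Op::Remove { key, .. } => { self.values.remove(key); }
            }
        }
    }
}
```

If a batch delivers a remove at (3, 2) before an older insert at (2, 3), the naive path leaves the key resurrected. The canonical path leaves it deleted and emits a single notification.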

Why This Is Cleaner For EpochMaxWins

EpochMaxWins in this codebase is not just a "max value wins" comparator. It is rate-limit shard state with these rules:

  1. A tombstone is a delete boundary by (timestamp, replica_id).
  2. Inserts before the newest tombstone cannot resurrect the key.
  3. Inserts after the tombstone recreate the key.
  4. Among live post-tombstone points, epoch wins first, count wins second.
  5. Compacted snapshots must preserve the tombstone boundary.
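Rules 1 through 4 can be sketched as a fold over one key's operations. The model is a simplification: it keeps the newest tombstone version plus the (version, point) pairs written after it, and it assumes nothing about the real RateLimitShard layout. RlOp and RateLimitState's fields here are hypothetical.

```rust
type Version = (u64, u64); // (timestamp, replica_id)

/// Derived Ord compares `epoch` first, then `count` (rule 4).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct EpochCount {
    epoch: u64,
    count: u64,
}

#[derive(Clone, Copy, Debug)]
enum RlOp {
    Update { point: EpochCount, version: Version },
    Remove { version: Version },
}

/// Folded state for one key: newest delete boundary plus points written after it.
#[derive(Debug, Default)]
struct RateLimitState {
    tombstone: Option<Version>,
    live: Vec<(Version, EpochCount)>,
}

impl RateLimitState {
    fn apply(&mut self, op: RlOp) {
        match op {
            RlOp::Remove { version } => {
                // Rule 1: only the newest tombstone acts as the delete boundary.
                if self.tombstone.map_or(true, |t| version > t) {
                    self.tombstone = Some(version);
                    // Rule 2: points at or before the boundary are dropped.
                    self.live.retain(|(v, _)| *v > version);
                }
            }
            RlOp::Update { point, version } => {
                // Rules 2 and 3: only writes after the boundary are live.
                if self.tombstone.map_or(true, |t| version > t) {
                    self.live.push((version, point));
                }
            }
        }
    }

    /// Rule 4: among surviving points, epoch wins first, count second.
    fn value(&self) -> Option<EpochCount> {
        self.live.iter().map(|(_, p)| *p).max()
    }
}

fn fold(ops: &[RlOp]) -> RateLimitState {
    let mut state = RateLimitState::default();
    for op in ops {
        state.apply(*op);
    }
    state
}
```

In this model, rule 5 amounts to compaction emitting the tombstone plus the surviving points. Re-folding that pair yields the same value as folding the full history, in any delivery order.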

Those rules affect local writes, remote replay, metadata, log compaction, snapshotting, and public decoding. That is exactly why they should live in one engine.

Migration Path

A low-risk migration can be staged:

  1. Keep the wire Operation format and current tests.
  2. Extract an internal LwwEngine behind the existing CrdtOrMap API as a complete dry run of the trait surface.
  3. Extract an internal RateLimitEngine for the rl: prefix.
  4. Move rate-limit metadata and normalized shard storage out of generic ValueMetadata.
  5. Replace merge_strategies with namespace registration: prefix -> Arc<dyn NamespaceCrdtEngine>.
  6. Make OperationLog::merge non-public or engine-owned, so there is no default-to-LWW path for non-LWW data.
  7. Align get() and subscriber notification value shapes per engine.

The key is not to rewrite transport first. Keep gossip boring. Move CRDT semantics to the state machine that owns them.

Step 2 matters. The LWW path is simpler, but it still exercises local writes, remote merge, tombstones, compaction, notifications, clocks, per-key locks, GC, and operation export. If the trait surface cannot carry LWW cleanly, it is not ready for RateLimitEngine.

config: should be registered explicitly by MeshKV::new() as a built-in LWW namespace engine. That preserves today's auto-registration behavior without keeping a hidden global default strategy inside CrdtOrMap.

CRDT state is purely in-memory and synchronously mutated today, so there is no drain or shutdown step. The first engine refactor keeps that model. If CRDT state is later persisted to disk or background compaction becomes asynchronous, the trait can grow lifecycle hooks then.

Recommendation

Use NamespaceCrdtEngine as the architectural boundary.

Use LwwEngine and RateLimitEngine as concrete implementations.

Use CrdtValue only inside an engine when a value-level merge is genuinely the right abstraction.

Use CrdtStrategy only if it owns state. If it is just a stateless policy object plugged into the current CrdtOrMap, it does not solve the main design problem.
