From 7ebd7ee435aa441aa5ecf4f63d5c045dc94c25f0 Mon Sep 17 00:00:00 2001 From: Carl Sverre <82591+carlsverre@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:33:08 -0700 Subject: [PATCH] Fix stale remote delta overwriting executor state When ReplicatedShardActor applied a remote delta, it first merged the delta into replica_state and then updated the executor from the incoming delta payload. That meant a stale delta could lose the merge in replica_state but still overwrite the executor, so subsequent reads returned the wrong value. Fix this by reloading the merged value from replica_state after apply_remote_delta and projecting that merged value into the executor. This preserves the existing CRDT and LWW merge semantics, keeps read behavior aligned with replicated state, and is safe because it only changes which already-computed value is mirrored into the executor; it does not change message ordering, conflict resolution, or local write generation. --- src/production/replicated_shard_actor.rs | 62 ++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 5 deletions(-) diff --git a/src/production/replicated_shard_actor.rs b/src/production/replicated_shard_actor.rs index 83f28d1..4bb3d06 100644 --- a/src/production/replicated_shard_actor.rs +++ b/src/production/replicated_shard_actor.rs @@ -451,7 +451,11 @@ impl ReplicatedShardActor { self.replica_state.apply_remote_delta(delta.clone()); - if delta.value.is_hash() { + let Some(merged_value) = self.replica_state.replicated_keys.get(&delta.key).cloned() else { + return; + }; + + if merged_value.is_hash() { // TigerStyle: Postcondition - replica_state should have the hash #[cfg(debug_assertions)] { @@ -462,7 +466,7 @@ impl ReplicatedShardActor { } // Apply hash delta - if let Some(hash) = delta.value.get_hash() { + if let Some(hash) = merged_value.get_hash() { let pairs: Vec<(crate::redis::SDS, crate::redis::SDS)> = hash .iter() .filter_map(|(field, lww)| { @@ -509,8 +513,8 @@ impl ReplicatedShardActor { } } } - } else if let Some(value) = delta.value.get() { - if let Some(expiry_ms) = delta.value.expiry_ms { + } else if let Some(value) = merged_value.get() { + if let Some(expiry_ms) = merged_value.expiry_ms { let seconds = (expiry_ms / 1000) as i64; let cmd = Command::setex(delta.key.clone(), seconds, value.clone()); self.executor.execute(&cmd); @@ -518,7 +522,7 @@ impl ReplicatedShardActor { let cmd = Command::set(delta.key.clone(), value.clone()); self.executor.execute(&cmd); } - } else if delta.value.is_tombstone() { + } else if merged_value.is_tombstone() { let cmd = Command::del(delta.key.clone()); self.executor.execute(&cmd); } @@ -627,6 +631,54 @@ mod tests { handle.shutdown().await; } + #[tokio::test] + async fn test_replicated_shard_actor_remote_stale_delta_does_not_overwrite_executor() { + let handle = ReplicatedShardActor::spawn(ReplicaId::new(1), ConsistencyLevel::Eventual, 0); + + let newer_replica = ReplicaId::new(2); + let older_replica = ReplicaId::new(1); + + let mut newer_clock = crate::replication::lattice::LamportClock::new(newer_replica); + newer_clock.time = 10; + let newer_delta = ReplicationDelta::new( + "conflict_key".to_string(), + crate::replication::state::ReplicatedValue::with_value( + crate::redis::SDS::from_str("newer"), + newer_clock, + ), + newer_replica, + ); + + let mut older_clock = crate::replication::lattice::LamportClock::new(older_replica); + older_clock.time = 9; + let older_delta = ReplicationDelta::new( + "conflict_key".to_string(), + crate::replication::state::ReplicatedValue::with_value( + crate::redis::SDS::from_str("older"), + older_clock, + ), + older_replica, + ); + + handle.apply_remote_delta(newer_delta); + handle.apply_remote_delta(older_delta); + + tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; + + let result = handle + .execute_readonly(Command::Get("conflict_key".to_string())) + .await; + + match result { + RespValue::BulkString(Some(bytes)) => { + assert_eq!(bytes, b"newer".to_vec()); + } + other => panic!("expected bulk string, got {:?}", other), + } + + handle.shutdown().await; + } + #[tokio::test] async fn test_replicated_shard_actor_snapshot() { let handle = ReplicatedShardActor::spawn(ReplicaId::new(1), ConsistencyLevel::Eventual, 0);