diff --git a/src/production/replicated_shard_actor.rs b/src/production/replicated_shard_actor.rs index 83f28d1..4bb3d06 100644 --- a/src/production/replicated_shard_actor.rs +++ b/src/production/replicated_shard_actor.rs @@ -451,7 +451,11 @@ impl ReplicatedShardActor { self.replica_state.apply_remote_delta(delta.clone()); - if delta.value.is_hash() { + let Some(merged_value) = self.replica_state.replicated_keys.get(&delta.key).cloned() else { + return; + }; + + if merged_value.is_hash() { // TigerStyle: Postcondition - replica_state should have the hash #[cfg(debug_assertions)] { @@ -462,7 +466,7 @@ impl ReplicatedShardActor { } // Apply hash delta - if let Some(hash) = delta.value.get_hash() { + if let Some(hash) = merged_value.get_hash() { let pairs: Vec<(crate::redis::SDS, crate::redis::SDS)> = hash .iter() .filter_map(|(field, lww)| { @@ -509,8 +513,8 @@ impl ReplicatedShardActor { } } } - } else if let Some(value) = delta.value.get() { - if let Some(expiry_ms) = delta.value.expiry_ms { + } else if let Some(value) = merged_value.get() { + if let Some(expiry_ms) = merged_value.expiry_ms { let seconds = (expiry_ms / 1000) as i64; let cmd = Command::setex(delta.key.clone(), seconds, value.clone()); self.executor.execute(&cmd); @@ -518,7 +522,7 @@ impl ReplicatedShardActor { let cmd = Command::set(delta.key.clone(), value.clone()); self.executor.execute(&cmd); } - } else if delta.value.is_tombstone() { + } else if merged_value.is_tombstone() { let cmd = Command::del(delta.key.clone()); self.executor.execute(&cmd); } @@ -627,6 +631,54 @@ mod tests { handle.shutdown().await; } + #[tokio::test] + async fn test_replicated_shard_actor_remote_stale_delta_does_not_overwrite_executor() { + let handle = ReplicatedShardActor::spawn(ReplicaId::new(1), ConsistencyLevel::Eventual, 0); + + let newer_replica = ReplicaId::new(2); + let older_replica = ReplicaId::new(1); + + let mut newer_clock = crate::replication::lattice::LamportClock::new(newer_replica); + newer_clock.time = 10; + let newer_delta = ReplicationDelta::new( + "conflict_key".to_string(), + crate::replication::state::ReplicatedValue::with_value( + crate::redis::SDS::from_str("newer"), + newer_clock, + ), + newer_replica, + ); + + let mut older_clock = crate::replication::lattice::LamportClock::new(older_replica); + older_clock.time = 9; + let older_delta = ReplicationDelta::new( + "conflict_key".to_string(), + crate::replication::state::ReplicatedValue::with_value( + crate::redis::SDS::from_str("older"), + older_clock, + ), + older_replica, + ); + + handle.apply_remote_delta(newer_delta); + handle.apply_remote_delta(older_delta); + + tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; + + let result = handle + .execute_readonly(Command::Get("conflict_key".to_string())) + .await; + + match result { + RespValue::BulkString(Some(bytes)) => { + assert_eq!(bytes, b"newer".to_vec()); + } + other => panic!("expected bulk string, got {:?}", other), + } + + handle.shutdown().await; + } + #[tokio::test] async fn test_replicated_shard_actor_snapshot() { let handle = ReplicatedShardActor::spawn(ReplicaId::new(1), ConsistencyLevel::Eventual, 0);