Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions src/production/replicated_shard_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,11 @@ impl ReplicatedShardActor {

self.replica_state.apply_remote_delta(delta.clone());

if delta.value.is_hash() {
let Some(merged_value) = self.replica_state.replicated_keys.get(&delta.key).cloned() else {
return;
};

if merged_value.is_hash() {
// TigerStyle: Postcondition - replica_state should have the hash
#[cfg(debug_assertions)]
{
Expand All @@ -462,7 +466,7 @@ impl ReplicatedShardActor {
}

// Apply hash delta
if let Some(hash) = delta.value.get_hash() {
if let Some(hash) = merged_value.get_hash() {
let pairs: Vec<(crate::redis::SDS, crate::redis::SDS)> = hash
.iter()
.filter_map(|(field, lww)| {
Expand Down Expand Up @@ -509,16 +513,16 @@ impl ReplicatedShardActor {
}
}
}
} else if let Some(value) = delta.value.get() {
if let Some(expiry_ms) = delta.value.expiry_ms {
} else if let Some(value) = merged_value.get() {
if let Some(expiry_ms) = merged_value.expiry_ms {
let seconds = (expiry_ms / 1000) as i64;
let cmd = Command::setex(delta.key.clone(), seconds, value.clone());
self.executor.execute(&cmd);
} else {
let cmd = Command::set(delta.key.clone(), value.clone());
self.executor.execute(&cmd);
}
} else if delta.value.is_tombstone() {
} else if merged_value.is_tombstone() {
let cmd = Command::del(delta.key.clone());
self.executor.execute(&cmd);
}
Expand Down Expand Up @@ -627,6 +631,54 @@ mod tests {
handle.shutdown().await;
}

#[tokio::test]
async fn test_replicated_shard_actor_remote_stale_delta_does_not_overwrite_executor() {
let handle = ReplicatedShardActor::spawn(ReplicaId::new(1), ConsistencyLevel::Eventual, 0);

let newer_replica = ReplicaId::new(2);
let older_replica = ReplicaId::new(1);

let mut newer_clock = crate::replication::lattice::LamportClock::new(newer_replica);
newer_clock.time = 10;
let newer_delta = ReplicationDelta::new(
"conflict_key".to_string(),
crate::replication::state::ReplicatedValue::with_value(
crate::redis::SDS::from_str("newer"),
newer_clock,
),
newer_replica,
);

let mut older_clock = crate::replication::lattice::LamportClock::new(older_replica);
older_clock.time = 9;
let older_delta = ReplicationDelta::new(
"conflict_key".to_string(),
crate::replication::state::ReplicatedValue::with_value(
crate::redis::SDS::from_str("older"),
older_clock,
),
older_replica,
);

handle.apply_remote_delta(newer_delta);
handle.apply_remote_delta(older_delta);

tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;

let result = handle
.execute_readonly(Command::Get("conflict_key".to_string()))
.await;

match result {
RespValue::BulkString(Some(bytes)) => {
assert_eq!(bytes, b"newer".to_vec());
}
other => panic!("expected bulk string, got {:?}", other),
}

handle.shutdown().await;
}

#[tokio::test]
async fn test_replicated_shard_actor_snapshot() {
let handle = ReplicatedShardActor::spawn(ReplicaId::new(1), ConsistencyLevel::Eventual, 0);
Expand Down