From cb043b3e3b7866a3d8602a5b94763bc323b0c4c3 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 25 Feb 2026 01:01:50 -0800 Subject: [PATCH 01/14] initial --- .../apache/cassandra/tcm/sequences/Move.java | 4 + .../test/accord/AccordRegainRangesTest.java | 114 ++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index a26d6ded4500..d755932c4885 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -53,6 +54,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.AccordTopology; import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamResultFuture; @@ -203,6 +205,8 @@ public SequenceState executeNext() try { ClusterMetadata metadata = ClusterMetadata.current(); + List epochsWeAreRegainingRangesFor = AccordService.instance().topology().epochsWeAreRegainingRangesFor(AccordService.instance().topology().current(), AccordTopology.createAccordTopology(applyTo(metadata).success().metadata)); + AccordService.instance().topology().blockUntilAllEpochsRetired(epochsWeAreRegainingRangesFor); logger.info("Moving {} from {} to {}.", metadata.directory.endpoint(startMove.nodeId()), metadata.tokenMap.tokens(startMove.nodeId()), diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java new file mode 100644 index 000000000000..00aa1bc5ed0b --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.accord; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import accord.local.CommandStore; +import accord.local.PreLoadContext; +import accord.primitives.AbstractRanges; +import accord.primitives.Ranges; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.AccordService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.cassandra.service.accord.AccordService.getBlocking; +import static com.google.common.collect.Iterables.getOnlyElement; +import org.junit.BeforeClass; +import org.junit.Test; + +public class AccordRegainRangesTest extends AccordTestBase +{ + private static final Logger logger = LoggerFactory.getLogger(AccordRegainRangesTest.class); + + @Override + protected Logger logger() + { + return logger; + } + + @BeforeClass + public static void setupClass() throws IOException + { + AccordTestBase.setupCluster(builder -> builder + .withoutVNodes() + .withConfig(config -> + config + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "20s") + .with(Feature.NETWORK, Feature.GOSSIP)), 6); + } + + @Test + public void regainRangesTest() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}", + "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + test(ddls, cluster -> { + cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + SimpleQueryResult result = cluster.coordinator(2).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assert(token < Long.parseLong(originalToken)); + + long epoch = cluster.get(2).callOnInstance(() -> { + long priorEpoch = AccordService.instance().topology().epoch(); + StorageService.instance.move(Long.toString(token - 1000)); + return priorEpoch; + }); + + cluster.coordinator(3).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 5); + + + cluster.get(2).runOnInstance(() -> { + StorageService.instance.move(originalToken); + }); + + // Ensure no overlapping safeToRead ranges + cluster.get(2).runOnInstance(() -> { + assert (AccordService.instance().topology().active().minEpoch() > epoch); + Ranges range = Ranges.EMPTY; + for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) { + Ranges safeToReadRanges = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "No overlapping safeToReadRanges", safeCommandStore -> { + Ranges mergedRanges = Ranges.EMPTY; + for (Ranges r : safeCommandStore.safeToReadAt().values()) + mergedRanges = mergedRanges.union(AbstractRanges.UnionMode.MERGE_ADJACENT, r); + return mergedRanges; + })); + + assert(range.overlapping(safeToReadRanges).isEmpty()); + range = range.union(AbstractRanges.UnionMode.MERGE_ADJACENT, safeToReadRanges); + } + }); + }); + } + +} + From 6eddccdac75e359a4f4be0ba6fac28f79a6791c6 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 17:19:05 -0800 Subject: [PATCH 02/14] use listener and only wait for ranges that are being regained rather than epoch --- .../apache/cassandra/tcm/sequences/Move.java | 57 ++++++++++++++++++- .../test/accord/AccordRegainRangesTest.java | 19 ++++++- 2 files changed, 72 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index d755932c4885..f754b355c503 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -28,12 +28,22 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; + import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.TopologyListener; +import accord.primitives.Ranges; import accord.topology.EpochReady; +import accord.topology.Topology; +import accord.topology.TopologyException; +import accord.topology.TopologyManager; +import accord.topology.TopologyNotReadyException; +import accord.topology.TopologyRetiredException; +import accord.utils.Invariants; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; @@ -75,6 +85,7 @@ import org.apache.cassandra.tcm.transformations.PrepareMove; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.concurrent.Condition; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.FutureCombiner; import org.apache.cassandra.utils.vint.VIntCoding; @@ -86,6 +97,7 @@ import static org.apache.cassandra.tcm.Transformation.Kind.START_MOVE; import static org.apache.cassandra.tcm.sequences.SequenceState.continuable; import static org.apache.cassandra.tcm.sequences.SequenceState.error; +import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition; public class Move extends MultiStepOperation { @@ -205,8 +217,49 @@ public SequenceState executeNext() try { ClusterMetadata metadata = ClusterMetadata.current(); - List epochsWeAreRegainingRangesFor = AccordService.instance().topology().epochsWeAreRegainingRangesFor(AccordService.instance().topology().current(), AccordTopology.createAccordTopology(applyTo(metadata).success().metadata)); - AccordService.instance().topology().blockUntilAllEpochsRetired(epochsWeAreRegainingRangesFor); + TopologyManager.RegainingEpochRange regainingEpochRange = AccordService.instance().topology().epochAndRangeToBeRetired(AccordService.instance().topology().current(), AccordTopology.createAccordTopology(applyTo(metadata).success().metadata)); + if (regainingEpochRange != null) + { + Condition condition = newOneTimeCondition(); + + class waitForEpochAndRangeRetirement implements TopologyListener + { + final Condition condition; + final long waitingForEpoch; + final Ranges waitingForRange; + + public waitForEpochAndRangeRetirement(Condition condition, long waitingForEpoch, Ranges waitingForRange) + { + this.condition = condition; + this.waitingForEpoch = waitingForEpoch; + this.waitingForRange = waitingForRange; + } + + @Override + public void onEpochRetired(Ranges ranges, long epoch, @Nullable Topology topology) + { + try + { + if (AccordService.instance().topology().active().get(waitingForEpoch).retired().containsAll(waitingForRange)) + condition.signal(); + } + catch (TopologyRetiredException e) + { + condition.signal(); + } + catch (TopologyException e) + { + logger.info("Topology exception: ", e); + } + } + } + + waitForEpochAndRangeRetirement wait = new waitForEpochAndRangeRetirement(condition, regainingEpochRange.epoch(), regainingEpochRange.range()); + AccordService.instance().topology().addListener(wait); + condition.awaitThrowUncheckedOnInterrupt(); + AccordService.instance().topology().removeListener(wait); + } + logger.info("Moving {} from {} to {}.", metadata.directory.endpoint(startMove.nodeId()), metadata.tokenMap.tokens(startMove.nodeId()), diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java index 00aa1bc5ed0b..823cdf35d3c1 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java @@ -22,19 +22,27 @@ import java.util.Arrays; import java.util.List; +import accord.api.RoutingKey; import accord.local.CommandStore; import accord.local.PreLoadContext; import accord.primitives.AbstractRanges; import accord.primitives.Ranges; +import accord.topology.TopologyException; +import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.TokenRange; +import org.apache.cassandra.service.accord.api.TokenKey; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static accord.primitives.Range.range; import static org.apache.cassandra.service.accord.AccordService.getBlocking; import static com.google.common.collect.Iterables.getOnlyElement; import org.junit.BeforeClass; @@ -86,14 +94,21 @@ public void regainRangesTest() throws Throwable cluster.coordinator(3).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 5); - cluster.get(2).runOnInstance(() -> { StorageService.instance.move(originalToken); }); // Ensure no overlapping safeToRead ranges cluster.get(2).runOnInstance(() -> { - assert (AccordService.instance().topology().active().minEpoch() > epoch); + RoutingKey start = TokenKey.parse(TableId.fromString("tid:11"), String.valueOf(token), Murmur3Partitioner.instance); + RoutingKey end = TokenKey.parse(TableId.fromString("tid:11"), originalToken, Murmur3Partitioner.instance); + Ranges regainedRange = Ranges.of(TokenRange.create(start, end)); + try + { + assert (AccordService.instance().topology().active().get(epoch).retired().containsAll(regainedRange)); + } catch (TopologyException e) { + assert(false); + } Ranges range = Ranges.EMPTY; for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) { Ranges safeToReadRanges = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "No overlapping safeToReadRanges", safeCommandStore -> { From 398fffbd00366e61fe9da45caf1b4c6747f6e27f Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 17:20:39 -0800 Subject: [PATCH 03/14] additional check to avoid locking --- src/java/org/apache/cassandra/tcm/sequences/Move.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index f754b355c503..c60451e9ade2 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -240,7 +240,7 @@ public void onEpochRetired(Ranges ranges, long epoch, @Nullable Topology topolog { try { - if (AccordService.instance().topology().active().get(waitingForEpoch).retired().containsAll(waitingForRange)) + if (epoch >= waitingForEpoch && AccordService.instance().topology().active().get(waitingForEpoch).retired().containsAll(waitingForRange)) condition.signal(); } catch (TopologyRetiredException e) From bd8209fe35f796215504ec37fb5f2aeb35a25c4b Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Fri, 27 Feb 2026 00:53:00 -0800 Subject: [PATCH 04/14] added additional test + removed extra imports --- .gitmodules | 4 +- .../apache/cassandra/tcm/sequences/Move.java | 3 - .../test/accord/AccordRegainRangesTest.java | 60 ++++++++++++++++++- 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/.gitmodules b/.gitmodules index 616dacf610a7..8b3913ed184d 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = https://github.com/alanwang67/cassandra-accord.git + branch = CASSANDRA-21183 diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index c60451e9ade2..a1b6f48df73d 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Collection; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -41,9 +40,7 @@ import accord.topology.Topology; import accord.topology.TopologyException; import accord.topology.TopologyManager; -import accord.topology.TopologyNotReadyException; import accord.topology.TopologyRetiredException; -import accord.utils.Invariants; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java index 823cdf35d3c1..ca5359d8b90a 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java @@ -24,10 +24,13 @@ import accord.api.RoutingKey; import accord.local.CommandStore; +import accord.local.CommandStores; import accord.local.PreLoadContext; +import accord.local.SafeCommandStore; import accord.primitives.AbstractRanges; import accord.primitives.Ranges; import accord.topology.TopologyException; +import accord.utils.LargeBitSet; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.api.ConsistencyLevel; @@ -42,9 +45,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static accord.primitives.Range.range; +import static accord.local.CommandStores.checkQueryDisjointRangesAcrossCommandStores; import static org.apache.cassandra.service.accord.AccordService.getBlocking; import static com.google.common.collect.Iterables.getOnlyElement; +import static org.junit.Assert.assertFalse; + import org.junit.BeforeClass; import org.junit.Test; @@ -125,5 +130,58 @@ public void regainRangesTest() throws Throwable }); } + @Test + public void querySameRangeOnDifferentCommandStoresTest() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}", + "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + test(ddls, cluster -> { + cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + SimpleQueryResult result = cluster.coordinator(2).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assert(token < Long.parseLong(originalToken)); + + long epoch = cluster.get(2).callOnInstance(() -> { + long priorEpoch = AccordService.instance().topology().epoch(); + StorageService.instance.move(Long.toString(token - 1000)); + return priorEpoch; + }); + + cluster.coordinator(3).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 5); + + cluster.get(2).runOnInstance(() -> { + /* This call results in other nodes invoking this */ + StorageService.instance.move(originalToken); + }); + + cluster.get(2).runOnInstance(() -> { + RoutingKey start = TokenKey.parse(TableId.fromString("tid:11"), String.valueOf(token), Murmur3Partitioner.instance); + RoutingKey end = TokenKey.parse(TableId.fromString("tid:11"), originalToken, Murmur3Partitioner.instance); + Ranges regainedRange = Ranges.of(TokenRange.create(start, end)); + + int numberOfShards = AccordService.instance().node().commandStores().all().length; + LargeBitSet bitSet = new LargeBitSet(numberOfShards); + for (int i = 0; i < numberOfShards; i++) + bitSet.set(i); + + CommandStores.ShardHolder[] shardHolders = new CommandStores.ShardHolder[numberOfShards]; + int i = 0; + for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) + { + CommandStores.RangesForEpoch rangesForEpoch = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get rangesForEpoch", SafeCommandStore::ranges)); + shardHolders[i] = new CommandStores.ShardHolder(commandStore, rangesForEpoch); + i += 1; + } + + assertFalse(checkQueryDisjointRangesAcrossCommandStores(AccordService.instance().node().commandStores().overlappingCommandStores(), shardHolders, bitSet, regainedRange)); + }); + + }); + } } From a1edf9f3bba2819ed080f3fb5c7ca4b717b4ae24 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 3 Mar 2026 16:47:40 -0800 Subject: [PATCH 05/14] test updates --- .../test/accord/AccordRegainRangesTest.java | 79 +++++-------------- 1 file changed, 20 insertions(+), 59 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java index ca5359d8b90a..91a44cba1c9d 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; import accord.api.RoutingKey; import accord.local.CommandStore; @@ -29,18 +30,24 @@ import accord.local.SafeCommandStore; import accord.primitives.AbstractRanges; import accord.primitives.Ranges; +import accord.topology.Shard; +import accord.topology.Topology; import accord.topology.TopologyException; +import accord.topology.TopologyManager; +import accord.topology.TopologyRetiredException; import accord.utils.LargeBitSet; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.TokenRange; import org.apache.cassandra.service.accord.api.TokenKey; +import org.apache.cassandra.tcm.ClusterMetadataService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +90,7 @@ public void regainRangesTest() throws Throwable "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); test(ddls, cluster -> { cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + SimpleQueryResult result = cluster.coordinator(2).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); @@ -97,23 +105,30 @@ public void regainRangesTest() throws Throwable return priorEpoch; }); - cluster.coordinator(3).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 5); - cluster.get(2).runOnInstance(() -> { StorageService.instance.move(originalToken); }); + String tableName = accordTableName; + // Ensure no overlapping safeToRead ranges cluster.get(2).runOnInstance(() -> { - RoutingKey start = TokenKey.parse(TableId.fromString("tid:11"), String.valueOf(token), Murmur3Partitioner.instance); - RoutingKey end = TokenKey.parse(TableId.fromString("tid:11"), originalToken, Murmur3Partitioner.instance); + TableId tid = Schema.instance.getTableMetadata(KEYSPACE, tableName).id(); + RoutingKey start = TokenKey.parse(tid, String.valueOf(token), Murmur3Partitioner.instance); + RoutingKey end = TokenKey.parse(tid, originalToken, Murmur3Partitioner.instance); Ranges regainedRange = Ranges.of(TokenRange.create(start, end)); try { assert (AccordService.instance().topology().active().get(epoch).retired().containsAll(regainedRange)); - } catch (TopologyException e) { + } + catch (TopologyRetiredException ignored) + { + } + catch (TopologyException e) + { assert(false); } + Ranges range = Ranges.EMPTY; for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) { Ranges safeToReadRanges = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "No overlapping safeToReadRanges", safeCommandStore -> { @@ -129,59 +144,5 @@ public void regainRangesTest() throws Throwable }); }); } - - @Test - public void querySameRangeOnDifferentCommandStoresTest() throws Throwable - { - List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', - "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}", - "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); - test(ddls, cluster -> { - cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); - SimpleQueryResult result = cluster.coordinator(2).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); - - String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); - - long token = (Long) result.toObjectArrays()[0][0]; - - assert(token < Long.parseLong(originalToken)); - - long epoch = cluster.get(2).callOnInstance(() -> { - long priorEpoch = AccordService.instance().topology().epoch(); - StorageService.instance.move(Long.toString(token - 1000)); - return priorEpoch; - }); - - cluster.coordinator(3).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 5); - - cluster.get(2).runOnInstance(() -> { - /* This call results in other nodes invoking this */ - StorageService.instance.move(originalToken); - }); - - cluster.get(2).runOnInstance(() -> { - RoutingKey start = TokenKey.parse(TableId.fromString("tid:11"), String.valueOf(token), Murmur3Partitioner.instance); - RoutingKey end = TokenKey.parse(TableId.fromString("tid:11"), originalToken, Murmur3Partitioner.instance); - Ranges regainedRange = Ranges.of(TokenRange.create(start, end)); - - int numberOfShards = AccordService.instance().node().commandStores().all().length; - LargeBitSet bitSet = new LargeBitSet(numberOfShards); - for (int i = 0; i < numberOfShards; i++) - bitSet.set(i); - - CommandStores.ShardHolder[] shardHolders = new CommandStores.ShardHolder[numberOfShards]; - int i = 0; - for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) - { - CommandStores.RangesForEpoch rangesForEpoch = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get rangesForEpoch", SafeCommandStore::ranges)); - shardHolders[i] = new CommandStores.ShardHolder(commandStore, rangesForEpoch); - i += 1; - } - - assertFalse(checkQueryDisjointRangesAcrossCommandStores(AccordService.instance().node().commandStores().overlappingCommandStores(), shardHolders, bitSet, regainedRange)); - }); - - }); - } } From 30e1ba1fb7f7f545e54da569cb6398303889d40c Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 3 Mar 2026 16:48:48 -0800 Subject: [PATCH 06/14] fix git modules --- .gitmodules | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitmodules b/.gitmodules index 8b3913ed184d..616dacf610a7 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/alanwang67/cassandra-accord.git - branch = CASSANDRA-21183 + url = https://github.com/apache/cassandra-accord.git + branch = trunk From 93c82f654d46e0dbbf4c6b04c3c3279867ed6512 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 11 Mar 2026 15:48:35 -0700 Subject: [PATCH 07/14] test --- .../accord/AccordDeleteCommandStoreTest.java | 100 ++++++++++++++++++ .../test/accord/AccordRegainRangesTest.java | 10 -- 2 files changed, 100 insertions(+), 10 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java new file mode 100644 index 000000000000..2b2664913c3e --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.accord; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import accord.local.CommandStore; +import accord.local.PreLoadContext; +import accord.primitives.Ranges; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.AccordService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.cassandra.service.accord.AccordService.getBlocking; +import static com.google.common.collect.Iterables.getOnlyElement; + +import org.junit.BeforeClass; +import org.junit.Test; + +public class AccordDeleteCommandStoreTest extends AccordTestBase +{ + private static final Logger logger = LoggerFactory.getLogger(AccordRegainRangesTest.class); + + @Override + protected Logger logger() + { + return logger; + } + + @BeforeClass + public static void setupClass() throws IOException + { + AccordTestBase.setupCluster(builder -> builder + .withoutVNodes() + .withConfig(config -> + config + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "20s") + .with(Feature.NETWORK, Feature.GOSSIP)), 6); + } + + @Test + public void deleteCommandStoresTest() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", + "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + test(ddls, cluster -> { + String newToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + cluster.get(2).runOnInstance(() -> { + StorageService.instance.move(Long.toString(Long.parseLong(newToken) + 100)); + }); + + cluster.get(2).runOnInstance(() -> { + Set commandStoresThatWillBeRemoved = new HashSet<>(); + + for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) + { + Ranges ranges = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get rangesForEpoch", safeCommandStore -> safeCommandStore.ranges().currentRanges())); + + if (ranges.isEmpty()) + commandStoresThatWillBeRemoved.add(commandStore.id()); + } + + StorageService.instance.move(originalToken); + + for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) + { + assert(!commandStoresThatWillBeRemoved.contains(commandStore.id())); + } + }); + }); + } +} + diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java index 91a44cba1c9d..1bcfe18e2baa 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java @@ -21,21 +21,14 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.Map; import accord.api.RoutingKey; import accord.local.CommandStore; -import accord.local.CommandStores; import accord.local.PreLoadContext; -import accord.local.SafeCommandStore; import accord.primitives.AbstractRanges; import accord.primitives.Ranges; -import accord.topology.Shard; -import accord.topology.Topology; import accord.topology.TopologyException; -import accord.topology.TopologyManager; import accord.topology.TopologyRetiredException; -import accord.utils.LargeBitSet; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.api.ConsistencyLevel; @@ -47,15 +40,12 @@ import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.TokenRange; import org.apache.cassandra.service.accord.api.TokenKey; -import org.apache.cassandra.tcm.ClusterMetadataService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static accord.local.CommandStores.checkQueryDisjointRangesAcrossCommandStores; import static org.apache.cassandra.service.accord.AccordService.getBlocking; import static com.google.common.collect.Iterables.getOnlyElement; -import static org.junit.Assert.assertFalse; import org.junit.BeforeClass; import org.junit.Test; From 88881e9dd1d559184d99ad34771a69310477dd37 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 25 Mar 2026 12:08:51 -0700 Subject: [PATCH 08/14] address comments --- .../apache/cassandra/tcm/sequences/Move.java | 85 +++++++++++-------- 1 file changed, 48 insertions(+), 37 deletions(-) diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index a1b6f48df73d..a97eee9dbbe2 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -35,12 +35,11 @@ import org.slf4j.LoggerFactory; import accord.api.TopologyListener; +import accord.primitives.AbstractRanges; import accord.primitives.Ranges; import accord.topology.EpochReady; import accord.topology.Topology; -import accord.topology.TopologyException; import accord.topology.TopologyManager; -import accord.topology.TopologyRetiredException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; @@ -211,50 +210,57 @@ public SequenceState executeNext() switch (next) { case START_MOVE: + class WaitForEpochAndRangeRetirement implements TopologyListener + { + final Condition condition; + final long waitingForEpoch; + final Ranges waitingForRange; + Ranges retiredRanges; + + public WaitForEpochAndRangeRetirement(Condition condition, long waitingForEpoch, Ranges waitingForRange) + { + this.condition = condition; + this.waitingForEpoch = waitingForEpoch; + this.waitingForRange = waitingForRange; + this.retiredRanges = Ranges.EMPTY; + } + + public synchronized void updateRetiredRanges(Ranges ranges) + { + retiredRanges = retiredRanges.union(AbstractRanges.UnionMode.MERGE_ADJACENT, ranges); + } + + public synchronized Ranges getRetiredRanges() + { + return retiredRanges; + } + + @Override + public void onEpochRetired(Ranges ranges, long epoch, @Nullable Topology topology) + { + if (epoch >= waitingForEpoch) + updateRetiredRanges(ranges); + + if (getRetiredRanges().containsAll(waitingForRange)) + condition.signal(); + } + } + + WaitForEpochAndRangeRetirement wait = null; + try { ClusterMetadata metadata = ClusterMetadata.current(); TopologyManager.RegainingEpochRange regainingEpochRange = AccordService.instance().topology().epochAndRangeToBeRetired(AccordService.instance().topology().current(), AccordTopology.createAccordTopology(applyTo(metadata).success().metadata)); + if (regainingEpochRange != null) { Condition condition = newOneTimeCondition(); - - class waitForEpochAndRangeRetirement implements TopologyListener - { - final Condition condition; - final long waitingForEpoch; - final Ranges waitingForRange; - - public waitForEpochAndRangeRetirement(Condition condition, long waitingForEpoch, Ranges waitingForRange) - { - this.condition = condition; - this.waitingForEpoch = waitingForEpoch; - this.waitingForRange = waitingForRange; - } - - @Override - public void onEpochRetired(Ranges ranges, long epoch, @Nullable Topology topology) - { - try - { - if (epoch >= waitingForEpoch && AccordService.instance().topology().active().get(waitingForEpoch).retired().containsAll(waitingForRange)) - condition.signal(); - } - catch (TopologyRetiredException e) - { - condition.signal(); - } - catch (TopologyException e) - { - logger.info("Topology exception: ", e); - } - } - } - - waitForEpochAndRangeRetirement wait = new waitForEpochAndRangeRetirement(condition, regainingEpochRange.epoch(), regainingEpochRange.range()); + wait = new WaitForEpochAndRangeRetirement(condition, regainingEpochRange.epoch(), regainingEpochRange.range()); AccordService.instance().topology().addListener(wait); + Ranges retiredRanges = AccordService.instance().topology().active().get(regainingEpochRange.epoch()).retired(); + wait.updateRetiredRanges(retiredRanges); condition.awaitThrowUncheckedOnInterrupt(); - AccordService.instance().topology().removeListener(wait); } logger.info("Moving {} from {} to {}.", @@ -268,6 +274,11 @@ public void onEpochRetired(Ranges ranges, long epoch, @Nullable Topology topolog JVMStabilityInspector.inspectThrowable(t); return continuable(); } + finally + { + if (wait != null) + AccordService.instance().topology().removeListener(wait); + } break; case MID_MOVE: try From a5f225213b59f45abb6498e34c5e6d36721953fb Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 25 Mar 2026 14:01:45 -0700 Subject: [PATCH 09/14] don't use assert --- .../test/accord/AccordDeleteCommandStoreTest.java | 3 ++- .../test/accord/AccordRegainRangesTest.java | 10 ++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java index 2b2664913c3e..49a2b5e3fbd5 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java @@ -36,6 +36,7 @@ import static org.apache.cassandra.service.accord.AccordService.getBlocking; import static com.google.common.collect.Iterables.getOnlyElement; +import static org.junit.Assert.assertFalse; import org.junit.BeforeClass; import org.junit.Test; @@ -91,7 +92,7 @@ public void deleteCommandStoresTest() throws Throwable for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) { - assert(!commandStoresThatWillBeRemoved.contains(commandStore.id())); + assertFalse(commandStoresThatWillBeRemoved.contains(commandStore.id())); } }); }); diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java index 1bcfe18e2baa..3a5481101af0 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java @@ -46,6 +46,8 @@ import static org.apache.cassandra.service.accord.AccordService.getBlocking; import static com.google.common.collect.Iterables.getOnlyElement; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.junit.BeforeClass; import org.junit.Test; @@ -87,7 +89,7 @@ public void regainRangesTest() throws Throwable long token = (Long) result.toObjectArrays()[0][0]; - assert(token < Long.parseLong(originalToken)); + assertTrue(token < Long.parseLong(originalToken)); long epoch = cluster.get(2).callOnInstance(() -> { long priorEpoch = AccordService.instance().topology().epoch(); @@ -109,14 +111,14 @@ public void regainRangesTest() throws Throwable Ranges regainedRange = Ranges.of(TokenRange.create(start, end)); try { - assert (AccordService.instance().topology().active().get(epoch).retired().containsAll(regainedRange)); + assertTrue(AccordService.instance().topology().active().get(epoch).retired().containsAll(regainedRange)); } catch (TopologyRetiredException ignored) { } catch (TopologyException e) { - assert(false); + fail(); } Ranges range = Ranges.EMPTY; @@ -128,7 +130,7 @@ public void regainRangesTest() throws Throwable return mergedRanges; })); - assert(range.overlapping(safeToReadRanges).isEmpty()); + assertTrue(range.overlapping(safeToReadRanges).isEmpty()); range = range.union(AbstractRanges.UnionMode.MERGE_ADJACENT, safeToReadRanges); } }); From 19f0a04aaa636573c23fb5efe600544af704e5c3 Mon Sep 17 00:00:00 2001 From: Benedict Elliott Smith Date: Fri, 17 Apr 2026 16:40:42 +0100 Subject: [PATCH 10/14] feedback/edits for regaining ranges --- .gitmodules | 4 +- modules/accord | 2 +- .../db/compaction/CompactionIterator.java | 4 +- .../service/accord/AccordJournal.java | 7 +- .../accord/journal/AccordTopologyUpdate.java | 47 +++++++-- .../apache/cassandra/tcm/sequences/Move.java | 95 ++++++++++--------- .../journal/AccordTopologyUpdateTest.java | 8 +- 7 files changed, 104 insertions(+), 63 deletions(-) diff --git a/.gitmodules b/.gitmodules index 616dacf610a7..851765d73e32 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = https://github.com/belliottsmith/cassandra-accord.git + branch = regain-ranges diff --git a/modules/accord b/modules/accord index 93d78be37ef9..71a0d085b51b 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 93d78be37ef904118da2426716b5d77c0fb227db +Subproject commit 71a0d085b51b4b0e95f50ecc56b53291c3e6693e diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 7f97dc0663cc..54ab4ba35eb8 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -988,9 +988,9 @@ UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) t if (read.kind() == Repeat && !hasWritten) { Invariants.require(lastImage != null); - write = new TopologyImage(read.epoch(), Image, lastImage.getUpdate()); + write = new TopologyImage(read.epoch(), Image, lastImage.update()); } - else if (hasWritten && read.kind() == Repeat && lastImage.getUpdate().isEquivalent(read.getUpdate())) + else if (hasWritten && read.kind() == Repeat && lastImage.update().isEquivalent(read.update())) { write = read.asRepeat(); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 00f03be854b5..09717d911526 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -405,12 +405,13 @@ public TopologyUpdate next() logger.error("Encountered TopologyImage Repeat record for epoch {}, but no prior image record was found", ref.key().id.epoch()); return null; } - prev = reader.read().asImage(Invariants.nonNull(prev.getUpdate())); + prev = reader.read().asImage(Invariants.nonNull(prev.update())); } else prev = reader.read(); - return new TopologyUpdate(prev.getUpdate().commandStores, - prev.getUpdate().global); + return new TopologyUpdate(prev.update().commandStores, + prev.update().global, + prev.update().previouslyOwned); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java index efb92c2df15d..b1856987659a 100644 --- a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java +++ b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java @@ -26,6 +26,7 @@ import accord.api.Journal; import accord.local.CommandStores; +import accord.local.CommandStores.PreviouslyOwned; import accord.primitives.Ranges; import accord.topology.Topology; import accord.utils.Invariants; @@ -48,7 +49,7 @@ public interface AccordTopologyUpdate long epoch(); AccordTopologyUpdate asRepeat(); - Journal.TopologyUpdate getUpdate(); + Journal.TopologyUpdate update(); static AccordTopologyUpdate newTopology(Journal.TopologyUpdate update) { return new NewTopology(update); @@ -97,12 +98,20 @@ public long serializedSize(CommandStores.RangesForEpoch from) class TopologyUpdateSerializer implements UnversionedSerializer { + private static final int TOP_BIT = 0x40000000; public static final TopologyUpdateSerializer instance = new TopologyUpdateSerializer(); @Override public void serialize(Journal.TopologyUpdate from, DataOutputPlus out) throws IOException { - out.writeUnsignedVInt32(from.commandStores.size()); + out.writeUnsignedVInt32(from.commandStores.size() | TOP_BIT); + out.writeUnsignedVInt32(0); + out.writeUnsignedVInt32(from.previouslyOwned.size()); + for (int i = 0 ; i < from.previouslyOwned.size() ; ++i) + { + out.writeUnsignedVInt(from.previouslyOwned.epochs(i)); + KeySerializers.ranges.serialize(from.previouslyOwned.ranges(i), out); + } for (Map.Entry e : from.commandStores.entrySet()) { out.writeUnsignedVInt32(e.getKey()); @@ -115,6 +124,23 @@ public void serialize(Journal.TopologyUpdate from, DataOutputPlus out) throws IO public Journal.TopologyUpdate deserialize(DataInputPlus in) throws IOException { int commandStoresSize = in.readUnsignedVInt32(); + int flags = 0; + PreviouslyOwned previouslyOwned = PreviouslyOwned.EMPTY; + if ((commandStoresSize & TOP_BIT) != 0) + { + commandStoresSize ^= TOP_BIT; + // future proofing + flags = in.readUnsignedVInt32(); + int previouslyOwnedSize = in.readUnsignedVInt32(); + long[] epochs = new long[previouslyOwnedSize]; + Ranges[] ranges = new Ranges[previouslyOwnedSize]; + for (int i = 0 ; i < previouslyOwnedSize ; ++i) + { + epochs[i] = in .readUnsignedVInt(); + ranges[i] = KeySerializers.ranges.deserialize(in); + } + previouslyOwned = new PreviouslyOwned(epochs.length == 0 ? 0 : epochs[0], epochs, ranges); + } Int2ObjectHashMap commandStores = new Int2ObjectHashMap<>(); for (int j = 0; j < commandStoresSize; j++) { @@ -123,13 +149,20 @@ public Journal.TopologyUpdate deserialize(DataInputPlus in) throws IOException commandStores.put(commandStoreId, rangesForEpoch); } Topology global = TopologySerializers.compactTopology.deserialize(in); - return new Journal.TopologyUpdate(commandStores, global); + return new Journal.TopologyUpdate(commandStores, global, previouslyOwned); } @Override public long serializedSize(Journal.TopologyUpdate from) { - long size = TypeSizes.sizeofUnsignedVInt(from.commandStores.size()); + long size = TypeSizes.sizeofUnsignedVInt(from.commandStores.size() | TOP_BIT); + size += TypeSizes.sizeofUnsignedVInt(0); + size += TypeSizes.sizeofUnsignedVInt(from.previouslyOwned.size()); + for (int i = 0 ; i < from.previouslyOwned.size() ; ++i) + { + size += TypeSizes.sizeofUnsignedVInt(from.previouslyOwned.epochs(i)); + size += KeySerializers.ranges.serializedSize(from.previouslyOwned.ranges(i)); + } for (Map.Entry e : from.commandStores.entrySet()) { size += TypeSizes.sizeofUnsignedVInt(e.getKey()); @@ -289,7 +322,7 @@ public long epoch() } @Override - public Journal.TopologyUpdate getUpdate() + public Journal.TopologyUpdate update() { return update; } @@ -350,7 +383,7 @@ public long epoch() } @Override - public Journal.TopologyUpdate getUpdate() + public Journal.TopologyUpdate update() { return update; } @@ -413,7 +446,7 @@ public TopologyImage read() public void read(AccordTopologyUpdate update) { if (Objects.requireNonNull(update.kind()) == Kind.New) - read = new TopologyImage(update.epoch(), Kind.Image, update.getUpdate()); + read = new TopologyImage(update.epoch(), Kind.Image, update.update()); else read = (TopologyImage) update; write = read; diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index a97eee9dbbe2..c15cf267c8a7 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -35,11 +35,14 @@ import org.slf4j.LoggerFactory; import accord.api.TopologyListener; -import accord.primitives.AbstractRanges; import accord.primitives.Ranges; +import accord.primitives.Routables; +import accord.topology.ActiveEpoch; +import accord.topology.ActiveEpochs; import accord.topology.EpochReady; import accord.topology.Topology; import accord.topology.TopologyManager; +import accord.topology.TopologyManager.RegainingEpochRange; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; @@ -86,6 +89,8 @@ import org.apache.cassandra.utils.concurrent.FutureCombiner; import org.apache.cassandra.utils.vint.VIntCoding; +import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; +import static accord.primitives.Routables.Slice.Minimal; import static com.google.common.collect.ImmutableList.of; import static org.apache.cassandra.tcm.MultiStepOperation.Kind.MOVE; import static org.apache.cassandra.tcm.Transformation.Kind.FINISH_MOVE; @@ -204,62 +209,66 @@ public Transformation.Result applyTo(ClusterMetadata metadata) return applyMultipleTransformations(metadata, next, of(startMove, midMove, finishMove)); } + static class WaitForEpochAndRangeRetirement implements TopologyListener + { + final Condition condition = newOneTimeCondition(); + final long waitingForEpoch; + final Ranges waitingForRanges; + Ranges retiredRanges; + + public WaitForEpochAndRangeRetirement(long waitingForEpoch, Ranges waitingForRanges) + { + this.waitingForEpoch = waitingForEpoch; + this.waitingForRanges = waitingForRanges; + this.retiredRanges = Ranges.EMPTY; + } + + synchronized void updateRetiredRanges(Ranges ranges) + { + ranges = ranges.slice(waitingForRanges, Minimal).without(retiredRanges); + if (!ranges.isEmpty()) + { + retiredRanges = retiredRanges.union(MERGE_ADJACENT, ranges); + if (retiredRanges.containsAll(waitingForRanges)) + condition.signal(); + } + } + + @Override + public synchronized void onEpochRetired(Ranges ranges, long epoch, @Nullable Topology topology) + { + if (epoch >= waitingForEpoch) + updateRetiredRanges(ranges); + } + } + @Override public SequenceState executeNext() { switch (next) { case START_MOVE: - class WaitForEpochAndRangeRetirement implements TopologyListener - { - final Condition condition; - final long waitingForEpoch; - final Ranges waitingForRange; - Ranges retiredRanges; - - public WaitForEpochAndRangeRetirement(Condition condition, long waitingForEpoch, Ranges waitingForRange) - { - this.condition = condition; - this.waitingForEpoch = waitingForEpoch; - this.waitingForRange = waitingForRange; - this.retiredRanges = Ranges.EMPTY; - } - - public synchronized void updateRetiredRanges(Ranges ranges) - { - retiredRanges = retiredRanges.union(AbstractRanges.UnionMode.MERGE_ADJACENT, ranges); - } - - public synchronized Ranges getRetiredRanges() - { - return retiredRanges; - } - - @Override - public void onEpochRetired(Ranges ranges, long epoch, @Nullable Topology topology) - { - if (epoch >= waitingForEpoch) - updateRetiredRanges(ranges); - - if (getRetiredRanges().containsAll(waitingForRange)) - condition.signal(); - } - } - WaitForEpochAndRangeRetirement wait = null; try { ClusterMetadata metadata = ClusterMetadata.current(); - TopologyManager.RegainingEpochRange regainingEpochRange = AccordService.instance().topology().epochAndRangeToBeRetired(AccordService.instance().topology().current(), AccordTopology.createAccordTopology(applyTo(metadata).success().metadata)); + TopologyManager topologyManager = AccordService.instance().topology(); + AccordService.toFuture(topologyManager.await(metadata.epoch.getEpoch() - 1, null)) + .awaitThrowUncheckedOnInterrupt().rethrowIfFailed(); + + ActiveEpochs activeEpochs = topologyManager.active(); + Topology current = activeEpochs.globalForEpoch(metadata.epoch.getEpoch() - 1); + RegainingEpochRange regaining = topologyManager.computeRegaining(current, AccordTopology.createAccordTopology(applyTo(metadata).success().metadata)); - if (regainingEpochRange != null) + if (regaining != null) { Condition condition = newOneTimeCondition(); - wait = new WaitForEpochAndRangeRetirement(condition, regainingEpochRange.epoch(), regainingEpochRange.range()); - AccordService.instance().topology().addListener(wait); - Ranges retiredRanges = AccordService.instance().topology().active().get(regainingEpochRange.epoch()).retired(); - wait.updateRetiredRanges(retiredRanges); + wait = new WaitForEpochAndRangeRetirement(regaining.epoch(), regaining.ranges()); + topologyManager.addListener(wait); + ActiveEpoch e = activeEpochs.ifExists(regaining.epoch()); + if (e != null) + wait.updateRetiredRanges(e.retired()); condition.awaitThrowUncheckedOnInterrupt(); } diff --git a/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java b/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java index 21b7e7bc3aed..137c0a2fe16d 100644 --- a/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java +++ b/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java @@ -126,9 +126,7 @@ private static Gen topologyUpdateGen() for (Node.Id node : topology.nodes()) commandStores.put(node.id, rangesForEpochGen.next(rs)); - Node.Id self = rs.pick(topology.nodes()); - - return new Journal.TopologyUpdate(commandStores, topology); + return new Journal.TopologyUpdate(commandStores, topology, CommandStores.PreviouslyOwned.EMPTY); }; } @@ -156,9 +154,9 @@ private static void maybeUpdatePartitioner(Journal.TopologyUpdate expected) private static void maybeUpdatePartitioner(AccordTopologyUpdate expected) { - Journal.TopologyUpdate update = expected.getUpdate(); + Journal.TopologyUpdate update = expected.update(); if (update != null) - maybeUpdatePartitioner(expected.getUpdate()); + maybeUpdatePartitioner(expected.update()); } private void maybeUpdatePartitioner(CommandStores.RangesForEpoch expected) From 495af2eab0ce4f67780eb6ccc3128ffb00bf4e61 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Fri, 17 Apr 2026 16:18:44 -0700 Subject: [PATCH 11/14] update git modules --- .gitmodules | 4 ++-- modules/accord | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.gitmodules b/.gitmodules index 851765d73e32..6a8a494f31e6 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/belliottsmith/cassandra-accord.git - branch = regain-ranges + url = https://github.com/alanwang67/cassandra-accord.git + branch = deleteCommandStores diff --git a/modules/accord b/modules/accord index 71a0d085b51b..e6768bb40a34 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 71a0d085b51b4b0e95f50ecc56b53291c3e6693e +Subproject commit e6768bb40a34b5cd912fab378bac242084619bd9 From fa06e3c257286f13eca5c4421eea351d0e345029 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 20 Apr 2026 14:24:30 -0700 Subject: [PATCH 12/14] fixed bug --- src/java/org/apache/cassandra/tcm/sequences/Move.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index c15cf267c8a7..2bed641668b1 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -36,7 +36,6 @@ import accord.api.TopologyListener; import accord.primitives.Ranges; -import accord.primitives.Routables; import accord.topology.ActiveEpoch; import accord.topology.ActiveEpochs; import accord.topology.EpochReady; @@ -240,6 +239,11 @@ public synchronized void onEpochRetired(Ranges ranges, long epoch, @Nullable Top if (epoch >= waitingForEpoch) updateRetiredRanges(ranges); } + + public void waitForRetirement() + { + condition.awaitThrowUncheckedOnInterrupt(); + } } @Override @@ -263,13 +267,12 @@ public SequenceState executeNext() if (regaining != null) { - Condition condition = newOneTimeCondition(); wait = new WaitForEpochAndRangeRetirement(regaining.epoch(), regaining.ranges()); topologyManager.addListener(wait); ActiveEpoch e = activeEpochs.ifExists(regaining.epoch()); if (e != null) wait.updateRetiredRanges(e.retired()); - condition.awaitThrowUncheckedOnInterrupt(); + wait.waitForRetirement(); } logger.info("Moving {} from {} to {}.", From 254686652eef963a60a6be0deb442a78be542d6b Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 22 Apr 2026 10:39:56 -0700 Subject: [PATCH 13/14] delete command store --- modules/accord | 2 +- .../apache/cassandra/tcm/sequences/Move.java | 4 +- .../accord/AccordDeleteCommandStoreTest.java | 42 +++++++------------ 3 files changed, 17 insertions(+), 31 deletions(-) diff --git a/modules/accord b/modules/accord index e6768bb40a34..b93f5ad66427 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit e6768bb40a34b5cd912fab378bac242084619bd9 +Subproject commit b93f5ad66427faf1bfa1653364a44451ed4bcf6c diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index 2bed641668b1..1da8ddf284ff 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -258,11 +258,11 @@ public SequenceState executeNext() { ClusterMetadata metadata = ClusterMetadata.current(); TopologyManager topologyManager = AccordService.instance().topology(); - AccordService.toFuture(topologyManager.await(metadata.epoch.getEpoch() - 1, null)) + AccordService.toFuture(topologyManager.await(metadata.epoch.getEpoch(), null)) .awaitThrowUncheckedOnInterrupt().rethrowIfFailed(); ActiveEpochs activeEpochs = topologyManager.active(); - Topology current = activeEpochs.globalForEpoch(metadata.epoch.getEpoch() - 1); + Topology current = activeEpochs.globalForEpoch(metadata.epoch.getEpoch()); RegainingEpochRange regaining = topologyManager.computeRegaining(current, AccordTopology.createAccordTopology(applyTo(metadata).success().metadata)); if (regaining != null) diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java index 49a2b5e3fbd5..b230a5121170 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java @@ -20,13 +20,9 @@ import java.io.IOException; import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import java.util.Set; +import accord.topology.ActiveEpoch; -import accord.local.CommandStore; -import accord.local.PreLoadContext; -import accord.primitives.Ranges; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; @@ -34,9 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.cassandra.service.accord.AccordService.getBlocking; import static com.google.common.collect.Iterables.getOnlyElement; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import org.junit.BeforeClass; import org.junit.Test; @@ -58,42 +53,33 @@ public static void setupClass() throws IOException .withoutVNodes() .withConfig(config -> config - .set("accord.shard_durability_target_splits", "1") - .set("accord.shard_durability_cycle", "20s") - .with(Feature.NETWORK, Feature.GOSSIP)), 6); + .set("accord.shard_durability_cycle", "15s") + .with(Feature.NETWORK, Feature.GOSSIP)), 4); } @Test public void deleteCommandStoresTest() throws Throwable { List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', - "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}", "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); test(ddls, cluster -> { String newToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); - cluster.get(2).runOnInstance(() -> { - StorageService.instance.move(Long.toString(Long.parseLong(newToken) + 100)); + long epoch = cluster.get(2).callOnInstance(() -> { + long priorEpoch = AccordService.instance().topology().epoch(); + StorageService.instance.move(Long.toString(Long.parseLong(newToken) + 1)); + return priorEpoch; }); cluster.get(2).runOnInstance(() -> { - Set commandStoresThatWillBeRemoved = new HashSet<>(); - - for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) - { - Ranges ranges = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get rangesForEpoch", safeCommandStore -> safeCommandStore.ranges().currentRanges())); - - if (ranges.isEmpty()) - commandStoresThatWillBeRemoved.add(commandStore.id()); - } - - StorageService.instance.move(originalToken); + assertTrue(epoch > 0); + try { + Thread.sleep(30000); + } catch (InterruptedException e) {} + ActiveEpoch e = AccordService.instance().node().topology().active().ifExists(epoch); - for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) - { - assertFalse(commandStoresThatWillBeRemoved.contains(commandStore.id())); - } }); }); } From 44568f69619fed8b40a43c9e6a02a6222f483f78 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 22 Apr 2026 10:45:13 -0700 Subject: [PATCH 14/14] update submodule --- modules/accord | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/accord b/modules/accord index b93f5ad66427..4673322bf2c0 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit b93f5ad66427faf1bfa1653364a44451ed4bcf6c +Subproject commit 4673322bf2c062ef16e2069429eed06bc828ef86