diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index a26d6ded4500..a1b6f48df73d 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -27,12 +27,20 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; + import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.TopologyListener; +import accord.primitives.Ranges; import accord.topology.EpochReady; +import accord.topology.Topology; +import accord.topology.TopologyException; +import accord.topology.TopologyManager; +import accord.topology.TopologyRetiredException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; @@ -53,6 +61,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.AccordTopology; import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamResultFuture; @@ -73,6 +82,7 @@ import org.apache.cassandra.tcm.transformations.PrepareMove; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.concurrent.Condition; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.FutureCombiner; import org.apache.cassandra.utils.vint.VIntCoding; @@ -84,6 +94,7 @@ import static org.apache.cassandra.tcm.Transformation.Kind.START_MOVE; import static org.apache.cassandra.tcm.sequences.SequenceState.continuable; import static org.apache.cassandra.tcm.sequences.SequenceState.error; +import static 
org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
 
 public class Move extends MultiStepOperation
 {
@@ -203,6 +214,69 @@ public SequenceState executeNext()
                 try
                 {
                     ClusterMetadata metadata = ClusterMetadata.current();
+                    TopologyManager.RegainingEpochRange regainingEpochRange = AccordService.instance().topology().epochAndRangeToBeRetired(AccordService.instance().topology().current(), AccordTopology.createAccordTopology(applyTo(metadata).success().metadata));
+                    if (regainingEpochRange != null)
+                    {
+                        // Blocks the move until the epoch/range reported by epochAndRangeToBeRetired has been
+                        // retired; signalled from topology callbacks and from one explicit check after registration.
+                        class WaitForEpochAndRangeRetirement implements TopologyListener
+                        {
+                            final Condition condition;
+                            final long waitingForEpoch;
+                            final Ranges waitingForRange;
+
+                            WaitForEpochAndRangeRetirement(Condition condition, long waitingForEpoch, Ranges waitingForRange)
+                            {
+                                this.condition = condition;
+                                this.waitingForEpoch = waitingForEpoch;
+                                this.waitingForRange = waitingForRange;
+                            }
+
+                            @Override
+                            public void onEpochRetired(Ranges ranges, long epoch, @Nullable Topology topology)
+                            {
+                                if (epoch >= waitingForEpoch)
+                                    signalIfRetired();
+                            }
+
+                            // Signals the condition once waitingForEpoch reports waitingForRange as retired.
+                            void signalIfRetired()
+                            {
+                                try
+                                {
+                                    if (AccordService.instance().topology().active().get(waitingForEpoch).retired().containsAll(waitingForRange))
+                                        condition.signal();
+                                }
+                                catch (TopologyRetiredException e)
+                                {
+                                    // the epoch itself is reported as retired, so there is nothing left to wait for
+                                    condition.signal();
+                                }
+                                catch (TopologyException e)
+                                {
+                                    // not signalled: a later retirement callback re-runs this check
+                                    logger.info("Topology exception: ", e);
+                                }
+                            }
+                        }
+
+                        Condition condition = newOneTimeCondition();
+                        WaitForEpochAndRangeRetirement listener = new WaitForEpochAndRangeRetirement(condition, regainingEpochRange.epoch(), regainingEpochRange.range());
+                        AccordService.instance().topology().addListener(listener);
+                        try
+                        {
+                            // Retirement may have completed between computing regainingEpochRange and registering
+                            // the listener, in which case no further callback might arrive; check once directly.
+                            listener.signalIfRetired();
+                            condition.awaitThrowUncheckedOnInterrupt();
+                        }
+                        finally
+                        {
+                            // always deregister, including when the wait is interrupted
+                            AccordService.instance().topology().removeListener(listener);
+                        }
+                    }
+
                     logger.info("Moving {} from {} to {}.",
                                 metadata.directory.endpoint(startMove.nodeId()),
                                 metadata.tokenMap.tokens(startMove.nodeId()),
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java
new file mode 100644
index 000000000000..91a44cba1c9d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import accord.api.RoutingKey;
+import accord.local.CommandStore;
+import accord.local.CommandStores;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
+import accord.primitives.AbstractRanges;
+import accord.primitives.Ranges;
+import accord.topology.Shard;
+import accord.topology.Topology;
+import accord.topology.TopologyException;
+import accord.topology.TopologyManager;
+import accord.topology.TopologyRetiredException;
+import accord.utils.LargeBitSet;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.TokenKey;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static accord.local.CommandStores.checkQueryDisjointRangesAcrossCommandStores;
+import static org.apache.cassandra.service.accord.AccordService.getBlocking;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Moves node 2 away from its token and back again so that it regains a range it had given up, then checks
+ * that the Accord epoch recorded before the first move reports the regained range as retired and that the
+ * safe-to-read ranges of the node's command stores do not overlap each other.
+ */
+public class AccordRegainRangesTest extends AccordTestBase
+{
+    private static final Logger logger = LoggerFactory.getLogger(AccordRegainRangesTest.class);
+
+    @Override
+    protected Logger logger()
+    {
+        return logger;
+    }
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        // six nodes, no vnodes, with networking and gossip enabled
+        AccordTestBase.setupCluster(builder -> builder
+                                               .withoutVNodes()
+                                               .withConfig(config ->
+                                                           config
+                                                           .set("accord.shard_durability_target_splits", "1")
+                                                           .set("accord.shard_durability_cycle", "20s")
+                                                           .with(Feature.NETWORK, Feature.GOSSIP)), 6);
+    }
+
+    @Test
+    public void regainRangesTest() throws Throwable
+    {
+        List<String> ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';',
+                                          "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}",
+                                          "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'");
+        test(ddls, cluster -> {
+            cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);
+
+            SimpleQueryResult result = cluster.coordinator(2).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);
+
+            String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens()));
+
+            long token = (Long) result.toObjectArrays()[0][0];
+
+            // Precondition for the range built below: the key's token must sort before node 2's token.
+            assertTrue("token of k=1 (" + token + ") must be below node 2's token " + originalToken, token < Long.parseLong(originalToken));
+
+            // Move node 2 to just below the key's token, remembering the Accord epoch seen before the move.
+            long epoch = cluster.get(2).callOnInstance(() -> {
+                long priorEpoch = AccordService.instance().topology().epoch();
+                StorageService.instance.move(Long.toString(token - 1000));
+                return priorEpoch;
+            });
+
+            // Move node 2 back to its original token.
+            cluster.get(2).runOnInstance(() -> {
+                StorageService.instance.move(originalToken);
+            });
+
+            String tableName = accordTableName;
+
+            // Check the regained range was retired in the pre-move epoch and ensure no overlapping safeToRead ranges
+            cluster.get(2).runOnInstance(() -> {
+                TableId tid = Schema.instance.getTableMetadata(KEYSPACE, tableName).id();
+                RoutingKey start = TokenKey.parse(tid, String.valueOf(token), Murmur3Partitioner.instance);
+                RoutingKey end = TokenKey.parse(tid, originalToken, Murmur3Partitioner.instance);
+                Ranges regainedRange = Ranges.of(TokenRange.create(start, end));
+                try
+                {
+                    assertTrue("epoch " + epoch + " has not retired the regained range " + regainedRange,
+                               AccordService.instance().topology().active().get(epoch).retired().containsAll(regainedRange));
+                }
+                catch (TopologyRetiredException ignored)
+                {
+                    // the epoch itself is reported as retired; accepted as success
+                }
+                catch (TopologyException e)
+                {
+                    throw new AssertionError("Unexpected topology failure while looking up epoch " + epoch, e);
+                }
+
+                Ranges range = Ranges.EMPTY;
+                for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) {
+                    Ranges safeToReadRanges = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "No overlapping safeToReadRanges", safeCommandStore -> {
+                        Ranges mergedRanges = Ranges.EMPTY;
+                        for (Ranges r : safeCommandStore.safeToReadAt().values())
+                            mergedRanges = mergedRanges.union(AbstractRanges.UnionMode.MERGE_ADJACENT, r);
+                        return mergedRanges;
+                    }));
+
+                    assertTrue("safeToRead ranges " + safeToReadRanges + " overlap those of another command store: " + range, range.overlapping(safeToReadRanges).isEmpty());
+                    range = range.union(AbstractRanges.UnionMode.MERGE_ADJACENT, safeToReadRanges);
+                }
+            });
+        });
+    }
+}
+