diff --git a/.gitmodules b/.gitmodules index 616dacf610a7..6a8a494f31e6 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = https://github.com/alanwang67/cassandra-accord.git + branch = deleteCommandStores diff --git a/modules/accord b/modules/accord index 93d78be37ef9..4673322bf2c0 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 93d78be37ef904118da2426716b5d77c0fb227db +Subproject commit 4673322bf2c062ef16e2069429eed06bc828ef86 diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 7f97dc0663cc..54ab4ba35eb8 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -988,9 +988,9 @@ UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) t if (read.kind() == Repeat && !hasWritten) { Invariants.require(lastImage != null); - write = new TopologyImage(read.epoch(), Image, lastImage.getUpdate()); + write = new TopologyImage(read.epoch(), Image, lastImage.update()); } - else if (hasWritten && read.kind() == Repeat && lastImage.getUpdate().isEquivalent(read.getUpdate())) + else if (hasWritten && read.kind() == Repeat && lastImage.update().isEquivalent(read.update())) { write = read.asRepeat(); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 00f03be854b5..09717d911526 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -405,12 +405,13 @@ public TopologyUpdate next() logger.error("Encountered TopologyImage Repeat record for epoch {}, but no prior image record was found", ref.key().id.epoch()); return null; } - prev = reader.read().asImage(Invariants.nonNull(prev.getUpdate())); + prev = reader.read().asImage(Invariants.nonNull(prev.update())); } else prev = reader.read(); - return new TopologyUpdate(prev.getUpdate().commandStores, - prev.getUpdate().global); + return new TopologyUpdate(prev.update().commandStores, + prev.update().global, + prev.update().previouslyOwned); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java index efb92c2df15d..b1856987659a 100644 --- a/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java +++ b/src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java @@ -26,6 +26,7 @@ import accord.api.Journal; import accord.local.CommandStores; +import accord.local.CommandStores.PreviouslyOwned; import accord.primitives.Ranges; import accord.topology.Topology; import accord.utils.Invariants; @@ -48,7 +49,7 @@ public interface AccordTopologyUpdate long epoch(); AccordTopologyUpdate asRepeat(); - Journal.TopologyUpdate getUpdate(); + Journal.TopologyUpdate update(); static AccordTopologyUpdate newTopology(Journal.TopologyUpdate update) { return new NewTopology(update); @@ -97,12 +98,20 @@ public long serializedSize(CommandStores.RangesForEpoch from) class TopologyUpdateSerializer implements UnversionedSerializer { + private static final int TOP_BIT = 0x40000000; public static final TopologyUpdateSerializer instance = new TopologyUpdateSerializer(); @Override public void serialize(Journal.TopologyUpdate from, DataOutputPlus out) throws IOException { - out.writeUnsignedVInt32(from.commandStores.size()); + out.writeUnsignedVInt32(from.commandStores.size() | TOP_BIT); + out.writeUnsignedVInt32(0); + out.writeUnsignedVInt32(from.previouslyOwned.size()); + for (int i = 0 ; i < from.previouslyOwned.size() ; ++i) + { + out.writeUnsignedVInt(from.previouslyOwned.epochs(i)); + KeySerializers.ranges.serialize(from.previouslyOwned.ranges(i), out); + } for (Map.Entry e : from.commandStores.entrySet()) { out.writeUnsignedVInt32(e.getKey()); @@ -115,6 +124,23 @@ public void serialize(Journal.TopologyUpdate from, DataOutputPlus out) throws IO public Journal.TopologyUpdate deserialize(DataInputPlus in) throws IOException { int commandStoresSize = in.readUnsignedVInt32(); + int flags = 0; + PreviouslyOwned previouslyOwned = PreviouslyOwned.EMPTY; + if ((commandStoresSize & TOP_BIT) != 0) + { + commandStoresSize ^= TOP_BIT; + // future proofing + flags = in.readUnsignedVInt32(); + int previouslyOwnedSize = in.readUnsignedVInt32(); + long[] epochs = new long[previouslyOwnedSize]; + Ranges[] ranges = new Ranges[previouslyOwnedSize]; + for (int i = 0 ; i < previouslyOwnedSize ; ++i) + { + epochs[i] = in .readUnsignedVInt(); + ranges[i] = KeySerializers.ranges.deserialize(in); + } + previouslyOwned = new PreviouslyOwned(epochs.length == 0 ? 0 : epochs[0], epochs, ranges); + } Int2ObjectHashMap commandStores = new Int2ObjectHashMap<>(); for (int j = 0; j < commandStoresSize; j++) { @@ -123,13 +149,20 @@ public Journal.TopologyUpdate deserialize(DataInputPlus in) throws IOException commandStores.put(commandStoreId, rangesForEpoch); } Topology global = TopologySerializers.compactTopology.deserialize(in); - return new Journal.TopologyUpdate(commandStores, global); + return new Journal.TopologyUpdate(commandStores, global, previouslyOwned); } @Override public long serializedSize(Journal.TopologyUpdate from) { - long size = TypeSizes.sizeofUnsignedVInt(from.commandStores.size()); + long size = TypeSizes.sizeofUnsignedVInt(from.commandStores.size() | TOP_BIT); + size += TypeSizes.sizeofUnsignedVInt(0); + size += TypeSizes.sizeofUnsignedVInt(from.previouslyOwned.size()); + for (int i = 0 ; i < from.previouslyOwned.size() ; ++i) + { + size += TypeSizes.sizeofUnsignedVInt(from.previouslyOwned.epochs(i)); + size += KeySerializers.ranges.serializedSize(from.previouslyOwned.ranges(i)); + } for (Map.Entry e : from.commandStores.entrySet()) { size += TypeSizes.sizeofUnsignedVInt(e.getKey()); @@ -289,7 +322,7 @@ public long epoch() } @Override - public Journal.TopologyUpdate getUpdate() + public Journal.TopologyUpdate update() { return update; } @@ -350,7 +383,7 @@ public long epoch() } @Override - public Journal.TopologyUpdate getUpdate() + public Journal.TopologyUpdate update() { return update; } @@ -413,7 +446,7 @@ public TopologyImage read() public void read(AccordTopologyUpdate update) { if (Objects.requireNonNull(update.kind()) == Kind.New) - read = new TopologyImage(update.epoch(), Kind.Image, update.getUpdate()); + read = new TopologyImage(update.epoch(), Kind.Image, update.update()); else read = (TopologyImage) update; write = read; diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index a26d6ded4500..1da8ddf284ff 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -27,12 +27,21 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; + import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.TopologyListener; +import accord.primitives.Ranges; +import accord.topology.ActiveEpoch; +import accord.topology.ActiveEpochs; import accord.topology.EpochReady; +import accord.topology.Topology; +import accord.topology.TopologyManager; +import accord.topology.TopologyManager.RegainingEpochRange; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; @@ -53,6 +62,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.AccordTopology; import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamResultFuture; @@ -73,10 +83,13 @@ import org.apache.cassandra.tcm.transformations.PrepareMove; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.concurrent.Condition; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.FutureCombiner; import org.apache.cassandra.utils.vint.VIntCoding; +import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; +import static accord.primitives.Routables.Slice.Minimal; import static com.google.common.collect.ImmutableList.of; import static org.apache.cassandra.tcm.MultiStepOperation.Kind.MOVE; import static org.apache.cassandra.tcm.Transformation.Kind.FINISH_MOVE; @@ -84,6 +97,7 @@ import static org.apache.cassandra.tcm.Transformation.Kind.START_MOVE; import static org.apache.cassandra.tcm.sequences.SequenceState.continuable; import static org.apache.cassandra.tcm.sequences.SequenceState.error; +import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition; public class Move extends MultiStepOperation { @@ -194,15 +208,73 @@ public Transformation.Result applyTo(ClusterMetadata metadata) return applyMultipleTransformations(metadata, next, of(startMove, midMove, finishMove)); } + static class WaitForEpochAndRangeRetirement implements TopologyListener + { + final Condition condition = newOneTimeCondition(); + final long waitingForEpoch; + final Ranges waitingForRanges; + Ranges retiredRanges; + + public WaitForEpochAndRangeRetirement(long waitingForEpoch, Ranges waitingForRanges) + { + this.waitingForEpoch = waitingForEpoch; + this.waitingForRanges = waitingForRanges; + this.retiredRanges = Ranges.EMPTY; + } + + synchronized void updateRetiredRanges(Ranges ranges) + { + ranges = ranges.slice(waitingForRanges, Minimal).without(retiredRanges); + if (!ranges.isEmpty()) + { + retiredRanges = retiredRanges.union(MERGE_ADJACENT, ranges); + if (retiredRanges.containsAll(waitingForRanges)) + condition.signal(); + } + } + + @Override + public synchronized void onEpochRetired(Ranges ranges, long epoch, @Nullable Topology topology) + { + if (epoch >= waitingForEpoch) + updateRetiredRanges(ranges); + } + + public void waitForRetirement() + { + condition.awaitThrowUncheckedOnInterrupt(); + } + } + @Override public SequenceState executeNext() { switch (next) { case START_MOVE: + WaitForEpochAndRangeRetirement wait = null; + try { ClusterMetadata metadata = ClusterMetadata.current(); + TopologyManager topologyManager = AccordService.instance().topology(); + AccordService.toFuture(topologyManager.await(metadata.epoch.getEpoch(), null)) + .awaitThrowUncheckedOnInterrupt().rethrowIfFailed(); + + ActiveEpochs activeEpochs = topologyManager.active(); + Topology current = activeEpochs.globalForEpoch(metadata.epoch.getEpoch()); + RegainingEpochRange regaining = topologyManager.computeRegaining(current, AccordTopology.createAccordTopology(applyTo(metadata).success().metadata)); + + if (regaining != null) + { + wait = new WaitForEpochAndRangeRetirement(regaining.epoch(), regaining.ranges()); + topologyManager.addListener(wait); + ActiveEpoch e = activeEpochs.ifExists(regaining.epoch()); + if (e != null) + wait.updateRetiredRanges(e.retired()); + wait.waitForRetirement(); + } + logger.info("Moving {} from {} to {}.", metadata.directory.endpoint(startMove.nodeId()), metadata.tokenMap.tokens(startMove.nodeId()), @@ -214,6 +286,11 @@ public SequenceState executeNext() JVMStabilityInspector.inspectThrowable(t); return continuable(); } + finally + { + if (wait != null) + AccordService.instance().topology().removeListener(wait); + } break; case MID_MOVE: try diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java new file mode 100644 index 000000000000..b230a5121170 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordDeleteCommandStoreTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.accord; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import accord.topology.ActiveEpoch; + +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.AccordService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.junit.Assert.assertTrue; + +import org.junit.BeforeClass; +import org.junit.Test; + +public class AccordDeleteCommandStoreTest extends AccordTestBase +{ + private static final Logger logger = LoggerFactory.getLogger(AccordRegainRangesTest.class); + + @Override + protected Logger logger() + { + return logger; + } + + @BeforeClass + public static void setupClass() throws IOException + { + AccordTestBase.setupCluster(builder -> builder + .withoutVNodes() + .withConfig(config -> + config + .set("accord.shard_durability_cycle", "15s") + .with(Feature.NETWORK, Feature.GOSSIP)), 4); + } + + @Test + public void deleteCommandStoresTest() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}", + "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + test(ddls, cluster -> { + String newToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long epoch = cluster.get(2).callOnInstance(() -> { + long priorEpoch = AccordService.instance().topology().epoch(); + StorageService.instance.move(Long.toString(Long.parseLong(newToken) + 1)); + return priorEpoch; + }); + + cluster.get(2).runOnInstance(() -> { + assertTrue(epoch > 0); + try { + Thread.sleep(30000); + } catch (InterruptedException e) {} + ActiveEpoch e = AccordService.instance().node().topology().active().ifExists(epoch); + + }); + }); + } +} + diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java new file mode 100644 index 000000000000..3a5481101af0 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordRegainRangesTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.accord; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import accord.api.RoutingKey; +import accord.local.CommandStore; +import accord.local.PreLoadContext; +import accord.primitives.AbstractRanges; +import accord.primitives.Ranges; +import accord.topology.TopologyException; +import accord.topology.TopologyRetiredException; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.TokenRange; +import org.apache.cassandra.service.accord.api.TokenKey; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.cassandra.service.accord.AccordService.getBlocking; +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.junit.BeforeClass; +import org.junit.Test; + +public class AccordRegainRangesTest extends AccordTestBase +{ + private static final Logger logger = LoggerFactory.getLogger(AccordRegainRangesTest.class); + + @Override + protected Logger logger() + { + return logger; + } + + @BeforeClass + public static void setupClass() throws IOException + { + AccordTestBase.setupCluster(builder -> builder + .withoutVNodes() + .withConfig(config -> + config + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "20s") + .with(Feature.NETWORK, Feature.GOSSIP)), 6); + } + + @Test + public void regainRangesTest() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}", + "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + test(ddls, cluster -> { + cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + + SimpleQueryResult result = cluster.coordinator(2).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assertTrue(token < Long.parseLong(originalToken)); + + long epoch = cluster.get(2).callOnInstance(() -> { + long priorEpoch = AccordService.instance().topology().epoch(); + StorageService.instance.move(Long.toString(token - 1000)); + return priorEpoch; + }); + + cluster.get(2).runOnInstance(() -> { + StorageService.instance.move(originalToken); + }); + + String tableName = accordTableName; + + // Ensure no overlapping safeToRead ranges + cluster.get(2).runOnInstance(() -> { + TableId tid = Schema.instance.getTableMetadata(KEYSPACE, tableName).id(); + RoutingKey start = TokenKey.parse(tid, String.valueOf(token), Murmur3Partitioner.instance); + RoutingKey end = TokenKey.parse(tid, originalToken, Murmur3Partitioner.instance); + Ranges regainedRange = Ranges.of(TokenRange.create(start, end)); + try + { + assertTrue(AccordService.instance().topology().active().get(epoch).retired().containsAll(regainedRange)); + } + catch (TopologyRetiredException ignored) + { + } + catch (TopologyException e) + { + fail(); + } + + Ranges range = Ranges.EMPTY; + for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) { + Ranges safeToReadRanges = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "No overlapping safeToReadRanges", safeCommandStore -> { + Ranges mergedRanges = Ranges.EMPTY; + for (Ranges r : safeCommandStore.safeToReadAt().values()) + mergedRanges = mergedRanges.union(AbstractRanges.UnionMode.MERGE_ADJACENT, r); + return mergedRanges; + })); + + assertTrue(range.overlapping(safeToReadRanges).isEmpty()); + range = range.union(AbstractRanges.UnionMode.MERGE_ADJACENT, safeToReadRanges); + } + }); + }); + } +} + diff --git a/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java b/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java index 21b7e7bc3aed..137c0a2fe16d 100644 --- a/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java +++ b/test/unit/org/apache/cassandra/service/accord/journal/AccordTopologyUpdateTest.java @@ -126,9 +126,7 @@ private static Gen topologyUpdateGen() for (Node.Id node : topology.nodes()) commandStores.put(node.id, rangesForEpochGen.next(rs)); - Node.Id self = rs.pick(topology.nodes()); - - return new Journal.TopologyUpdate(commandStores, topology); + return new Journal.TopologyUpdate(commandStores, topology, CommandStores.PreviouslyOwned.EMPTY); }; } @@ -156,9 +154,9 @@ private static void maybeUpdatePartitioner(Journal.TopologyUpdate expected) private static void maybeUpdatePartitioner(AccordTopologyUpdate expected) { - Journal.TopologyUpdate update = expected.getUpdate(); + Journal.TopologyUpdate update = expected.update(); if (update != null) - maybeUpdatePartitioner(expected.getUpdate()); + maybeUpdatePartitioner(expected.update()); } private void maybeUpdatePartitioner(CommandStores.RangesForEpoch expected)