Skip to content
Open
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = https://github.com/alanwang67/cassandra-accord.git
branch = deleteCommandStores
Original file line number Diff line number Diff line change
Expand Up @@ -988,9 +988,9 @@ UnfilteredRowIterator result(JournalKey journalKey, DecoratedKey partitionKey) t
if (read.kind() == Repeat && !hasWritten)
{
Invariants.require(lastImage != null);
write = new TopologyImage(read.epoch(), Image, lastImage.getUpdate());
write = new TopologyImage(read.epoch(), Image, lastImage.update());
}
else if (hasWritten && read.kind() == Repeat && lastImage.getUpdate().isEquivalent(read.getUpdate()))
else if (hasWritten && read.kind() == Repeat && lastImage.update().isEquivalent(read.update()))
{
write = read.asRepeat();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,13 @@ public TopologyUpdate next()
logger.error("Encountered TopologyImage Repeat record for epoch {}, but no prior image record was found", ref.key().id.epoch());
return null;
}
prev = reader.read().asImage(Invariants.nonNull(prev.getUpdate()));
prev = reader.read().asImage(Invariants.nonNull(prev.update()));
}
else prev = reader.read();

return new TopologyUpdate(prev.getUpdate().commandStores,
prev.getUpdate().global);
return new TopologyUpdate(prev.update().commandStores,
prev.update().global,
prev.update().previouslyOwned);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import accord.api.Journal;
import accord.local.CommandStores;
import accord.local.CommandStores.PreviouslyOwned;
import accord.primitives.Ranges;
import accord.topology.Topology;
import accord.utils.Invariants;
Expand All @@ -48,7 +49,7 @@ public interface AccordTopologyUpdate
long epoch();
AccordTopologyUpdate asRepeat();

Journal.TopologyUpdate getUpdate();
Journal.TopologyUpdate update();
static AccordTopologyUpdate newTopology(Journal.TopologyUpdate update)
{
return new NewTopology(update);
Expand Down Expand Up @@ -97,12 +98,20 @@ public long serializedSize(CommandStores.RangesForEpoch from)

class TopologyUpdateSerializer implements UnversionedSerializer<Journal.TopologyUpdate>
{
private static final int TOP_BIT = 0x40000000;
public static final TopologyUpdateSerializer instance = new TopologyUpdateSerializer();

@Override
public void serialize(Journal.TopologyUpdate from, DataOutputPlus out) throws IOException
{
out.writeUnsignedVInt32(from.commandStores.size());
out.writeUnsignedVInt32(from.commandStores.size() | TOP_BIT);
out.writeUnsignedVInt32(0);
out.writeUnsignedVInt32(from.previouslyOwned.size());
for (int i = 0 ; i < from.previouslyOwned.size() ; ++i)
{
out.writeUnsignedVInt(from.previouslyOwned.epochs(i));
KeySerializers.ranges.serialize(from.previouslyOwned.ranges(i), out);
}
for (Map.Entry<Integer, CommandStores.RangesForEpoch> e : from.commandStores.entrySet())
{
out.writeUnsignedVInt32(e.getKey());
Expand All @@ -115,6 +124,23 @@ public void serialize(Journal.TopologyUpdate from, DataOutputPlus out) throws IO
public Journal.TopologyUpdate deserialize(DataInputPlus in) throws IOException
{
int commandStoresSize = in.readUnsignedVInt32();
int flags = 0;
PreviouslyOwned previouslyOwned = PreviouslyOwned.EMPTY;
if ((commandStoresSize & TOP_BIT) != 0)
{
commandStoresSize ^= TOP_BIT;
// future proofing
flags = in.readUnsignedVInt32();
int previouslyOwnedSize = in.readUnsignedVInt32();
long[] epochs = new long[previouslyOwnedSize];
Ranges[] ranges = new Ranges[previouslyOwnedSize];
for (int i = 0 ; i < previouslyOwnedSize ; ++i)
{
epochs[i] = in .readUnsignedVInt();
ranges[i] = KeySerializers.ranges.deserialize(in);
}
previouslyOwned = new PreviouslyOwned(epochs.length == 0 ? 0 : epochs[0], epochs, ranges);
}
Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores = new Int2ObjectHashMap<>();
for (int j = 0; j < commandStoresSize; j++)
{
Expand All @@ -123,13 +149,20 @@ public Journal.TopologyUpdate deserialize(DataInputPlus in) throws IOException
commandStores.put(commandStoreId, rangesForEpoch);
}
Topology global = TopologySerializers.compactTopology.deserialize(in);
return new Journal.TopologyUpdate(commandStores, global);
return new Journal.TopologyUpdate(commandStores, global, previouslyOwned);
}

@Override
public long serializedSize(Journal.TopologyUpdate from)
{
long size = TypeSizes.sizeofUnsignedVInt(from.commandStores.size());
long size = TypeSizes.sizeofUnsignedVInt(from.commandStores.size() | TOP_BIT);
size += TypeSizes.sizeofUnsignedVInt(0);
size += TypeSizes.sizeofUnsignedVInt(from.previouslyOwned.size());
for (int i = 0 ; i < from.previouslyOwned.size() ; ++i)
{
size += TypeSizes.sizeofUnsignedVInt(from.previouslyOwned.epochs(i));
size += KeySerializers.ranges.serializedSize(from.previouslyOwned.ranges(i));
}
for (Map.Entry<Integer, CommandStores.RangesForEpoch> e : from.commandStores.entrySet())
{
size += TypeSizes.sizeofUnsignedVInt(e.getKey());
Expand Down Expand Up @@ -289,7 +322,7 @@ public long epoch()
}

@Override
public Journal.TopologyUpdate getUpdate()
public Journal.TopologyUpdate update()
{
return update;
}
Expand Down Expand Up @@ -350,7 +383,7 @@ public long epoch()
}

@Override
public Journal.TopologyUpdate getUpdate()
public Journal.TopologyUpdate update()
{
return update;
}
Expand Down Expand Up @@ -413,7 +446,7 @@ public TopologyImage read()
public void read(AccordTopologyUpdate update)
{
if (Objects.requireNonNull(update.kind()) == Kind.New)
read = new TopologyImage(update.epoch(), Kind.Image, update.getUpdate());
read = new TopologyImage(update.epoch(), Kind.Image, update.update());
else
read = (TopologyImage) update;
write = read;
Expand Down
77 changes: 77 additions & 0 deletions src/java/org/apache/cassandra/tcm/sequences/Move.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,21 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import accord.api.TopologyListener;
import accord.primitives.Ranges;
import accord.topology.ActiveEpoch;
import accord.topology.ActiveEpochs;
import accord.topology.EpochReady;
import accord.topology.Topology;
import accord.topology.TopologyManager;
import accord.topology.TopologyManager.RegainingEpochRange;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
Expand All @@ -53,6 +62,7 @@
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.AccordTopology;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamResultFuture;
Expand All @@ -73,17 +83,21 @@
import org.apache.cassandra.tcm.transformations.PrepareMove;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.Condition;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.apache.cassandra.utils.vint.VIntCoding;

import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
import static accord.primitives.Routables.Slice.Minimal;
import static com.google.common.collect.ImmutableList.of;
import static org.apache.cassandra.tcm.MultiStepOperation.Kind.MOVE;
import static org.apache.cassandra.tcm.Transformation.Kind.FINISH_MOVE;
import static org.apache.cassandra.tcm.Transformation.Kind.MID_MOVE;
import static org.apache.cassandra.tcm.Transformation.Kind.START_MOVE;
import static org.apache.cassandra.tcm.sequences.SequenceState.continuable;
import static org.apache.cassandra.tcm.sequences.SequenceState.error;
import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;

public class Move extends MultiStepOperation<Epoch>
{
Expand Down Expand Up @@ -194,15 +208,73 @@ public Transformation.Result applyTo(ClusterMetadata metadata)
return applyMultipleTransformations(metadata, next, of(startMove, midMove, finishMove));
}

static class WaitForEpochAndRangeRetirement implements TopologyListener
{
final Condition condition = newOneTimeCondition();
final long waitingForEpoch;
final Ranges waitingForRanges;
Ranges retiredRanges;

public WaitForEpochAndRangeRetirement(long waitingForEpoch, Ranges waitingForRanges)
{
this.waitingForEpoch = waitingForEpoch;
this.waitingForRanges = waitingForRanges;
this.retiredRanges = Ranges.EMPTY;
}

synchronized void updateRetiredRanges(Ranges ranges)
{
ranges = ranges.slice(waitingForRanges, Minimal).without(retiredRanges);
if (!ranges.isEmpty())
{
retiredRanges = retiredRanges.union(MERGE_ADJACENT, ranges);
if (retiredRanges.containsAll(waitingForRanges))
condition.signal();
}
}

@Override
public synchronized void onEpochRetired(Ranges ranges, long epoch, @Nullable Topology topology)
{
if (epoch >= waitingForEpoch)
updateRetiredRanges(ranges);
}

public void waitForRetirement()
{
condition.awaitThrowUncheckedOnInterrupt();
}
}

@Override
public SequenceState executeNext()
{
switch (next)
{
case START_MOVE:
WaitForEpochAndRangeRetirement wait = null;

try
{
ClusterMetadata metadata = ClusterMetadata.current();
TopologyManager topologyManager = AccordService.instance().topology();
AccordService.toFuture(topologyManager.await(metadata.epoch.getEpoch(), null))
.awaitThrowUncheckedOnInterrupt().rethrowIfFailed();

ActiveEpochs activeEpochs = topologyManager.active();
Topology current = activeEpochs.globalForEpoch(metadata.epoch.getEpoch());
RegainingEpochRange regaining = topologyManager.computeRegaining(current, AccordTopology.createAccordTopology(applyTo(metadata).success().metadata));

if (regaining != null)
{
wait = new WaitForEpochAndRangeRetirement(regaining.epoch(), regaining.ranges());
topologyManager.addListener(wait);
ActiveEpoch e = activeEpochs.ifExists(regaining.epoch());
if (e != null)
wait.updateRetiredRanges(e.retired());
wait.waitForRetirement();
}

logger.info("Moving {} from {} to {}.",
metadata.directory.endpoint(startMove.nodeId()),
metadata.tokenMap.tokens(startMove.nodeId()),
Expand All @@ -214,6 +286,11 @@ public SequenceState executeNext()
JVMStabilityInspector.inspectThrowable(t);
return continuable();
}
finally
{
if (wait != null)
AccordService.instance().topology().removeListener(wait);
}
break;
case MID_MOVE:
try
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test.accord;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import accord.topology.ActiveEpoch;

import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.accord.AccordService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.collect.Iterables.getOnlyElement;
import static org.junit.Assert.assertTrue;

import org.junit.BeforeClass;
import org.junit.Test;

public class AccordDeleteCommandStoreTest extends AccordTestBase
{
private static final Logger logger = LoggerFactory.getLogger(AccordRegainRangesTest.class);

@Override
protected Logger logger()
{
return logger;
}

@BeforeClass
public static void setupClass() throws IOException
{
AccordTestBase.setupCluster(builder -> builder
.withoutVNodes()
.withConfig(config ->
config
.set("accord.shard_durability_cycle", "15s")
.with(Feature.NETWORK, Feature.GOSSIP)), 4);
}

@Test
public void deleteCommandStoresTest() throws Throwable
{
List<String> ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';',
"CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}",
"CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'");
test(ddls, cluster -> {
String newToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens()));
String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens()));

long epoch = cluster.get(2).callOnInstance(() -> {
long priorEpoch = AccordService.instance().topology().epoch();
StorageService.instance.move(Long.toString(Long.parseLong(newToken) + 1));
return priorEpoch;
});

cluster.get(2).runOnInstance(() -> {
assertTrue(epoch > 0);
try {
Thread.sleep(30000);
} catch (InterruptedException e) {}
ActiveEpoch e = AccordService.instance().node().topology().active().ifExists(epoch);

});
});
}
}

Loading