diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index d321bc6794..f59be8c237 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -187,6 +187,7 @@ CommandStore create(int id,
      * But they may still be ordered for other key ranges they participate in.
      */
     private NavigableMap<Timestamp, Ranges> safeToRead = emptySafeToRead();
+    private Ranges regainedRanges = Ranges.EMPTY;
     private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>());
     @Nullable private RejectBefore rejectBefore;
@@ -401,10 +402,24 @@ protected void unsafeAcceptRequests(Ranges accept)
      */
     final void unsafeSetSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead)
    {
+        if (newSafeToRead != null)
+        {
+            for (Map.Entry<Timestamp, Ranges> entry : newSafeToRead.entrySet())
+            {
+                // note: we only log here; the ranges themselves are not removed from newSafeToRead
+                Ranges notRegained = entry.getValue().without(this.regainedRanges);
+                if (!notRegained.equals(entry.getValue()))
+                    logger.info("Safe-to-read entry {} overlaps the regained ranges; only {} lies outside them", entry.getValue(), notRegained);
+            }
+        }
+
         node.updateStamp();
         this.safeToRead = newSafeToRead;
     }
 
+    final void unsafeAddToRegainedRanges(Ranges newRegainedRanges)
+    {
+        this.regainedRanges = newRegainedRanges.union(MERGE_ADJACENT, this.regainedRanges);
+    }
+
     protected final void unsafeClearSafeToRead()
     {
         unsafeSetSafeToRead(null);
@@ -1181,6 +1196,13 @@ final void markUnsafeToRead(Ranges ranges)
         }
     }
 
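+    /**
+     * Enqueue a task on this store that records {@code ranges} as regained. Called from
+     * CommandStores.updateTopology when this store re-acquires ranges it replicated in an
+     * earlier epoch; those ranges are simultaneously marked unsafe to read.
+     */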
+    final void markAsRegained(Ranges ranges)
+    {
+        execute((Empty) () -> "Mark Range As Regained", safeStore -> {
+            safeStore.addToRegainedRanges(ranges);
+        }, agent);
+    }
+
     public final DataStore unsafeGetDataStore()
     {
         return dataStore;
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
index 9a6e2d29af..d073992e38 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -26,6 +26,7 @@
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -37,6 +38,7 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import accord.topology.TopologyException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
@@ -109,9 +111,9 @@ class StandardLatentStoreSelector implements LatentStoreSelector
         @Override
         public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants<?> participants)
         {
-            return snapshot -> StoreFinder.find(snapshot, participants)
+            return snapshot -> new StoreIterator(StoreFinder.find(snapshot, participants)
                     .filter(snapshot, participants, txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch())
-                    .iterator(snapshot);
+                    .iterator(snapshot), txnId.epoch());
         }
     }
 
@@ -124,7 +126,7 @@ static LatentStoreSelector standard()
     public interface StoreSelector extends LatentStoreSelector
     {
         default StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants<?> participants) { return this; }
-        Iterator<CommandStore> select(Snapshot snapshot);
+        StoreIterator select(Snapshot snapshot);
     }
 
     public static class IncludingSpecificStoreSelector implements StoreSelector
@@ -143,14 +145,35 @@ public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Particip
                 StoreFinder finder = StoreFinder.find(snapshot, participants)
                         .filter(snapshot, participants, txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch());
                 finder.set(snapshot.byId.get(storeId));
-                return finder.iterator(snapshot);
+                return new StoreIterator(finder.iterator(snapshot), txnId.epoch());
             };
         }
 
         @Override
-        public Iterator<CommandStore> select(Snapshot snapshot)
+        public StoreIterator select(Snapshot snapshot)
        {
-            return Collections.singletonList(snapshot.byId(storeId)).iterator();
+            return new StoreIterator(Collections.singletonList(snapshot.byId(storeId)).iterator(), null);
         }
     }
 
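+    /**
+     * Couples an iterator over the selected stores with the minimum epoch of the selection.
+     * The epoch is used by checkQueryDisjointRangesAcrossCommandStores to bound which of each
+     * shard's historical ranges are considered; a null epoch is stored as 0, meaning ranges
+     * from all epochs are in scope.
+     */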
+    public static class StoreIterator
+    {
+        public final Iterator<CommandStore> storeIterator;
+        public final Long minEpoch;
+
+        public StoreIterator(Iterator<CommandStore> storeIterator, @Nullable Long minEpoch)
+        {
+            this.storeIterator = storeIterator;
+            this.minEpoch = (minEpoch == null) ? 0L : minEpoch;
+        }
+
+        public Iterator<CommandStore> getStoreIterator()
+        {
+            return storeIterator;
+        }
+
+        public Long getMinEpoch()
+        {
+            return minEpoch;
+        }
+    }
 
@@ -175,7 +198,7 @@ public static StoreSelector selector(Unseekables<?> unseekables, long minEpoch,
         return snapshot -> {
             StoreFinder finder = StoreFinder.find(snapshot, unseekables);
             finder.filter(snapshot, unseekables, minEpoch, maxEpoch);
-            return finder.iterator(snapshot);
+            return new StoreIterator(finder.iterator(snapshot), minEpoch);
         };
     }
 
@@ -577,6 +600,7 @@ public static class Snapshot extends Journal.TopologyUpdate implements Iterable<CommandStore>
         final Int2IntHashMap byId;
         private final int[] indexForRange;
         final SearchableRangeList lookupByRange;
+        final Map<Integer, LargeBitSet> overlappingCommandStores;
 
         public Snapshot(ShardHolder[] shards, Topology local, Topology global)
         {
@@ -625,6 +649,22 @@ class RangeAndIndex
                 indexForRange[i] = rangesAndIndexes[i].index;
             }
             lookupByRange = SearchableRangeList.build(ranges);
+
+            // one bitset per shard index, marking every shard whose ranges overlap it (including itself)
+            overlappingCommandStores = new HashMap<>();
+            for (int i = 0; i < shards.length; i++)
+                overlappingCommandStores.put(i, new LargeBitSet(shards.length));
+
+            for (int i = 0; i < shards.length; i++)
+            {
+                for (int j = i; j < shards.length; j++)
+                {
+                    if (!shards[i].ranges().all().slice(shards[j].ranges().all(), Minimal).isEmpty())
+                    {
+                        overlappingCommandStores.get(i).set(j);
+                        overlappingCommandStores.get(j).set(i);
+                    }
+                }
+            }
         }
 
         // This method exists to ensure we do not hold references to command stores
@@ -759,6 +799,14 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top
                     shard.store.epochUpdateHolder.remove(epoch, shard.ranges, removeRanges);
                     bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges));
                 }
+
+                // the intersection of this store's prior ranges with the newly added ranges is a
+                // range it is re-acquiring: mark it unsafe to read and record it as regained
+                Ranges regainedRanges = shard.ranges().all().slice(added, Minimal);
+                if (!regainedRanges.isEmpty())
+                {
+                    shard.store.markUnsafeToRead(regainedRanges);
+                    shard.store.markAsRegained(regainedRanges);
+                }
+
                 // TODO (desired): only sync affected shards
                 Ranges ranges = shard.ranges().currentRanges();
                 // ranges can be empty when ranges are lost or consolidated across epochs.
@@ -852,7 +900,7 @@ public void forAllUnsafe(Consumer forEach)
 
     public AsyncChain forAll(String reason, Consumer forEach)
     {
-        return mapReduce(snapshot -> Stream.of(snapshot.shards).map(shard -> shard.store).iterator(), new MapReduceCommandStores<>(RoutingKeys.EMPTY)
+        return mapReduce(snapshot -> new StoreIterator(Stream.of(snapshot.shards).map(shard -> shard.store).iterator(), null), new MapReduceCommandStores<>(RoutingKeys.EMPTY)
         {
             @Override public Void reduce(Void o1, Void o2) { return null; }
             @Override public TxnId primaryTxnId() { return null; }
@@ -921,27 +969,59 @@ public Cancellable mapReduceConsume(IntStream commandStoreIds, MapReduceCons
     public AsyncChain mapReduce(IntStream commandStoreIds, MapReduceCommandStores mapReduce)
     {
-        return mapReduce(snapshot -> commandStoreIds.mapToObj(snapshot::byId).iterator(), mapReduce);
+        return mapReduce(snapshot -> new StoreIterator(commandStoreIds.mapToObj(snapshot::byId).iterator(), null), mapReduce);
     }
 
     public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStores mapReduceConsume)
     {
         Snapshot snapshot = current;
-        Iterator<CommandStore> stores = selector.select(snapshot);
+        StoreIterator storeIterator = selector.select(snapshot);
+        Iterator<CommandStore> stores = storeIterator.getStoreIterator();
         AsyncChain chain = null;
+        LargeBitSet bitSet = new LargeBitSet(snapshot.shards.length);
         while (stores.hasNext())
         {
-            AsyncChain next = mapReduceConsume.applyAsync(stores.next());
+            CommandStore store = stores.next();
+            bitSet.set(snapshot.byId.get(store.id()));
+            AsyncChain next = mapReduceConsume.applyAsync(store);
             if (next != null)
                 chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next;
         }
+
+        if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), storeIterator.getMinEpoch()))
+            return AsyncChains.failure(new RuntimeException("Tried to query more than one CommandStore for the same range"));
+
         return chain == null ? AsyncChains.success(null) : chain;
     }
 
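+    /**
+     * Verifies that a single mapReduce did not visit two command stores that both cover part of
+     * the queried ranges at or after {@code minEpoch}. For each group of mutually overlapping
+     * stores, the ranges contributed by every visited store are accumulated; if a new
+     * contribution intersects the accumulated set, the same range would be answered by more
+     * than one store and the check fails.
+     */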
+    public static boolean checkQueryDisjointRangesAcrossCommandStores(Map<Integer, LargeBitSet> overlappingCommandStores, ShardHolder[] shards, LargeBitSet commandStoresSeen, Ranges queryRange, Long minEpoch)
+    {
+        // Check that we are not querying two command stores for the same range
+        for (Map.Entry<Integer, LargeBitSet> entry : overlappingCommandStores.entrySet())
+        {
+            Ranges range = Ranges.EMPTY;
+            LargeBitSet overlappingCommandStore = entry.getValue();
+            for (int i = overlappingCommandStore.firstSetBit(); i >= 0; i = overlappingCommandStore.nextSetBit(i + 1, -1))
+            {
+                if (commandStoresSeen.get(i))
+                {
+                    Ranges touchedKeys = queryRange.slice(shards[i].ranges().allSince(minEpoch), Minimal);
+
+                    if (!range.slice(touchedKeys, Minimal).isEmpty())
+                        return false;
+
+                    range = range.with(touchedKeys.toRanges());
+                }
+            }
+        }
+
+        return true;
+    }
+
     public O mapReduceUnsafe(StoreSelector selector, Function map, BiFunction reduce, O accumulator)
     {
         Snapshot snapshot = current;
-        Iterator<CommandStore> stores = selector.select(snapshot);
+        Iterator<CommandStore> stores = selector.select(snapshot).getStoreIterator();
         while (stores.hasNext())
         {
             O next = map.apply(stores.next());
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 9c4f1620c4..b4f57313aa 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -765,7 +765,7 @@ public CoordinationAdapter coordinationAdapter(TxnId txnId, Kind kind)
     public AsyncChain updateMinHlc(long minHlc)
     {
         // TODO (required): command stores that are not ready due to bootstrap need to refresh their min HLC on bootstrap completion
-        StoreSelector selector = snapshot -> Stream.of(snapshot.shards).map(sh -> sh.store).iterator();
+        StoreSelector selector = snapshot -> new CommandStores.StoreIterator(Stream.of(snapshot.shards).map(sh -> sh.store).iterator(), null);
         return commandStores().mapReduce(selector, new MapReduceCommandStores<>(RoutingKeys.EMPTY)
         {
             @Override public Void reduce(Void o1, Void o2) { return null; }
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 99faad7a0b..ba88a5ddf8 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -384,6 +384,11 @@ public void setSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead)
         commandStore().unsafeSetSafeToRead(newSafeToRead);
     }
 
+    public void addToRegainedRanges(Ranges newRegainedRanges)
+    {
+        commandStore().unsafeAddToRegainedRanges(newRegainedRanges);
+    }
+
     public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch)
     {
         commandStore().unsafeSetRangesForEpoch(rangesForEpoch);
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index cba5a4c20a..b0d9508662 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -765,6 +765,23 @@ public void forEach(Consumer<Shard> forEach)
             forEach.accept(shards[i]);
     }
 
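+    /**
+     * For each node in {@code next}, returns the ranges it replicates in {@code next} but did
+     * not replicate in {@code current}; nodes gaining no ranges are omitted. Hoisted from
+     * TopologyRandomizer so it can be shared with TopologyManager.
+     */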
+    public static Map<Id, Ranges> computeNodeAdditions(Topology current, Topology next)
+    {
+        Map<Id, Ranges> additions = new HashMap<>();
+        for (Id node : next.nodes())
+        {
+            Ranges prev = current.rangesForNode(node);
+            if (prev == null) prev = Ranges.EMPTY;
+
+            Ranges added = next.rangesForNode(node).without(prev);
+            if (added.isEmpty())
+                continue;
+
+            additions.put(node, added);
+        }
+        return additions;
+    }
+
     public SortedArrayList<Id> nodes()
     {
         return nodes;
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 4ca397da9f..52eedf507d 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -19,6 +19,7 @@
 package accord.topology;
 
 import java.util.IdentityHashMap;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +28,7 @@
 import java.util.function.Supplier;
 
 import javax.annotation.Nullable;
 
+import accord.primitives.Routables;
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +54,8 @@
 import accord.utils.async.AsyncResults;
 import accord.utils.async.NestedAsyncResult;
 
+import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
+
 /**
  * Manages topology state changes and update bookkeeping
  *
@@ -308,6 +312,56 @@ public void reportTopology(Topology topology)
         updateActive();
     }
 
+    public static class RegainingEpochRange
+    {
+        public final long epoch;
+        public final Ranges range;
+
+        public RegainingEpochRange(long epoch, Ranges range)
+        {
+            this.epoch = epoch;
+            this.range = range;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        public Ranges range()
+        {
+            return range;
+        }
+    }
+
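+    /**
+     * Identifies ranges some node is regaining: ranges it replicated in an earlier, still-active
+     * epoch that reappear among its additions in {@code next}. Returns the greatest such active
+     * epoch together with the regained ranges sliced to those additions, or null if no addition
+     * intersects an active epoch.
+     */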
+    @Nullable
+    public RegainingEpochRange epochAndRangeToBeRetired(Topology curr, Topology next)
+    {
+        Map<Id, Ranges> additions = Topology.computeNodeAdditions(curr, next);
+        long greatestEpoch = -1;
+        Ranges range = Ranges.EMPTY;
+
+        synchronized (this)
+        {
+            for (Map.Entry<Id, Ranges> entry : additions.entrySet())
+            {
+                for (ActiveEpoch activeEpoch : this.active)
+                {
+                    if (activeEpoch.all().rangesForNode(entry.getKey()).intersects(entry.getValue()) && greatestEpoch < activeEpoch.epoch())
+                    {
+                        greatestEpoch = activeEpoch.epoch();
+                        range = range.union(MERGE_ADJACENT, activeEpoch.all().rangesForNode(entry.getKey()).slice(entry.getValue(), Routables.Slice.Minimal));
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (greatestEpoch != -1)
+            return new RegainingEpochRange(greatestEpoch, range);
+
+        return null;
+    }
+
     private final AtomicBoolean updatingActive = new AtomicBoolean();
     private void updateActive()
     {
diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
index a8f8490acd..c778e707c1 100644
--- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
@@ -413,33 +413,37 @@ private static int prefix(Shard shard)
         return ((PrefixedIntHashKey) shard.range.start()).prefix;
     }
 
-    private static Map<Id, Ranges> getAdditions(Topology current, Topology next)
+    // reassignment is valid unless a node would regain a previously replicated range while the
+    // epoch that last replicated it is still active (not yet retired)
+    private boolean validToReassignRange(Topology current, Shard[] nextShards, Map<Id, Ranges> previouslyReplicated)
     {
-        Map<Id, Ranges> additions = new HashMap<>();
-        for (Id node : next.nodes())
-        {
-            Ranges prev = current.rangesForNode(node);
-            if (prev == null) prev = Ranges.EMPTY;
-
-            Ranges added = next.rangesForNode(node).without(prev);
-            if (added.isEmpty())
-                continue;
+        Topology next = new Topology(current.epoch + 1, nextShards);
+        Map<Id, Ranges> additions = Topology.computeNodeAdditions(current, next);
 
-            additions.put(node, added);
+        for (Map.Entry<Id, Ranges> entry : additions.entrySet())
+        {
+            if (previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY).intersects(entry.getValue())
+                && !previousEpochForRegainedRangeRetired(current, entry.getValue()))
+                return false;
         }
-        return additions;
+
+        return true;
     }
 
-    private static boolean reassignsRanges(Topology current, Shard[] nextShards, Map<Id, Ranges> previouslyReplicated)
+    private boolean previousEpochForRegainedRangeRetired(Topology current, Ranges regainingRanges)
     {
-        Topology next = new Topology(current.epoch + 1, nextShards);
-        Map<Id, Ranges> additions = getAdditions(current, next);
-
-        for (Map.Entry<Id, Ranges> entry : additions.entrySet())
+        for (Id id : current.nodes())
         {
-            if (previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY).intersects(entry.getValue()))
+            Node node = this.nodeLookup.apply(id);
+            boolean isRetiredForNode = true;
+            for (ActiveEpoch epoch : node.topology().active())
+            {
+                if (epoch.all().ranges.intersects(regainingRanges) && !epoch.allRetired())
+                    isRetiredForNode = false;
+            }
+
+            if (isRetiredForNode)
                 return true;
         }
+
         return false;
     }
 
@@ -472,7 +476,7 @@ public synchronized Topology updateTopology()
             Shard[] testShards = type.apply(state, random);
             Arrays.sort(testShards, (a, b) -> a.range.compareTo(b.range));
             if (!everyShardHasQuorumOverlaps(oldShards, testShards)
-                || reassignsRanges(current, testShards, previouslyReplicated))
+                || !validToReassignRange(current, testShards, previouslyReplicated))
             {
                 ++rejectedMutations;
             }
@@ -488,7 +492,7 @@
 
         Topology nextTopology = new Topology(current.epoch + 1, newShards);
 
-        Map<Id, Ranges> nextAdditions = getAdditions(current, nextTopology);
+        Map<Id, Ranges> nextAdditions = Topology.computeNodeAdditions(current, nextTopology);
         for (Map.Entry<Id, Ranges> entry : nextAdditions.entrySet())
         {
             previouslyReplicated.merge(entry.getKey(), entry.getValue(), (a, b) -> a.union(MERGE_ADJACENT, b));