diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java index 381f1be2d6..cf9986c61f 100644 --- a/accord-core/src/main/java/accord/api/Journal.java +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -28,6 +28,7 @@ import accord.impl.CommandChange; import accord.local.Command; import accord.local.CommandStores; +import accord.local.CommandStores.PreviouslyOwned; import accord.local.DurableBefore; import accord.local.Node; import accord.local.RedundantBefore; @@ -85,11 +86,13 @@ class TopologyUpdate { public final Int2ObjectHashMap commandStores; public final Topology global; + public final PreviouslyOwned previouslyOwned; - public TopologyUpdate(@Nonnull Int2ObjectHashMap commandStores, @Nonnull Topology global) + public TopologyUpdate(@Nonnull Int2ObjectHashMap commandStores, @Nonnull Topology global, PreviouslyOwned previouslyOwned) { this.commandStores = commandStores; this.global = global; + this.previouslyOwned = previouslyOwned; } public boolean isEquivalent(TopologyUpdate other) @@ -103,7 +106,7 @@ public boolean isEquivalent(TopologyUpdate other) public TopologyUpdate cloneWithEquivalentEpoch(long epoch) { - return new TopologyUpdate(commandStores, global.cloneEquivalentWithEpoch(epoch)); + return new TopologyUpdate(commandStores, global.cloneEquivalentWithEpoch(epoch), previouslyOwned); } @Override @@ -150,6 +153,7 @@ class FieldUpdates public RedundantBefore newRedundantBefore; public NavigableMap newBootstrapBeganAt; public NavigableMap newSafeToRead; + public Ranges newPermanentlyUnsafeToRead; public CommandStores.RangesForEpoch newRangesForEpoch; public String toString() @@ -161,6 +165,8 @@ public String toString() builder.append("newBootstrapBeganAt=").append(newBootstrapBeganAt).append(", "); if (newSafeToRead != null) builder.append("newSafeToRead=").append(newSafeToRead).append(", "); + if (newPermanentlyUnsafeToRead != null) + builder.append("newPermanentlyUnsafeToRead=").append(newPermanentlyUnsafeToRead).append(", "); if (newRangesForEpoch != null) builder.append("newRangesForEpoch=").append(newRangesForEpoch).append(", "); builder.setLength(builder.length() - 2); diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java index dd3b3fdb8c..1f7fecfea6 100644 --- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java +++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java @@ -55,7 +55,6 @@ public class MaybeRecover extends CheckShards> this.recoverIfAlreadyDurable = recoverIfAlreadyDurable; this.reportTo = reportTo; } - public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Route someRoute, ProgressToken prevProgress, boolean recoverIfAlreadyDurable, LatentStoreSelector reportTo, BiConsumer callback) { MaybeRecover maybeRecover; diff --git a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java index 24d52839d9..b684575126 100644 --- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java +++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java @@ -243,6 +243,12 @@ public final void setSafeToRead(NavigableMap newSafeToRead) ensureFieldUpdates().newSafeToRead = newSafeToRead; } + @Override + public final void setPermanentlyUnsafeToRead(Ranges newPermanentlyUnsafeToRead) + { + ensureFieldUpdates().newPermanentlyUnsafeToRead = newPermanentlyUnsafeToRead; + } + @Override public void setRangesForEpoch(RangesForEpoch rangesForEpoch) { diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index d321bc6794..5af1601a25 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -187,6 +187,7 @@ CommandStore create(int id, * But they may still be ordered for other key ranges they participate in. */ private NavigableMap safeToRead = emptySafeToRead(); + private Ranges permanentlyUnsafeToRead = Ranges.EMPTY; private final Set bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>()); @Nullable private RejectBefore rejectBefore; @@ -399,10 +400,17 @@ protected void unsafeAcceptRequests(Ranges accept) /** * This method may be invoked on a non-CommandStore thread */ - final void unsafeSetSafeToRead(NavigableMap newSafeToRead) + final void unsafeSetSafeToRead(@Nullable NavigableMap newSafeToRead) { + if (newSafeToRead != null) + newSafeToRead = purgeHistory(newSafeToRead, permanentlyUnsafeToRead); + safeToRead = newSafeToRead; node.updateStamp(); - this.safeToRead = newSafeToRead; + } + + final void unsafeSetPermanentlyUnsafeToRead(Ranges newPermanentlyUnsafeToRead) + { + this.permanentlyUnsafeToRead = newPermanentlyUnsafeToRead; } protected final void unsafeClearSafeToRead() @@ -1181,6 +1189,14 @@ final void markUnsafeToRead(Ranges ranges) } } + final AsyncChain markPermanentlyUnsafeToRead(Ranges ranges) + { + return chain((Empty) () -> "Mark Range As Permanently Unsafe To Read", safeStore -> { + safeStore.setSafeToRead(purgeHistory(safeToRead, ranges)); + safeStore.setPermanentlyUnsafeToRead(permanentlyUnsafeToRead.union(MERGE_ADJACENT, ranges)); + }); + } + public final DataStore unsafeGetDataStore() { return dataStore; diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 9a6e2d29af..ef0ca3461c 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -25,11 +25,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -98,6 +95,12 @@ public abstract class CommandStores implements AsyncExecutorFactory @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(CommandStores.class); + static final Iterator INVALID = new Iterator<>() + { + @Override public boolean hasNext() { throw new UnsupportedOperationException(); } + @Override public CommandStore next() { throw new UnsupportedOperationException(); } + }; + public interface LatentStoreSelector { StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants participants); @@ -158,6 +161,7 @@ public Iterator select(Snapshot snapshot) public static class StoreFinder extends LargeBitSet implements IndexedQuadConsumer, IndexedRangeQuadConsumer { final int[] indexMap; + private boolean invalid; private StoreFinder(int size, int[] indexMap) { @@ -210,13 +214,23 @@ public StoreFinder filter(Snapshot snapshot, Unseekables unseekables, long mi ShardHolder shard = snapshot.shards[i]; Ranges shardRanges = shard.ranges().allBetween(minEpoch, maxEpoch); if (shardRanges != shard.ranges.all() && !shardRanges.intersects(unseekables)) + { unset(i); + } + else if (unsafelyTouchesRegainedRanges(snapshot, shard, unseekables, minEpoch)) + { + invalid = true; + break; + } } return this; } public Iterator iterator(Snapshot snapshot) { + if (invalid) + return INVALID; + return new Iterator<>() { int i = firstSetBit(); @@ -294,24 +308,22 @@ CommandStore create(int id, EpochUpdateHolder rangesForEpoch) public static class ShardHolder { public final CommandStore store; + @Nullable final Ranges regainsRanges; RangesForEpoch ranges; - ShardHolder(CommandStore store) + ShardHolder(CommandStore store, @Nullable Ranges regainsRanges) { this.store = store; + this.regainsRanges = regainsRanges; } - public ShardHolder(CommandStore store, RangesForEpoch ranges) + public ShardHolder(CommandStore store, RangesForEpoch ranges, @Nullable Ranges regainsRanges) { this.store = store; + this.regainsRanges = regainsRanges; this.ranges = ranges; } - public ShardHolder withStoreUnsafe(CommandStore store) - { - return new ShardHolder(store, ranges); - } - public RangesForEpoch ranges() { return ranges; @@ -334,12 +346,77 @@ public interface RangesForEpochSupplier RangesForEpoch ranges(); } + public static final class PreviouslyOwned + { + public static final PreviouslyOwned EMPTY = new PreviouslyOwned(0, RangesForEpoch.EMPTY.epochs, RangesForEpoch.EMPTY.ranges); + final long maxEpoch; + final long[] epochs; // the epoch upon which it was last owned + final Ranges[] ranges; + + public PreviouslyOwned(long maxEpoch, long[] epochs, Ranges[] ranges) + { + this.maxEpoch = maxEpoch; + this.epochs = epochs; + this.ranges = ranges; + } + + PreviouslyOwned prepend(long epoch, Ranges ranges) + { + Invariants.require(epochs.length == 0 || epoch > epochs[0]); + long[] newEpochs = new long[this.epochs.length + 1]; + Ranges[] newRanges = new Ranges[this.epochs.length + 1]; + newEpochs[0] = epoch; + newRanges[0] = ranges; + System.arraycopy(this.epochs, 0, newEpochs, 1, this.epochs.length); + System.arraycopy(this.ranges, 0, newRanges, 1, this.ranges.length); + return new PreviouslyOwned(epoch, newEpochs, newRanges); + } + + public boolean overlaps(long epoch, Unseekables test) + { + if (epoch > maxEpoch) + return false; + + for (int i = 0 ; i < epochs.length && epoch <= epochs[i] ; ++i) + { + if (this.ranges[i].intersects(test)) + return true; + } + + return false; + } + + public Ranges regains(Ranges overlapping) + { + Ranges regains = Ranges.EMPTY; + for (Ranges rs : this.ranges) + regains = regains.without(rs.slice(overlapping, Minimal)); + return regains; + } + + public int size() + { + return epochs.length; + } + + public long epochs(int i) + { + return epochs[i]; + } + + public Ranges ranges(int i) + { + return ranges[i]; + } + } + // We ONLY remove ranges to keep logic manageable; likely to only merge CommandStores into a new CommandStore via some kind of Bootstrap public static class RangesForEpoch { + public static final RangesForEpoch EMPTY = new RangesForEpoch(new long[0], new Ranges[0]); + final long[] epochs; final Ranges[] ranges; - public static final RangesForEpoch EMPTY = new RangesForEpoch(new long[0], new Ranges[0]); public RangesForEpoch(long epoch, Ranges ranges) { @@ -371,7 +448,7 @@ public boolean equals(Object object) if (this == object) return true; if (object == null || getClass() != object.getClass()) return false; RangesForEpoch that = (RangesForEpoch) object; - return Objects.deepEquals(epochs, that.epochs) && Objects.deepEquals(ranges, that.ranges); + return Arrays.equals(epochs, that.epochs) && Arrays.equals(ranges, that.ranges); } @Override @@ -578,9 +655,9 @@ public static class Snapshot extends Journal.TopologyUpdate implements Iterable< private final int[] indexForRange; final SearchableRangeList lookupByRange; - public Snapshot(ShardHolder[] shards, Topology local, Topology global) + public Snapshot(ShardHolder[] shards, Topology local, Topology global, PreviouslyOwned previouslyOwned) { - super(asMap(shards), global); + super(asMap(shards), global, previouslyOwned); this.local = local; this.shards = shards; this.byId = new Int2IntHashMap(shards.length, Hashing.DEFAULT_LOAD_FACTOR, -1); @@ -630,7 +707,7 @@ class RangeAndIndex // This method exists to ensure we do not hold references to command stores public Journal.TopologyUpdate asTopologyUpdate() { - return new Journal.TopologyUpdate(commandStores, global); + return new Journal.TopologyUpdate(commandStores, global, previouslyOwned); } private static Int2ObjectHashMap asMap(ShardHolder[] shards) @@ -664,7 +741,7 @@ private CommandStores(StoreSupplier supplier, ShardDistributor shardDistributor, this.supplier = supplier; this.shardDistributor = shardDistributor; - this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, Topology.EMPTY); + this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, Topology.EMPTY, PreviouslyOwned.EMPTY); this.journal = journal; } @@ -748,6 +825,8 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top List> bootstrapUpdates = new ArrayList<>(); List result = new ArrayList<>(prev.shards.length + added.size()); + PreviouslyOwned previouslyOwned = prev.previouslyOwned; + for (ShardHolder shard : prev.shards) { Ranges current = shard.ranges().currentRanges(); @@ -757,8 +836,14 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top // TODO (required): This is updating the a non-volatile field in the previous Snapshot, why modify it at all, even with volatile the guaranteed visibility is weak even with mutual exclusion shard.ranges = shard.ranges().withRanges(newTopology.epoch(), current.without(subtracted)); shard.store.epochUpdateHolder.remove(epoch, shard.ranges, removeRanges); + bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges)); } + + Ranges regainedRanges = shard.ranges().all().slice(added, Minimal); + if (!regainedRanges.isEmpty()) + bootstrapUpdates.add(() -> EpochReady.all(epoch, shard.store.markPermanentlyUnsafeToRead(regainedRanges).beginAsResult())); + // TODO (desired): only sync affected shards Ranges ranges = shard.ranges().currentRanges(); // ranges can be empty when ranges are lost or consolidated across epochs. @@ -767,6 +852,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top logger.debug("Epoch {} requires visibility sync for {}", epoch, ranges); bootstrapUpdates.add(shard.store.refreshReadyToCoordinate(node, ranges, epoch)); } + result.add(shard); } @@ -778,7 +864,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top EpochUpdateHolder updateHolder = new EpochUpdateHolder(); RangesForEpoch rangesForEpoch = new RangesForEpoch(epoch, addRanges); updateHolder.add(epoch, rangesForEpoch, addRanges); - ShardHolder shard = new ShardHolder(supplier.create(nextId++, updateHolder)); + ShardHolder shard = new ShardHolder(supplier.create(nextId++, updateHolder), previouslyOwned.regains(addRanges)); shard.ranges = rangesForEpoch; Map partitioned = addRanges.partitioningBy(range -> shouldBootstrap(node, prev.global, newLocalTopology, range), BootstrapRangeAction.class); @@ -812,7 +898,11 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top ); }; } - return new TopologyUpdate(new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology), bootstrap); + + if (!subtracted.isEmpty()) + previouslyOwned = previouslyOwned.prepend(epoch - 1, subtracted); + + return new TopologyUpdate(new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology, previouslyOwned), bootstrap); } private static boolean requiresSync(Ranges ranges, Topology oldTopology, Topology newTopology) @@ -928,26 +1018,31 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { Snapshot snapshot = current; Iterator stores = selector.select(snapshot); + if (stores == INVALID) + return AsyncChains.failure(new OverlappingCommandStoresException()); + AsyncChain chain = null; while (stores.hasNext()) { - AsyncChain next = mapReduceConsume.applyAsync(stores.next()); + CommandStore store = stores.next(); + AsyncChain next = mapReduceConsume.applyAsync(store); if (next != null) chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } + return chain == null ? AsyncChains.success(null) : chain; } - public O mapReduceUnsafe(StoreSelector selector, Function map, BiFunction reduce, O accumulator) + private static boolean unsafelyTouchesRegainedRanges(Snapshot snapshot, ShardHolder shard, Unseekables unseekables, long minEpoch) { - Snapshot snapshot = current; - Iterator stores = selector.select(snapshot); - while (stores.hasNext()) - { - O next = map.apply(stores.next()); - accumulator = reduce.apply(accumulator, next); - } - return accumulator; + if (shard.regainsRanges == null) + return false; + + unseekables = unseekables.slice(shard.regainsRanges, Minimal); + if (unseekables.isEmpty()) + return false; + + return snapshot.previouslyOwned.overlaps(minEpoch, unseekables); } /** @@ -961,21 +1056,21 @@ public synchronized void initializeTopologyUnsafe(Journal.TopologyUpdate update) int maxId = -1; for (Map.Entry e : update.commandStores.entrySet()) { - Invariants.require(e.getValue() != null); + RangesForEpoch rfe = e.getValue(); + Invariants.require(rfe != null); EpochUpdateHolder holder = new EpochUpdateHolder(); - holder.add(1, e.getValue(), e.getValue().all()); - shards[i++] = new ShardHolder(supplier.create(e.getKey(), holder), e.getValue()); + holder.add(1, rfe, rfe.all()); + shards[i++] = new ShardHolder(supplier.create(e.getKey(), holder), rfe, update.previouslyOwned.regains(rfe.all())); maxId = Math.max(maxId, e.getKey()); } Arrays.sort(shards, Comparator.comparingInt(shard -> shard.store.id)); nextId = maxId + 1; - loadSnapshot(new Snapshot(shards, update.global.forNode(supplier.node.id()).trim(), update.global)); + loadSnapshot(new Snapshot(shards, update.global.forNode(supplier.node.id()).trim(), update.global, update.previouslyOwned)); } public synchronized void resetTopology(Journal.TopologyUpdate update) { - // TODO: assert Snapshot current = this.current; Invariants.require(update.global.epoch() == current.local.epoch()); ShardHolder[] shards = new ShardHolder[current.commandStores.size()]; @@ -983,11 +1078,11 @@ public synchronized void resetTopology(Journal.TopologyUpdate update) for (Map.Entry e : update.commandStores.entrySet()) { int storeId = e.getKey(); - RangesForEpoch ranges = e.getValue(); - Invariants.require(ranges != null); - ShardHolder shard = new ShardHolder(current.byId(storeId), ranges); + RangesForEpoch rfe = e.getValue(); + Invariants.require(rfe != null); + ShardHolder shard = new ShardHolder(current.byId(storeId), rfe, update.previouslyOwned.regains(rfe.all())); EpochUpdateHolder holder = shard.store.epochUpdateHolder; - ranges.forEach(new BiConsumer<>() + rfe.forEach(new BiConsumer<>() { RangesForEpoch accumulator = null; Ranges prev = null; @@ -1020,7 +1115,7 @@ public void accept(Long epoch, Ranges ranges) } nextId = maxId + 1; - loadSnapshot(new Snapshot(shards, current.local, current.global)); + loadSnapshot(new Snapshot(shards, current.local, current.global, update.previouslyOwned)); } public synchronized Supplier updateTopology(Node node, Topology newTopology) @@ -1044,6 +1139,27 @@ public synchronized Supplier updateTopology(Node node, Topology newT return update.bootstrap; } + public synchronized void removeCommandStoresBefore(long beforeEpoch) + { + List keep = new ArrayList<>(current.shards.length); + for (ShardHolder shard : current) + { + RangesForEpoch ranges = shard.ranges; + int size = ranges.size(); + long lastEpochWithOwnedRange = ranges.epochAtIndex(size - 1) - 1; + if (!ranges.rangesAtIndex(size - 1).isEmpty() || lastEpochWithOwnedRange >= beforeEpoch) + keep.add(shard); + } + + if (keep.size() != current.shards.length) + { + Snapshot snapshot = new Snapshot(keep.toArray(new ShardHolder[0]), current().local, current.global, current.previouslyOwned); + AsyncResults.SettableResult flush = new AsyncResults.SettableWithDescription<>("Write Topology To Journal"); + journal.saveTopology(snapshot.asTopologyUpdate(), () -> flush.setSuccess(null)); + current = snapshot; + } + } + public void shutdown() { for (ShardHolder shard : current.shards) diff --git a/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java b/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java new file mode 100644 index 0000000000..f1e358d655 --- /dev/null +++ b/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.local; + +import accord.utils.Rethrowable; + +public class OverlappingCommandStoresException extends RuntimeException implements Rethrowable +{ + public OverlappingCommandStoresException() + { + } + + private OverlappingCommandStoresException(Throwable cause) + { + super(cause); + } + + @Override + public OverlappingCommandStoresException rethrowable() + { + return new OverlappingCommandStoresException(this); + } +} diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 99faad7a0b..8d0b06f6b7 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -29,6 +29,7 @@ import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.api.RoutingKey; +import accord.local.CommandStores.RangesForEpoch; import accord.local.CommandStores.RangesForEpochSupplier; import accord.local.RedundantBefore.RedundantBeforeSupplier; import accord.local.cfk.CommandsForKey; @@ -384,7 +385,12 @@ public void setSafeToRead(NavigableMap newSafeToRead) commandStore().unsafeSetSafeToRead(newSafeToRead); } - public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) + public void setPermanentlyUnsafeToRead(Ranges newPermanentlyUnsafeToRead) + { + commandStore().unsafeSetPermanentlyUnsafeToRead(newPermanentlyUnsafeToRead); + } + + public void setRangesForEpoch(RangesForEpoch rangesForEpoch) { commandStore().unsafeSetRangesForEpoch(rangesForEpoch); } @@ -571,7 +577,7 @@ private static void registerTransitive(SafeCommandStore safeStore, TxnId txnId, if (safeCommand != null && safeCommand.current().known().route() != MaybeRoute) return; - CommandStores.RangesForEpoch rangesForEpoch = safeStore.ranges(); + RangesForEpoch rangesForEpoch = safeStore.ranges(); // TODO (required): this is incompatible with rebootstrap - we need to use some additional condition witnessedBy = witnessedBy.without(rangesForEpoch.coordinates(txnId)); // already coordinates, no need to replicate if (witnessedBy.isEmpty()) @@ -589,7 +595,7 @@ private static void registerTransitive(SafeCommandStore safeStore, TxnId txnId, public abstract Agent agent(); public abstract ProgressLog progressLog(); public abstract NodeCommandStoreService node(); - public abstract CommandStores.RangesForEpoch ranges(); + public abstract RangesForEpoch ranges(); protected NavigableMap bootstrapBeganAt() { diff --git a/accord-core/src/main/java/accord/topology/ActiveEpochs.java b/accord-core/src/main/java/accord/topology/ActiveEpochs.java index 5b1677ffba..1ba8a3e477 100644 --- a/accord-core/src/main/java/accord/topology/ActiveEpochs.java +++ b/accord-core/src/main/java/accord/topology/ActiveEpochs.java @@ -62,19 +62,31 @@ public final class ActiveEpochs implements Iterable // Epochs are sorted in _descending_ order final ActiveEpoch[] epochs; - ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long prevFirstNonEmptyEpoch) + private ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long firstNonEmptyEpoch) { this.manager = manager; this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0; - if (prevFirstNonEmptyEpoch != -1) - this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; - else if (epochs.length > 0 && !epochs[0].all().isEmpty()) - this.firstNonEmptyEpoch = currentEpoch; - else - this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; - + this.firstNonEmptyEpoch = firstNonEmptyEpoch; + this.epochs = epochs; for (int i = 1; i < epochs.length; i++) Invariants.requireArgument(epochs[i].epoch() == epochs[i - 1].epoch() - 1); + } + + static ActiveEpochs empty(TopologyManager manager) + { + return new ActiveEpochs(manager, new ActiveEpoch[0], -1); + } + + ActiveEpochs withNewEpochs(ActiveEpoch[] epochs) + { + long firstNonEmptyEpoch = this.firstNonEmptyEpoch; + if (firstNonEmptyEpoch == -1 && epochs.length > 0 && !epochs[0].all().isEmpty()) + firstNonEmptyEpoch = currentEpoch; + return new ActiveEpochs(manager, epochs, firstNonEmptyEpoch); + } + + ActiveEpochs maybeTruncate() + { int truncateFrom = -1; // > 0 because we do not want to be left without epochs in case they're all empty for (int i = epochs.length - 1; i > 0; i--) @@ -86,28 +98,26 @@ else if (epochs.length > 0 && !epochs[0].all().isEmpty()) } if (truncateFrom == -1) + return this; + + ActiveEpoch[] newEpochs = Arrays.copyOf(epochs, truncateFrom);; + for (int i = truncateFrom; i < epochs.length; i++) { - this.epochs = epochs; + ActiveEpoch e = epochs[i]; + Invariants.require(epochs[i].isQuorumReady()); + logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", e.epoch(), e.addedRanges, e.removedRanges, e.all.ranges, e.closed()); } - else + if (logger.isTraceEnabled()) { - this.epochs = Arrays.copyOf(epochs, truncateFrom); - for (int i = truncateFrom; i < epochs.length; i++) + for (int i = 0; i < truncateFrom; i++) { ActiveEpoch e = epochs[i]; - Invariants.require(epochs[i].isQuorumReady()); - logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", e.epoch(), e.addedRanges, e.removedRanges, e.all.ranges, e.closed()); - } - if (logger.isTraceEnabled()) - { - for (int i = 0; i < truncateFrom; i++) - { - ActiveEpoch e = epochs[i]; - Invariants.require(e.isQuorumReady()); - logger.trace("Leaving epoch {} with added/removed ranges {}/{}", e.epoch(), e.addedRanges, e.removedRanges); - } + Invariants.require(e.isQuorumReady()); + logger.trace("Leaving epoch {} with added/removed ranges {}/{}", e.epoch(), e.addedRanges, e.removedRanges); } } + + return withNewEpochs(newEpochs); } public boolean isEmpty() diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index cba5a4c20a..b0d9508662 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -765,6 +765,23 @@ public void forEach(Consumer forEach) forEach.accept(shards[i]); } + public static Map computeNodeAdditions(Topology current, Topology next) + { + Map additions = new HashMap<>(); + for (Id node : next.nodes()) + { + Ranges prev = current.rangesForNode(node); + if (prev == null) prev = Ranges.EMPTY; + + Ranges added = next.rangesForNode(node).without(prev); + if (added.isEmpty()) + continue; + + additions.put(node, added); + } + return additions; + } + public SortedArrayList nodes() { return nodes; diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 4ca397da9f..5f06885b0a 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -19,6 +19,7 @@ package accord.topology; import java.util.IdentityHashMap; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -52,6 +53,9 @@ import accord.utils.async.AsyncResults; import accord.utils.async.NestedAsyncResult; +import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; +import static accord.primitives.Routables.Slice.Minimal; + /** * Manages topology state changes and update bookkeeping * @@ -99,7 +103,7 @@ public TopologyManager(TopologySorter.Supplier sorter, Node node, TopologyServic this.time = time; this.timeouts = timeouts; this.topologyService = topologyService; - this.active = new ActiveEpochs(this, new ActiveEpoch[0], -1); + this.active = ActiveEpochs.empty(this); this.pending = new PendingEpochs(this); } @@ -169,6 +173,7 @@ public void onEpochRetired(Ranges ranges, TxnId txnId) private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId txnId) { + long removeCommandStoresBefore = 0; Topology topology = null; synchronized (this) { @@ -193,28 +198,20 @@ private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId txnId) if (epoch > active.currentEpoch) ranges = pending.retired(ranges, epoch); ranges = active.retired(ranges, epoch); + ActiveEpochs truncated = active.maybeTruncate(); + if (truncated != active) + { + active = truncated; + removeCommandStoresBefore = truncated.minEpoch(); + } } if (!ranges.isEmpty()) { for (TopologyListener listener : listeners) listener.onEpochRetired(ranges, epoch, topology); } - } - - public synchronized void truncateTopologiesUntil(long epoch) - { - ActiveEpochs current = active; - Invariants.requireArgument(current.epoch() >= epoch, "Unable to truncate; epoch %d is > current epoch %d", epoch, current.epoch()); - - if (current.minEpoch() >= epoch) - return; - - int newLen = current.epochs.length - (int) (epoch - current.minEpoch()); - Invariants.require(current.epochs[newLen - 1].isQuorumReady(), "Epoch %d is not ready to coordinate", current.epochs[newLen - 1].epoch()); - - ActiveEpoch[] nextEpochs = new ActiveEpoch[newLen]; - System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen); - active = new ActiveEpochs(this, nextEpochs, current.firstNonEmptyEpoch); + if (removeCommandStoresBefore > 0) + node.commandStores().removeCommandStoresBefore(removeCommandStoresBefore); } public TopologySorter.Supplier sorter() @@ -308,6 +305,63 @@ public void reportTopology(Topology topology) updateActive(); } + public static class RegainingEpochRange + { + public final long epoch; + public final Ranges ranges; + + public RegainingEpochRange(long epoch, Ranges ranges) + { + this.epoch = epoch; + this.ranges = ranges; + } + + public long epoch() + { + return epoch; + } + + public Ranges ranges() + { + return ranges; + } + } + + @Nullable + public RegainingEpochRange computeRegaining(Topology current, Topology next) + { + Map additions = Topology.computeNodeAdditions(current, next); + long greatestEpoch = -1; + Ranges ranges = Ranges.EMPTY; + + ActiveEpochs active = this.active; + for (Map.Entry entry : additions.entrySet()) + { + Ranges addingForNode = entry.getValue(); + for (ActiveEpoch e : active) + { + addingForNode = addingForNode.without(e.removedRanges).without(e.retired()); + if (addingForNode.isEmpty()) + break; + + Ranges existingForNode = e.all().rangesForNode(entry.getKey()); + Ranges regainingForNode = addingForNode.slice(existingForNode, Minimal); + if (!regainingForNode.isEmpty()) + { + greatestEpoch = Math.max(greatestEpoch, e.epoch()); + ranges = ranges.union(MERGE_ADJACENT, regainingForNode); + addingForNode = addingForNode.without(regainingForNode); + } + addingForNode = addingForNode.without(e.addedRanges); + } + } + + if (greatestEpoch != -1) + return new RegainingEpochRange(greatestEpoch, ranges); + + return null; + } + private final AtomicBoolean updatingActive = new AtomicBoolean(); private void updateActive() { @@ -374,7 +428,7 @@ private void updateActive() } } - this.active = new ActiveEpochs(this, next, prev.firstNonEmptyEpoch); + this.active = prev.withNewEpochs(next); this.pending.removeFirst(topology.epoch); } diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index 4cde69a4c5..a9b47c36ed 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -18,14 +18,7 @@ package accord.impl.basic; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Queue; +import java.util.*; import java.util.concurrent.Callable; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -85,6 +78,7 @@ public static CommandStores.Factory factory(PendingQueue pending, CacheLoading i public void validateShardStateForTesting(Journal.TopologyUpdate lastUpdate) { + PreviouslyOwned previouslyOwned = lastUpdate.previouslyOwned; ShardHolder[] shards = new ShardHolder[lastUpdate.commandStores.size()]; int i = 0; for (Map.Entry e : lastUpdate.commandStores.entrySet()) @@ -95,17 +89,21 @@ public void validateShardStateForTesting(Journal.TopologyUpdate lastUpdate) for (ShardHolder shard : current) { if (shard.ranges().equals(ranges)) + { + Invariants.require(commandStore == null); commandStore = shard.store; + } } Invariants.nonNull(commandStore, "Each set of ranges should have a corresponding command store, but %d did not:(%s)", ranges, Arrays.toString(shards)) .restore(); - ShardHolder shard = new ShardHolder(commandStore, e.getValue()); + + ShardHolder shard = new ShardHolder(commandStore, ranges, previouslyOwned.regains(ranges.all())); shards[i++] = shard; } Arrays.sort(shards, Comparator.comparingInt(shard -> shard.store.id())); - loadSnapshot(new Snapshot(shards, lastUpdate.global.forNode(nodeId()).trim(), lastUpdate.global)); + loadSnapshot(new Snapshot(shards, lastUpdate.global.forNode(nodeId()).trim(), lastUpdate.global, lastUpdate.previouslyOwned)); } protected void loadSnapshot(Snapshot nextSnapshot) diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java index 68152ae9e0..30ed0fd414 100644 --- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java +++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java @@ -283,13 +283,6 @@ void truncateTopologyHistory() Assertions.assertTrue(service.active().hasEpoch(2)); Assertions.assertTrue(service.active().hasEpoch(3)); Assertions.assertTrue(service.active().hasEpoch(4)); - - service.truncateTopologiesUntil(3); - Assertions.assertFalse(service.active().hasEpoch(1)); - Assertions.assertFalse(service.active().hasEpoch(2)); - Assertions.assertTrue(service.active().hasEpoch(3)); - Assertions.assertTrue(service.active().hasEpoch(4)); - } @Test diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java index a8f8490acd..c778e707c1 100644 --- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java +++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java @@ -413,33 +413,37 @@ private static int prefix(Shard shard) return ((PrefixedIntHashKey) shard.range.start()).prefix; } - private static Map getAdditions(Topology current, Topology next) + private boolean validToReassignRange(Topology current, Shard[] nextShards, Map previouslyReplicated) { - Map additions = new HashMap<>(); - for (Id node : next.nodes()) - { - Ranges prev = current.rangesForNode(node); - if (prev == null) prev = Ranges.EMPTY; - - Ranges added = next.rangesForNode(node).without(prev); - if (added.isEmpty()) - continue; + Topology next = new Topology(current.epoch + 1, nextShards); + Map additions = Topology.computeNodeAdditions(current, next); - additions.put(node, added); + for (Map.Entry entry : additions.entrySet()) + { + if (previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY).intersects(entry.getValue()) + && !(previousEpochForRegainedRangeRetired(current, entry.getValue()))) + return false; } - return additions; + + return true; } - private static boolean reassignsRanges(Topology current, Shard[] nextShards, Map previouslyReplicated) + private boolean previousEpochForRegainedRangeRetired(Topology current, Ranges regainingRanges) { - Topology next = new Topology(current.epoch + 1, nextShards); - Map additions = getAdditions(current, next); - - for (Map.Entry entry : additions.entrySet()) + for (Id id : current.nodes()) { - if (previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY).intersects(entry.getValue())) + Node node = this.nodeLookup.apply(id); + boolean isRetiredForNode = true; + for (ActiveEpoch epoch : node.topology().active()) + { + if (epoch.all().ranges.intersects(regainingRanges) && !epoch.allRetired()) + isRetiredForNode = false; + } + + if (isRetiredForNode) return true; } + return false; } @@ -472,7 +476,7 @@ public synchronized Topology updateTopology() Shard[] testShards = type.apply(state, random); Arrays.sort(testShards, (a, b) -> a.range.compareTo(b.range)); if (!everyShardHasQuorumOverlaps(oldShards, testShards) - || reassignsRanges(current, testShards, previouslyReplicated)) + || !validToReassignRange(current, testShards, previouslyReplicated)) { ++rejectedMutations; } @@ -488,7 +492,7 @@ public synchronized Topology updateTopology() Topology nextTopology = new Topology(current.epoch + 1, newShards); - Map nextAdditions = getAdditions(current, nextTopology); + Map nextAdditions = Topology.computeNodeAdditions(current, nextTopology); for (Map.Entry entry : nextAdditions.entrySet()) { previouslyReplicated.merge(entry.getKey(), entry.getValue(), (a, b) -> a.union(MERGE_ADJACENT, b));