From 6d5e859fd40ef790f887e90214a09458d711a343 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 15:39:00 -0800 Subject: [PATCH 01/35] initial --- .../main/java/accord/local/CommandStore.java | 19 +++++ .../main/java/accord/local/CommandStores.java | 74 ++++++++++++++----- .../java/accord/local/SafeCommandStore.java | 5 ++ .../java/accord/topology/TopologyManager.java | 58 ++++++++++++++- .../accord/topology/TopologyRandomizer.java | 44 ++++++----- 5 files changed, 160 insertions(+), 40 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index d321bc6794..4aab5a02df 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -187,6 +187,7 @@ CommandStore create(int id, * But they may still be ordered for other key ranges they participate in. */ private NavigableMap safeToRead = emptySafeToRead(); + private Ranges retiredRanges = Ranges.EMPTY; private final Set bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>()); @Nullable private RejectBefore rejectBefore; @@ -401,10 +402,21 @@ protected void unsafeAcceptRequests(Ranges accept) */ final void unsafeSetSafeToRead(NavigableMap newSafeToRead) { + for (Map.Entry entry : newSafeToRead.entrySet()) + { + Ranges rangeExcluded = entry.getValue().without(this.retiredRanges); + logger.info("{} is excluded from newSafeToRead because it is in the retired range", rangeExcluded); + } + node.updateStamp(); this.safeToRead = newSafeToRead; } + final void unsafeAddToRetiredRanges(Ranges newRetiredRange) + { + this.retiredRanges = newRetiredRange.union(MERGE_ADJACENT, this.retiredRanges); + } + protected final void unsafeClearSafeToRead() { unsafeSetSafeToRead(null); @@ -1181,6 +1193,13 @@ final void markUnsafeToRead(Ranges ranges) } } + final void markAsRetired(Ranges ranges) + { + execute((Empty) () -> "Mark Range As Retired", 
safeStore -> { + safeStore.addToRetiredRanges(ranges); + }, agent); + } + public final DataStore unsafeGetDataStore() { return dataStore; diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 9a6e2d29af..6dc7a53a53 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -18,14 +18,7 @@ package accord.local; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -37,6 +30,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import accord.primitives.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -52,16 +46,6 @@ import accord.api.ProgressLog; import accord.api.RoutingKey; import accord.local.CommandStore.EpochUpdateHolder; -import accord.primitives.AbstractRanges; -import accord.primitives.AbstractUnseekableKeys; -import accord.primitives.EpochSupplier; -import accord.primitives.Participants; -import accord.primitives.Range; -import accord.primitives.Ranges; -import accord.primitives.RoutingKeys; -import accord.primitives.Timestamp; -import accord.primitives.TxnId; -import accord.primitives.Unseekables; import accord.topology.Shard; import accord.topology.Topology; import accord.utils.IndexedQuadConsumer; @@ -577,6 +561,7 @@ public static class Snapshot extends Journal.TopologyUpdate implements Iterable< final Int2IntHashMap byId; private final int[] indexForRange; final SearchableRangeList lookupByRange; + final Map overlappingCommandStores; public Snapshot(ShardHolder[] shards, Topology local, Topology global) { 
@@ -625,6 +610,21 @@ class RangeAndIndex indexForRange[i] = rangesAndIndexes[i].index; } lookupByRange = SearchableRangeList.build(ranges); + + overlappingCommandStores = new HashMap<>(); + for (ShardHolder shard : shards) + { + overlappingCommandStores.put(shard.store.id, new LargeBitSet(shards.length)); + for (Map.Entry entry : overlappingCommandStores.entrySet()) + { + Integer id = entry.getKey(); + if (!shard.ranges().all().overlapping(shards[byId.get(id)].ranges.all()).isEmpty()) + { + overlappingCommandStores.get(shard.store.id).set(id); + entry.getValue().set(shard.store.id); + } + } + } } // This method exists to ensure we do not hold references to command stores @@ -759,6 +759,14 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top shard.store.epochUpdateHolder.remove(epoch, shard.ranges, removeRanges); bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges)); } + + Ranges regainedRanges = shard.ranges().all().overlapping(added); + if (!regainedRanges.isEmpty()) + { + shard.store.markUnsafeToRead(regainedRanges); + shard.store.markAsRetired(regainedRanges); + } + // TODO (desired): only sync affected shards Ranges ranges = shard.ranges().currentRanges(); // ranges can be empty when ranges are lost or consolidated across epochs. @@ -924,17 +932,45 @@ public AsyncChain mapReduce(IntStream commandStoreIds, MapReduceCommandSt return mapReduce(snapshot -> commandStoreIds.mapToObj(snapshot::byId).iterator(), mapReduce); } + /* Do all reads and writes go through this? 
*/ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStores mapReduceConsume) { Snapshot snapshot = current; Iterator stores = selector.select(snapshot); AsyncChain chain = null; + LargeBitSet bitSet = new LargeBitSet(snapshot.shards.length); while (stores.hasNext()) { - AsyncChain next = mapReduceConsume.applyAsync(stores.next()); + CommandStore store = stores.next(); + bitSet.set(store.id()); + AsyncChain next = mapReduceConsume.applyAsync(store); if (next != null) chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } + + // Check that we are not querying two command stores for the same range + for (Map.Entry entry : snapshot.overlappingCommandStores.entrySet()) + { + List overlapping = new ArrayList<>(); + LargeBitSet overlappingCommandStores = entry.getValue(); + for (int i = overlappingCommandStores.firstSetBit(); i >= 0 ; i = overlappingCommandStores.nextSetBit(i + 1, -1)) + { + if (bitSet.get(i)) + overlapping.add(i); + } + + Ranges range = Ranges.EMPTY; + for (Integer id : overlapping) + { + Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(id).rangesForEpoch.all()); + if (!range.overlapping(touchedKeys).isEmpty()) + { + throw illegalState("We should not query the same range from two different command stores."); + } + range = range.with(mapReduceConsume.keys().overlapping(snapshot.byId(id).rangesForEpoch.all()).toRanges()); + } + } + return chain == null ? 
AsyncChains.success(null) : chain; } diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 99faad7a0b..217cd572d2 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -384,6 +384,11 @@ public void setSafeToRead(NavigableMap newSafeToRead) commandStore().unsafeSetSafeToRead(newSafeToRead); } + public void addToRetiredRanges(Ranges newRetiredRange) + { + commandStore().unsafeAddToRetiredRanges(newRetiredRange); + } + public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) { commandStore().unsafeSetRangesForEpoch(rangesForEpoch); diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 4ca397da9f..90f21bd678 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -18,7 +18,7 @@ package accord.topology; -import java.util.IdentityHashMap; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -283,6 +283,62 @@ protected Executor executor() return Runnable::run; } + public static Map getAdditions(Topology current, Topology next) + { + Map additions = new HashMap<>(); + for (Id node : next.nodes()) + { + Ranges prev = current.rangesForNode(node); + if (prev == null) prev = Ranges.EMPTY; + + Ranges added = next.rangesForNode(node).without(prev); + if (added.isEmpty()) + continue; + + additions.put(node, added); + } + return additions; + } + + public List epochsWeAreRegainingRangesFor(Topology curr, Topology next) + { + ArrayList epochs = new ArrayList<>(); + + synchronized (this) + { + Map additions = getAdditions(curr, next); + for (Map.Entry entry : additions.entrySet()) + { + for (ActiveEpoch 
activeEpoch : this.active) { + if (activeEpoch.all().rangesForNode(entry.getKey()).intersects(entry.getValue())) + epochs.add(activeEpoch.epoch()); + } + } + } + + return epochs; + } + + public void blockUntilAllEpochsRetired(List epochs) + { + try + { + int index = 0; + synchronized (this) + { + while (index != epochs.size()) + { + for (int i = index; i < epochs.size(); i++) + if (this.active.get(epochs.get(i)).allRetired()) + index += 1; + wait(); + } + } + } catch (Throwable t) { + logger.error("Uncaught exception", t); + } + } + public void reportTopology(Topology topology) { PendingEpoch e; diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java index a8f8490acd..2725ab2089 100644 --- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java +++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java @@ -413,33 +413,37 @@ private static int prefix(Shard shard) return ((PrefixedIntHashKey) shard.range.start()).prefix; } - private static Map getAdditions(Topology current, Topology next) + private boolean validToReassignRange(Topology current, Shard[] nextShards, Map previouslyReplicated) { - Map additions = new HashMap<>(); - for (Id node : next.nodes()) - { - Ranges prev = current.rangesForNode(node); - if (prev == null) prev = Ranges.EMPTY; - - Ranges added = next.rangesForNode(node).without(prev); - if (added.isEmpty()) - continue; + Topology next = new Topology(current.epoch + 1, nextShards); + Map additions = TopologyManager.getAdditions(current, next); - additions.put(node, added); + for (Map.Entry entry : additions.entrySet()) + { + if (previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY).intersects(entry.getValue()) + && !(previousEpochForRegainedRangeRetired(current, entry.getValue()))) + return false; } - return additions; + + return true; } - private static boolean reassignsRanges(Topology current, Shard[] nextShards, Map 
previouslyReplicated) + private boolean previousEpochForRegainedRangeRetired(Topology current, Ranges regainingRanges) { - Topology next = new Topology(current.epoch + 1, nextShards); - Map additions = getAdditions(current, next); - - for (Map.Entry entry : additions.entrySet()) + for (Id id : current.nodes()) { - if (previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY).intersects(entry.getValue())) + Node node = this.nodeLookup.apply(id); + boolean isRetiredForNode = true; + for (ActiveEpoch epoch : node.topology().active()) + { + if (epoch.all().ranges.intersects(regainingRanges) && !epoch.allRetired()) + isRetiredForNode = false; + } + + if (isRetiredForNode) return true; } + return false; } @@ -472,7 +476,7 @@ public synchronized Topology updateTopology() Shard[] testShards = type.apply(state, random); Arrays.sort(testShards, (a, b) -> a.range.compareTo(b.range)); if (!everyShardHasQuorumOverlaps(oldShards, testShards) - || reassignsRanges(current, testShards, previouslyReplicated)) + || !validToReassignRange(current, testShards, previouslyReplicated)) { ++rejectedMutations; } @@ -488,7 +492,7 @@ public synchronized Topology updateTopology() Topology nextTopology = new Topology(current.epoch + 1, newShards); - Map nextAdditions = getAdditions(current, nextTopology); + Map nextAdditions = TopologyManager.getAdditions(current, nextTopology); for (Map.Entry entry : nextAdditions.entrySet()) { previouslyReplicated.merge(entry.getKey(), entry.getValue(), (a, b) -> a.union(MERGE_ADJACENT, b)); From 3241b640e17952ea6f4dfd468f2de45880abad3c Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 15:44:41 -0800 Subject: [PATCH 02/35] formatting --- .../main/java/accord/local/CommandStores.java | 22 ++++++++++++++++--- .../java/accord/topology/TopologyManager.java | 6 ++++- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java 
index 6dc7a53a53..be75c17e37 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -18,7 +18,15 @@ package accord.local; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.HashMap; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -30,7 +38,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import accord.primitives.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -46,6 +53,16 @@ import accord.api.ProgressLog; import accord.api.RoutingKey; import accord.local.CommandStore.EpochUpdateHolder; +import accord.primitives.AbstractRanges; +import accord.primitives.AbstractUnseekableKeys; +import accord.primitives.EpochSupplier; +import accord.primitives.Participants; +import accord.primitives.Range; +import accord.primitives.Ranges; +import accord.primitives.RoutingKeys; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; import accord.topology.Shard; import accord.topology.Topology; import accord.utils.IndexedQuadConsumer; @@ -932,7 +949,6 @@ public AsyncChain mapReduce(IntStream commandStoreIds, MapReduceCommandSt return mapReduce(snapshot -> commandStoreIds.mapToObj(snapshot::byId).iterator(), mapReduce); } - /* Do all reads and writes go through this? 
*/ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStores mapReduceConsume) { Snapshot snapshot = current; diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 90f21bd678..57f2656199 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -18,7 +18,11 @@ package accord.topology; -import java.util.*; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; From 53149bb935710a4a7c49277a16dcd15e11f51d8a Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 15:57:14 -0800 Subject: [PATCH 03/35] inlined loop --- .../main/java/accord/local/CommandStores.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index be75c17e37..5a6865db9f 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -967,23 +967,19 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore // Check that we are not querying two command stores for the same range for (Map.Entry entry : snapshot.overlappingCommandStores.entrySet()) { - List overlapping = new ArrayList<>(); + Ranges range = Ranges.EMPTY; LargeBitSet overlappingCommandStores = entry.getValue(); for (int i = overlappingCommandStores.firstSetBit(); i >= 0 ; i = overlappingCommandStores.nextSetBit(i + 1, -1)) { if (bitSet.get(i)) - overlapping.add(i); - } - - Ranges range = Ranges.EMPTY; - for (Integer id : overlapping) - { - Unseekables touchedKeys = 
mapReduceConsume.keys().overlapping(snapshot.byId(id).rangesForEpoch.all()); - if (!range.overlapping(touchedKeys).isEmpty()) { - throw illegalState("We should not query the same range from two different command stores."); + Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()); + + if (!range.overlapping(touchedKeys).isEmpty()) + throw illegalState("We should not query the same range from two different command stores."); + + range = range.with(mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()).toRanges()); } - range = range.with(mapReduceConsume.keys().overlapping(snapshot.byId(id).rangesForEpoch.all()).toRanges()); } } From 97b0e97da99635ddfe9cf1eb4dbd1b1239822e1d Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 16:00:39 -0800 Subject: [PATCH 04/35] null check --- accord-core/src/main/java/accord/local/CommandStore.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 4aab5a02df..9bcd1be480 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -402,10 +402,13 @@ protected void unsafeAcceptRequests(Ranges accept) */ final void unsafeSetSafeToRead(NavigableMap newSafeToRead) { - for (Map.Entry entry : newSafeToRead.entrySet()) + if (newSafeToRead != null) { - Ranges rangeExcluded = entry.getValue().without(this.retiredRanges); - logger.info("{} is excluded from newSafeToRead because it is in the retired range", rangeExcluded); + for (Map.Entry entry : newSafeToRead.entrySet()) + { + Ranges rangeExcluded = entry.getValue().without(this.retiredRanges); + logger.info("{} is excluded from newSafeToRead because it is in the retired range", rangeExcluded); + } } node.updateStamp(); From 2663505ab71eed5170c63343243ec1b0a598ce83 Mon Sep 17 00:00:00 2001 From: 
Alan Wang Date: Mon, 23 Feb 2026 20:00:22 -0800 Subject: [PATCH 05/35] inline --- accord-core/src/main/java/accord/local/CommandStores.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 5a6865db9f..4360f1e3e9 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -973,12 +973,12 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { if (bitSet.get(i)) { - Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()); + Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).unsafeGetRangesForEpoch().all()); if (!range.overlapping(touchedKeys).isEmpty()) throw illegalState("We should not query the same range from two different command stores."); - range = range.with(mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()).toRanges()); + range = range.with(touchedKeys.toRanges()); } } } From 1d2dd2793105c120c4ced5fd0f149bc6d092e647 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 21:15:02 -0800 Subject: [PATCH 06/35] fix --- .../main/java/accord/local/CommandStores.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 4360f1e3e9..758b884787 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -629,16 +629,17 @@ class RangeAndIndex lookupByRange = SearchableRangeList.build(ranges); overlappingCommandStores = new HashMap<>(); - for (ShardHolder shard : shards) + for (int i = 0; i < shards.length; i++) + overlappingCommandStores.put(i, new LargeBitSet(shards.length)); + + for (int i = 0; 
i < shards.length; i++) { - overlappingCommandStores.put(shard.store.id, new LargeBitSet(shards.length)); - for (Map.Entry entry : overlappingCommandStores.entrySet()) + for (int j = i+1; j < shards.length; j++) { - Integer id = entry.getKey(); - if (!shard.ranges().all().overlapping(shards[byId.get(id)].ranges.all()).isEmpty()) + if (!shards[i].ranges().all().overlapping(shards[j].ranges().all()).isEmpty()) { - overlappingCommandStores.get(shard.store.id).set(id); - entry.getValue().set(shard.store.id); + overlappingCommandStores.get(i).set(j); + overlappingCommandStores.get(j).set(i); } } } @@ -973,7 +974,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { if (bitSet.get(i)) { - Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).unsafeGetRangesForEpoch().all()); + Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.shards[i].ranges().all()); if (!range.overlapping(touchedKeys).isEmpty()) throw illegalState("We should not query the same range from two different command stores."); From 2fba86e2233bf3a05ae578086156a9243d556ae5 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 21:39:26 -0800 Subject: [PATCH 07/35] update --- accord-core/src/main/java/accord/topology/TopologyManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 57f2656199..fd6b266ebb 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -197,6 +197,7 @@ private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId txnId) if (epoch > active.currentEpoch) ranges = pending.retired(ranges, epoch); ranges = active.retired(ranges, epoch); + notify(); } if (!ranges.isEmpty()) { From 3538f16b0e1c88396191e47a524c67cac889ddfb Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 24 
Feb 2026 11:51:27 -0800 Subject: [PATCH 08/35] fix npe --- .../src/main/java/accord/local/CommandStores.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 758b884787..0963f00c0b 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -629,8 +629,8 @@ class RangeAndIndex lookupByRange = SearchableRangeList.build(ranges); overlappingCommandStores = new HashMap<>(); - for (int i = 0; i < shards.length; i++) - overlappingCommandStores.put(i, new LargeBitSet(shards.length)); + for (ShardHolder shard : shards) + overlappingCommandStores.put(shard.store.id(), new LargeBitSet(shards.length)); for (int i = 0; i < shards.length; i++) { @@ -638,8 +638,8 @@ class RangeAndIndex { if (!shards[i].ranges().all().overlapping(shards[j].ranges().all()).isEmpty()) { - overlappingCommandStores.get(i).set(j); - overlappingCommandStores.get(j).set(i); + overlappingCommandStores.get(shards[i].store.id()).set(shards[j].store.id()); + overlappingCommandStores.get(shards[j].store.id()).set(shards[i].store.id()); } } } @@ -974,7 +974,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { if (bitSet.get(i)) { - Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.shards[i].ranges().all()); + Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()); if (!range.overlapping(touchedKeys).isEmpty()) throw illegalState("We should not query the same range from two different command stores."); From a7bb9fffa56209c8add383b7415a9573c910b4f4 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 24 Feb 2026 15:55:34 -0800 Subject: [PATCH 09/35] fix wrong use of overlapping --- accord-core/src/main/java/accord/local/CommandStores.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 
deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 0963f00c0b..11e3be908f 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -636,7 +636,7 @@ class RangeAndIndex { for (int j = i+1; j < shards.length; j++) { - if (!shards[i].ranges().all().overlapping(shards[j].ranges().all()).isEmpty()) + if (!shards[i].ranges().all().slice(shards[j].ranges().all(), Minimal).isEmpty()) { overlappingCommandStores.get(shards[i].store.id()).set(shards[j].store.id()); overlappingCommandStores.get(shards[j].store.id()).set(shards[i].store.id()); @@ -778,7 +778,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges)); } - Ranges regainedRanges = shard.ranges().all().overlapping(added); + Ranges regainedRanges = shard.ranges().all().slice(added, Minimal); if (!regainedRanges.isEmpty()) { shard.store.markUnsafeToRead(regainedRanges); @@ -974,9 +974,9 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { if (bitSet.get(i)) { - Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()); + Ranges touchedKeys = mapReduceConsume.keys().toRanges().slice(snapshot.byId(i).rangesForEpoch.all(), Minimal); - if (!range.overlapping(touchedKeys).isEmpty()) + if (!range.slice(touchedKeys, Minimal).isEmpty()) throw illegalState("We should not query the same range from two different command stores."); range = range.with(touchedKeys.toRanges()); From 226ddd0cd4930a4e2d2cff7ddef7215754605054 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 24 Feb 2026 16:22:27 -0800 Subject: [PATCH 10/35] use shard index instead of command store id --- .../src/main/java/accord/local/CommandStores.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git 
a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 11e3be908f..a8ce9a3329 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -629,8 +629,8 @@ class RangeAndIndex lookupByRange = SearchableRangeList.build(ranges); overlappingCommandStores = new HashMap<>(); - for (ShardHolder shard : shards) - overlappingCommandStores.put(shard.store.id(), new LargeBitSet(shards.length)); + for (int i = 0; i < shards.length ; i++) + overlappingCommandStores.put(i, new LargeBitSet(shards.length)); for (int i = 0; i < shards.length; i++) { @@ -638,8 +638,8 @@ class RangeAndIndex { if (!shards[i].ranges().all().slice(shards[j].ranges().all(), Minimal).isEmpty()) { - overlappingCommandStores.get(shards[i].store.id()).set(shards[j].store.id()); - overlappingCommandStores.get(shards[j].store.id()).set(shards[i].store.id()); + overlappingCommandStores.get(i).set(j); + overlappingCommandStores.get(j).set(i); } } } @@ -959,7 +959,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore while (stores.hasNext()) { CommandStore store = stores.next(); - bitSet.set(store.id()); + bitSet.set(snapshot.byId.get(store.id())); AsyncChain next = mapReduceConsume.applyAsync(store); if (next != null) chain = chain != null ? 
AsyncChains.reduce(chain, next, mapReduceConsume) : next; @@ -974,7 +974,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { if (bitSet.get(i)) { - Ranges touchedKeys = mapReduceConsume.keys().toRanges().slice(snapshot.byId(i).rangesForEpoch.all(), Minimal); + Ranges touchedKeys = mapReduceConsume.keys().toRanges().slice(snapshot.shards[i].ranges().all(), Minimal); if (!range.slice(touchedKeys, Minimal).isEmpty()) throw illegalState("We should not query the same range from two different command stores."); From aa61f854a32fefbbff75faefcf34d158a224769a Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 12:08:55 -0800 Subject: [PATCH 11/35] refactor --- .../main/java/accord/topology/Topology.java | 17 ++++++ .../java/accord/topology/TopologyManager.java | 57 ------------------- .../accord/topology/TopologyRandomizer.java | 4 +- 3 files changed, 19 insertions(+), 59 deletions(-) diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index cba5a4c20a..b0d9508662 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -765,6 +765,23 @@ public void forEach(Consumer forEach) forEach.accept(shards[i]); } + public static Map computeNodeAdditions(Topology current, Topology next) + { + Map additions = new HashMap<>(); + for (Id node : next.nodes()) + { + Ranges prev = current.rangesForNode(node); + if (prev == null) prev = Ranges.EMPTY; + + Ranges added = next.rangesForNode(node).without(prev); + if (added.isEmpty()) + continue; + + additions.put(node, added); + } + return additions; + } + public SortedArrayList nodes() { return nodes; diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index fd6b266ebb..b97790b350 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ 
b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -197,7 +197,6 @@ private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId txnId) if (epoch > active.currentEpoch) ranges = pending.retired(ranges, epoch); ranges = active.retired(ranges, epoch); - notify(); } if (!ranges.isEmpty()) { @@ -288,62 +287,6 @@ protected Executor executor() return Runnable::run; } - public static Map getAdditions(Topology current, Topology next) - { - Map additions = new HashMap<>(); - for (Id node : next.nodes()) - { - Ranges prev = current.rangesForNode(node); - if (prev == null) prev = Ranges.EMPTY; - - Ranges added = next.rangesForNode(node).without(prev); - if (added.isEmpty()) - continue; - - additions.put(node, added); - } - return additions; - } - - public List epochsWeAreRegainingRangesFor(Topology curr, Topology next) - { - ArrayList epochs = new ArrayList<>(); - - synchronized (this) - { - Map additions = getAdditions(curr, next); - for (Map.Entry entry : additions.entrySet()) - { - for (ActiveEpoch activeEpoch : this.active) { - if (activeEpoch.all().rangesForNode(entry.getKey()).intersects(entry.getValue())) - epochs.add(activeEpoch.epoch()); - } - } - } - - return epochs; - } - - public void blockUntilAllEpochsRetired(List epochs) - { - try - { - int index = 0; - synchronized (this) - { - while (index != epochs.size()) - { - for (int i = index; i < epochs.size(); i++) - if (this.active.get(epochs.get(i)).allRetired()) - index += 1; - wait(); - } - } - } catch (Throwable t) { - logger.error("Uncaught exception", t); - } - } - public void reportTopology(Topology topology) { PendingEpoch e; diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java index 2725ab2089..c778e707c1 100644 --- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java +++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java @@ -416,7 +416,7 @@ private static 
int prefix(Shard shard) private boolean validToReassignRange(Topology current, Shard[] nextShards, Map previouslyReplicated) { Topology next = new Topology(current.epoch + 1, nextShards); - Map additions = TopologyManager.getAdditions(current, next); + Map additions = Topology.computeNodeAdditions(current, next); for (Map.Entry entry : additions.entrySet()) { @@ -492,7 +492,7 @@ public synchronized Topology updateTopology() Topology nextTopology = new Topology(current.epoch + 1, newShards); - Map nextAdditions = TopologyManager.getAdditions(current, nextTopology); + Map nextAdditions = Topology.computeNodeAdditions(current, nextTopology); for (Map.Entry entry : nextAdditions.entrySet()) { previouslyReplicated.merge(entry.getKey(), entry.getValue(), (a, b) -> a.union(MERGE_ADJACENT, b)); From 4b5b2a967f07b8f1533c5f8907176f41a9c0f090 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 14:37:55 -0800 Subject: [PATCH 12/35] added method for greatest epoch and range that needs to be retired --- .../java/accord/topology/TopologyManager.java | 56 ++++++++++++++++++- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index b97790b350..4511ee95b5 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -18,11 +18,8 @@ package accord.topology; -import java.util.ArrayList; import java.util.IdentityHashMap; import java.util.Map; -import java.util.HashMap; -import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -31,6 +28,7 @@ import java.util.function.Supplier; import javax.annotation.Nullable; +import accord.primitives.Routables; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ 
-56,6 +54,8 @@ import accord.utils.async.AsyncResults; import accord.utils.async.NestedAsyncResult; +import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; + /** * Manages topology state changes and update bookkeeping * @@ -312,6 +312,56 @@ public void reportTopology(Topology topology) updateActive(); } + public static class regainingEpochRange + { + public final long epoch; + public final Ranges range; + + public regainingEpochRange(long epoch, Ranges range) + { + this.epoch = epoch; + this.range = range; + } + + public long epoch() + { + return epoch; + } + + public Ranges range() + { + return range; + } + } + + @Nullable + public regainingEpochRange epochAndRangeToBeRetired(Topology curr, Topology next) + { + Map additions = Topology.computeNodeAdditions(curr, next); + long greatestEpoch = -1; + Ranges range = Ranges.EMPTY; + + synchronized (this) + { + for (Map.Entry entry : additions.entrySet()) + { + for (ActiveEpoch activeEpoch : this.active) { + if (activeEpoch.all().rangesForNode(entry.getKey()).intersects(entry.getValue()) && greatestEpoch < activeEpoch.epoch()) + { + greatestEpoch = activeEpoch.epoch(); + range = range.union(MERGE_ADJACENT, activeEpoch.all().rangesForNode(entry.getKey()).slice(entry.getValue(), Routables.Slice.Minimal)); + break; + } + } + } + } + + if (greatestEpoch != -1) + return new regainingEpochRange(greatestEpoch, range); + + return null; + } + private final AtomicBoolean updatingActive = new AtomicBoolean(); private void updateActive() { From 356c17352393d01dbd44e06f51b05161840c234f Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 16:31:19 -0800 Subject: [PATCH 13/35] fix class name --- .../src/main/java/accord/topology/TopologyManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 4511ee95b5..126e10078a 100644 --- 
a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -312,12 +312,12 @@ public void reportTopology(Topology topology) updateActive(); } - public static class regainingEpochRange + public static class RegainingEpochRange { public final long epoch; public final Ranges range; - public regainingEpochRange(long epoch, Ranges range) + public RegainingEpochRange(long epoch, Ranges range) { this.epoch = epoch; this.range = range; @@ -335,7 +335,7 @@ public Ranges range() } @Nullable - public regainingEpochRange epochAndRangeToBeRetired(Topology curr, Topology next) + public RegainingEpochRange epochAndRangeToBeRetired(Topology curr, Topology next) { Map additions = Topology.computeNodeAdditions(curr, next); long greatestEpoch = -1; From c5962adac56fbba5916d07b5f6c1bc6e95d6a409 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 16:31:50 -0800 Subject: [PATCH 14/35] fix --- accord-core/src/main/java/accord/topology/TopologyManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 126e10078a..52eedf507d 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -357,7 +357,7 @@ public RegainingEpochRange epochAndRangeToBeRetired(Topology curr, Topology next } if (greatestEpoch != -1) - return new regainingEpochRange(greatestEpoch, range); + return new RegainingEpochRange(greatestEpoch, range); return null; } From 8dbd41dbd424762172c15878cf4c26e41f158e8f Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 21:23:22 -0800 Subject: [PATCH 15/35] factor out method to test --- .../src/main/java/accord/local/CommandStores.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git 
a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index a8ce9a3329..9a472e1f6f 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -965,6 +965,13 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } + Invariants.require(checkQueryDisjointRangesAcrossCommandStores(snapshot, bitSet, mapReduceConsume.keys().toRanges())); + + return chain == null ? AsyncChains.success(null) : chain; + } + + public boolean checkQueryDisjointRangesAcrossCommandStores(Snapshot snapshot, LargeBitSet commandStoresSeen, Ranges queryRange) + { // Check that we are not querying two command stores for the same range for (Map.Entry entry : snapshot.overlappingCommandStores.entrySet()) { @@ -972,19 +979,19 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore LargeBitSet overlappingCommandStores = entry.getValue(); for (int i = overlappingCommandStores.firstSetBit(); i >= 0 ; i = overlappingCommandStores.nextSetBit(i + 1, -1)) { - if (bitSet.get(i)) + if (commandStoresSeen.get(i)) { - Ranges touchedKeys = mapReduceConsume.keys().toRanges().slice(snapshot.shards[i].ranges().all(), Minimal); + Ranges touchedKeys = queryRange.slice(snapshot.shards[i].ranges().all(), Minimal); if (!range.slice(touchedKeys, Minimal).isEmpty()) - throw illegalState("We should not query the same range from two different command stores."); + return false; range = range.with(touchedKeys.toRanges()); } } } - return chain == null ? 
AsyncChains.success(null) : chain; + return true; } public O mapReduceUnsafe(StoreSelector selector, Function map, BiFunction reduce, O accumulator) From 8d930b759f4b6db56e14177598b63e264894f9ef Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 21:28:05 -0800 Subject: [PATCH 16/35] refactor to test --- .../src/main/java/accord/local/CommandStores.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 9a472e1f6f..efdcd4aedf 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -965,23 +965,23 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } - Invariants.require(checkQueryDisjointRangesAcrossCommandStores(snapshot, bitSet, mapReduceConsume.keys().toRanges())); + Invariants.require(checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges())); return chain == null ? 
AsyncChains.success(null) : chain; } - public boolean checkQueryDisjointRangesAcrossCommandStores(Snapshot snapshot, LargeBitSet commandStoresSeen, Ranges queryRange) + public static boolean checkQueryDisjointRangesAcrossCommandStores(Map overlappingCommandStores, ShardHolder[] shards, LargeBitSet commandStoresSeen, Ranges queryRange) { // Check that we are not querying two command stores for the same range - for (Map.Entry entry : snapshot.overlappingCommandStores.entrySet()) + for (Map.Entry entry : overlappingCommandStores.entrySet()) { Ranges range = Ranges.EMPTY; - LargeBitSet overlappingCommandStores = entry.getValue(); - for (int i = overlappingCommandStores.firstSetBit(); i >= 0 ; i = overlappingCommandStores.nextSetBit(i + 1, -1)) + LargeBitSet overlappingCommandStore = entry.getValue(); + for (int i = overlappingCommandStore.firstSetBit(); i >= 0 ; i = overlappingCommandStore.nextSetBit(i + 1, -1)) { if (commandStoresSeen.get(i)) { - Ranges touchedKeys = queryRange.slice(snapshot.shards[i].ranges().all(), Minimal); + Ranges touchedKeys = queryRange.slice(shards[i].ranges().all(), Minimal); if (!range.slice(touchedKeys, Minimal).isEmpty()) return false; From a06d79e8176fe0db470717cf0c7e9ad52441a422 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 23:12:18 -0800 Subject: [PATCH 17/35] fix bug --- accord-core/src/main/java/accord/local/CommandStores.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index efdcd4aedf..0f65493577 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -634,7 +634,7 @@ class RangeAndIndex for (int i = 0; i < shards.length; i++) { - for (int j = i+1; j < shards.length; j++) + for (int j = i; j < shards.length; j++) { if (!shards[i].ranges().all().slice(shards[j].ranges().all(), Minimal).isEmpty()) 
{ From 78fe590c2d40df1493e29a5cca9af860e1355d08 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 3 Mar 2026 15:06:26 -0800 Subject: [PATCH 18/35] Refined check for querying > 1 command store for same range --- .../main/java/accord/local/CommandStores.java | 51 ++++++++++++++----- .../src/main/java/accord/local/Node.java | 2 +- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 0f65493577..14fccd56ed 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -110,9 +110,9 @@ class StandardLatentStoreSelector implements LatentStoreSelector @Override public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants participants) { - return snapshot -> StoreFinder.find(snapshot, participants) + return snapshot -> new StoreIterator(StoreFinder.find(snapshot, participants) .filter(snapshot, participants, txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch()) - .iterator(snapshot); + .iterator(snapshot), txnId.epoch()); } } @@ -125,7 +125,7 @@ static LatentStoreSelector standard() public interface StoreSelector extends LatentStoreSelector { default StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants participants) { return this; } - Iterator select(Snapshot snapshot); + StoreIterator select(Snapshot snapshot); } public static class IncludingSpecificStoreSelector implements StoreSelector @@ -144,14 +144,35 @@ public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Particip StoreFinder finder = StoreFinder.find(snapshot, participants) .filter(snapshot, participants, txnId.epoch(), (executeAt != null ? 
executeAt : txnId).epoch()); finder.set(snapshot.byId.get(storeId)); - return finder.iterator(snapshot); + return new StoreIterator(finder.iterator(snapshot), txnId.epoch()); }; } @Override - public Iterator select(Snapshot snapshot) + public StoreIterator select(Snapshot snapshot) { - return Collections.singletonList(snapshot.byId(storeId)).iterator(); + return new StoreIterator(Collections.singletonList(snapshot.byId(storeId)).iterator(), null); + } + } + + public static class StoreIterator + { + public final Iterator StoreIterator; + public final Long minEpoch; + + public StoreIterator(Iterator StoreIterator, @Nullable Long minEpoch) + { + this.StoreIterator = StoreIterator; + this.minEpoch = minEpoch; + } + + public Iterator getStoreIterator() + { + return StoreIterator; + } + + public Long getMinEpoch() { + return minEpoch; } } @@ -176,7 +197,7 @@ public static StoreSelector selector(Unseekables unseekables, long minEpoch, return snapshot -> { StoreFinder finder = StoreFinder.find(snapshot, unseekables); finder.filter(snapshot, unseekables, minEpoch, maxEpoch); - return finder.iterator(snapshot); + return new StoreIterator(finder.iterator(snapshot), minEpoch); }; } @@ -878,7 +899,7 @@ public void forAllUnsafe(Consumer forEach) public AsyncChain forAll(String reason, Consumer forEach) { - return mapReduce(snapshot -> Stream.of(snapshot.shards).map(shard -> shard.store).iterator(), new MapReduceCommandStores<>(RoutingKeys.EMPTY) + return mapReduce(snapshot -> new StoreIterator(Stream.of(snapshot.shards).map(shard -> shard.store).iterator(), null), new MapReduceCommandStores<>(RoutingKeys.EMPTY) { @Override public Void reduce(Void o1, Void o2) { return null; } @Override public TxnId primaryTxnId() { return null; } @@ -947,13 +968,14 @@ public Cancellable mapReduceConsume(IntStream commandStoreIds, MapReduceCons public AsyncChain mapReduce(IntStream commandStoreIds, MapReduceCommandStores mapReduce) { - return mapReduce(snapshot -> 
commandStoreIds.mapToObj(snapshot::byId).iterator(), mapReduce); + return mapReduce(snapshot -> new StoreIterator(commandStoreIds.mapToObj(snapshot::byId).iterator(), null), mapReduce); } public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStores mapReduceConsume) { Snapshot snapshot = current; - Iterator stores = selector.select(snapshot); + StoreIterator StoreIterator = selector.select(snapshot); + Iterator stores = StoreIterator.getStoreIterator(); AsyncChain chain = null; LargeBitSet bitSet = new LargeBitSet(snapshot.shards.length); while (stores.hasNext()) @@ -965,12 +987,13 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } - Invariants.require(checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges())); + if (checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) + throw new IllegalStateException(); return chain == null ? 
AsyncChains.success(null) : chain; } - public static boolean checkQueryDisjointRangesAcrossCommandStores(Map overlappingCommandStores, ShardHolder[] shards, LargeBitSet commandStoresSeen, Ranges queryRange) + public static boolean checkQueryDisjointRangesAcrossCommandStores(Map overlappingCommandStores, ShardHolder[] shards, LargeBitSet commandStoresSeen, Ranges queryRange, Long minEpoch) { // Check that we are not querying two command stores for the same range for (Map.Entry entry : overlappingCommandStores.entrySet()) @@ -981,7 +1004,7 @@ public static boolean checkQueryDisjointRangesAcrossCommandStores(Map O mapReduceUnsafe(StoreSelector selector, Function map, BiFunction reduce, O accumulator) { Snapshot snapshot = current; - Iterator stores = selector.select(snapshot); + Iterator stores = selector.select(snapshot).getStoreIterator(); while (stores.hasNext()) { O next = map.apply(stores.next()); diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 9c4f1620c4..b4f57313aa 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -765,7 +765,7 @@ public CoordinationAdapter coordinationAdapter(TxnId txnId, Kind kind) public AsyncChain updateMinHlc(long minHlc) { // TODO (required): command stores that are not ready due to bootstrap need to refresh their min HLC on bootstrap completion - StoreSelector selector = snapshot -> Stream.of(snapshot.shards).map(sh -> sh.store).iterator(); + StoreSelector selector = snapshot -> new CommandStores.StoreIterator(Stream.of(snapshot.shards).map(sh -> sh.store).iterator(), null); return commandStores().mapReduce(selector, new MapReduceCommandStores<>(RoutingKeys.EMPTY) { @Override public Void reduce(Void o1, Void o2) { return null; } From 00e9136b0d4ef15f62af3088f950dfd2ac3ff7c6 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 3 Mar 2026 15:15:21 -0800 Subject: [PATCH 19/35] null check --- 
accord-core/src/main/java/accord/local/CommandStores.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 14fccd56ed..9730a293e1 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -163,7 +163,7 @@ public static class StoreIterator public StoreIterator(Iterator StoreIterator, @Nullable Long minEpoch) { this.StoreIterator = StoreIterator; - this.minEpoch = minEpoch; + this.minEpoch = (minEpoch == null) ? 0L : minEpoch; } public Iterator getStoreIterator() From c72e9b25fdaedce2604b70d46578cbdba6c0f6f9 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 3 Mar 2026 15:48:21 -0800 Subject: [PATCH 20/35] fix --- accord-core/src/main/java/accord/local/CommandStores.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 9730a293e1..579c9254f7 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -987,7 +987,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } - if (checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) + if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) throw new IllegalStateException(); return chain == null ? 
AsyncChains.success(null) : chain; From 6873064de2ae8d4f90ecce86f1cbe1ae35428095 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 3 Mar 2026 16:30:33 -0800 Subject: [PATCH 21/35] name changes --- .../src/main/java/accord/local/CommandStore.java | 16 ++++++++-------- .../main/java/accord/local/CommandStores.java | 2 +- .../main/java/accord/local/SafeCommandStore.java | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 9bcd1be480..f59be8c237 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -187,7 +187,7 @@ CommandStore create(int id, * But they may still be ordered for other key ranges they participate in. */ private NavigableMap safeToRead = emptySafeToRead(); - private Ranges retiredRanges = Ranges.EMPTY; + private Ranges regainedRanges = Ranges.EMPTY; private final Set bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>()); @Nullable private RejectBefore rejectBefore; @@ -406,8 +406,8 @@ final void unsafeSetSafeToRead(NavigableMap newSafeToRead) { for (Map.Entry entry : newSafeToRead.entrySet()) { - Ranges rangeExcluded = entry.getValue().without(this.retiredRanges); - logger.info("{} is excluded from newSafeToRead because it is in the retired range", rangeExcluded); + Ranges rangeExcluded = entry.getValue().without(this.regainedRanges); + logger.info("{} is excluded from newSafeToRead because it is in the regained ranges", rangeExcluded); } } @@ -415,9 +415,9 @@ final void unsafeSetSafeToRead(NavigableMap newSafeToRead) this.safeToRead = newSafeToRead; } - final void unsafeAddToRetiredRanges(Ranges newRetiredRange) + final void unsafeAddToRegainedRanges(Ranges newRegainedRanges) { - this.retiredRanges = newRetiredRange.union(MERGE_ADJACENT, this.retiredRanges); + this.regainedRanges = 
newRegainedRanges.union(MERGE_ADJACENT, this.regainedRanges); } protected final void unsafeClearSafeToRead() @@ -1196,10 +1196,10 @@ final void markUnsafeToRead(Ranges ranges) } } - final void markAsRetired(Ranges ranges) + final void markAsRegained(Ranges ranges) { - execute((Empty) () -> "Mark Range As Retired", safeStore -> { - safeStore.addToRetiredRanges(ranges); + execute((Empty) () -> "Mark Range As Regained", safeStore -> { + safeStore.addToRegainedRanges(ranges); }, agent); } diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 579c9254f7..e2e6a39c6f 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -803,7 +803,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top if (!regainedRanges.isEmpty()) { shard.store.markUnsafeToRead(regainedRanges); - shard.store.markAsRetired(regainedRanges); + shard.store.markAsRegained(regainedRanges); } // TODO (desired): only sync affected shards diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 217cd572d2..ba88a5ddf8 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -384,9 +384,9 @@ public void setSafeToRead(NavigableMap newSafeToRead) commandStore().unsafeSetSafeToRead(newSafeToRead); } - public void addToRetiredRanges(Ranges newRetiredRange) + public void addToRegainedRanges(Ranges newRegainedRanges) { - commandStore().unsafeAddToRetiredRanges(newRetiredRange); + commandStore().unsafeAddToRegainedRanges(newRegainedRanges); } public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) From 3c710de7e5fe08e43aac967f4e999b16b2421d29 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 4 Mar 2026 10:49:54 -0800 Subject: [PATCH 
22/35] change failure mode --- accord-core/src/main/java/accord/local/CommandStores.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index e2e6a39c6f..4b91c2cafe 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -988,7 +988,10 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore } if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) - throw new IllegalStateException(); + { + logger.info("Reject query as it tries to query {} in more than one CommandStore", mapReduceConsume.keys().toRanges().toString()); + return AsyncChains.failure(null); + } return chain == null ? AsyncChains.success(null) : chain; } From d5db5f5ee4f6f3f4f6842944c1e83cb247bc413d Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 4 Mar 2026 17:15:02 -0800 Subject: [PATCH 23/35] changed error handling --- accord-core/src/main/java/accord/local/CommandStores.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 4b91c2cafe..5a38f326cd 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -38,6 +38,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import accord.topology.TopologyException; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -988,10 +989,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore } if 
(!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) - { - logger.info("Reject query as it tries to query {} in more than one CommandStore", mapReduceConsume.keys().toRanges().toString()); - return AsyncChains.failure(null); - } + return AsyncChains.failure(new RuntimeException("Tried to query more than one CommandStore for the same range")); return chain == null ? AsyncChains.success(null) : chain; } From fb12db945457ac1683933d3f12c6d975ac83f620 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 9 Mar 2026 10:45:11 -0700 Subject: [PATCH 24/35] change casing --- .../src/main/java/accord/local/CommandStores.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 5a38f326cd..d073992e38 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -158,18 +158,18 @@ public StoreIterator select(Snapshot snapshot) public static class StoreIterator { - public final Iterator StoreIterator; + public final Iterator storeIterator; public final Long minEpoch; - public StoreIterator(Iterator StoreIterator, @Nullable Long minEpoch) + public StoreIterator(Iterator storeIterator, @Nullable Long minEpoch) { - this.StoreIterator = StoreIterator; + this.storeIterator = storeIterator; this.minEpoch = (minEpoch == null) ? 
0L : minEpoch; } public Iterator getStoreIterator() { - return StoreIterator; + return storeIterator; } public Long getMinEpoch() { @@ -975,8 +975,8 @@ public AsyncChain mapReduce(IntStream commandStoreIds, MapReduceCommandSt public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStores mapReduceConsume) { Snapshot snapshot = current; - StoreIterator StoreIterator = selector.select(snapshot); - Iterator stores = StoreIterator.getStoreIterator(); + StoreIterator storeIterator = selector.select(snapshot); + Iterator stores = storeIterator.getStoreIterator(); AsyncChain chain = null; LargeBitSet bitSet = new LargeBitSet(snapshot.shards.length); while (stores.hasNext()) @@ -988,7 +988,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } - if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) + if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), storeIterator.getMinEpoch())) return AsyncChains.failure(new RuntimeException("Tried to query more than one CommandStore for the same range")); return chain == null ? 
AsyncChains.success(null) : chain; From f4be3a7f5e4086f5f796874bf6385878ead731ea Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 11 Mar 2026 11:18:50 -0700 Subject: [PATCH 25/35] initial --- .../java/accord/coordinate/MaybeRecover.java | 1 - .../main/java/accord/local/CommandStores.java | 88 ++++++++++++++----- .../impl/basic/DelayedCommandStores.java | 12 +-- 3 files changed, 69 insertions(+), 32 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java index dd3b3fdb8c..1f7fecfea6 100644 --- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java +++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java @@ -55,7 +55,6 @@ public class MaybeRecover extends CheckShards> this.recoverIfAlreadyDurable = recoverIfAlreadyDurable; this.reportTo = reportTo; } - public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Route someRoute, ProgressToken prevProgress, boolean recoverIfAlreadyDurable, LatentStoreSelector reportTo, BiConsumer callback) { MaybeRecover maybeRecover; diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index d073992e38..89406bf9e4 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -38,21 +38,14 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import accord.topology.TopologyException; +import accord.api.*; +import accord.topology.*; +import accord.utils.btree.BTreeMap; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.api.Agent; -import accord.api.AsyncExecutorFactory; -import accord.api.AsyncExecutor; -import accord.topology.EpochReady; -import accord.api.DataStore; -import accord.api.Journal; -import 
accord.api.LocalListeners; -import accord.api.ProgressLog; -import accord.api.RoutingKey; import accord.local.CommandStore.EpochUpdateHolder; import accord.primitives.AbstractRanges; import accord.primitives.AbstractUnseekableKeys; @@ -64,8 +57,6 @@ import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.primitives.Unseekables; -import accord.topology.Shard; -import accord.topology.Topology; import accord.utils.IndexedQuadConsumer; import accord.utils.IndexedRangeQuadConsumer; import accord.utils.Invariants; @@ -318,6 +309,7 @@ public static class ShardHolder { public final CommandStore store; RangesForEpoch ranges; + Long retirementEpoch; ShardHolder(CommandStore store) { @@ -328,6 +320,7 @@ public ShardHolder(CommandStore store, RangesForEpoch ranges) { this.store = store; this.ranges = ranges; + this.retirementEpoch = null; } public ShardHolder withStoreUnsafe(CommandStore store) @@ -601,10 +594,11 @@ public static class Snapshot extends Journal.TopologyUpdate implements Iterable< private final int[] indexForRange; final SearchableRangeList lookupByRange; final Map overlappingCommandStores; + Int2ObjectHashMap previouslyOwnedRanges; - public Snapshot(ShardHolder[] shards, Topology local, Topology global) + public Snapshot(ShardHolder[] shards, Topology local, Topology global, Int2ObjectHashMap previouslyOwnedRanges) { - super(asMap(shards), global); + super(asMap(shards, previouslyOwnedRanges), global); this.local = local; this.shards = shards; this.byId = new Int2IntHashMap(shards.length, Hashing.DEFAULT_LOAD_FACTOR, -1); @@ -665,6 +659,8 @@ class RangeAndIndex } } } + + this.previouslyOwnedRanges = previouslyOwnedRanges; } // This method exists to ensure we do not hold references to command stores @@ -673,11 +669,12 @@ public Journal.TopologyUpdate asTopologyUpdate() return new Journal.TopologyUpdate(commandStores, global); } - private static Int2ObjectHashMap asMap(ShardHolder[] shards) + private static Int2ObjectHashMap 
asMap(ShardHolder[] shards, Int2ObjectHashMap previouslyOwnedRanges) { Int2ObjectHashMap commandStores = new Int2ObjectHashMap<>(); for (ShardHolder shard : shards) commandStores.put(shard.store.id, shard.ranges); + commandStores.putAll(previouslyOwnedRanges); return commandStores; } @@ -704,7 +701,7 @@ private CommandStores(StoreSupplier supplier, ShardDistributor shardDistributor, this.supplier = supplier; this.shardDistributor = shardDistributor; - this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, Topology.EMPTY); + this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, Topology.EMPTY, new Int2ObjectHashMap<>()); this.journal = journal; } @@ -788,14 +785,39 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top List> bootstrapUpdates = new ArrayList<>(); List result = new ArrayList<>(prev.shards.length + added.size()); + Int2ObjectHashMap previouslyOwnedRanges = new Int2ObjectHashMap<>(); + for (ShardHolder shard : prev.shards) { + // Remove shard if RangesForEpoch is fully retired + try + { + if (shard.retirementEpoch != null && node.topology().active().get(shard.retirementEpoch).retired().containsAll(shard.ranges().allAt(shard.retirementEpoch))) + { + previouslyOwnedRanges.put(shard.store.id(), shard.ranges); + continue; + } + } + catch (TopologyRetiredException e) + { + previouslyOwnedRanges.put(shard.store.id(), shard.ranges); + continue; + } + catch (TopologyException e) + { + throw new IllegalStateException(); + } + Ranges current = shard.ranges().currentRanges(); Ranges removeRanges = subtracted.slice(current, Minimal); if (!removeRanges.isEmpty()) { // TODO (required): This is updating the a non-volatile field in the previous Snapshot, why modify it at all, even with volatile the guaranteed visibility is weak even with mutual exclusion shard.ranges = shard.ranges().withRanges(newTopology.epoch(), current.without(subtracted)); + if (shard.ranges.ranges[shard.ranges.size() - 1].isEmpty()) + { + 
shard.retirementEpoch = shard.ranges.epochs[shard.ranges.size() - 1]; + } shard.store.epochUpdateHolder.remove(epoch, shard.ranges, removeRanges); bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges)); } @@ -828,6 +850,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top updateHolder.add(epoch, rangesForEpoch, addRanges); ShardHolder shard = new ShardHolder(supplier.create(nextId++, updateHolder)); shard.ranges = rangesForEpoch; + shard.retirementEpoch = null; Map partitioned = addRanges.partitioningBy(range -> shouldBootstrap(node, prev.global, newLocalTopology, range), BootstrapRangeAction.class); for (Map.Entry entry : partitioned.entrySet()) @@ -860,7 +883,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top ); }; } - return new TopologyUpdate(new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology), bootstrap); + return new TopologyUpdate(new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology, previouslyOwnedRanges), bootstrap); } private static boolean requiresSync(Ranges ranges, Topology oldTopology, Topology newTopology) @@ -982,19 +1005,32 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore while (stores.hasNext()) { CommandStore store = stores.next(); + Invariants.require(!snapshot.previouslyOwnedRanges.containsKey(store.id())); bitSet.set(snapshot.byId.get(store.id())); AsyncChain next = mapReduceConsume.applyAsync(store); if (next != null) chain = chain != null ? 
AsyncChains.reduce(chain, next, mapReduceConsume) : next; } - if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), storeIterator.getMinEpoch())) + if (chain == null) + for (Map.Entry e : snapshot.previouslyOwnedRanges.entrySet()) + { + RangesForEpoch rangesForEpoch = e.getValue(); + + for (int i = 0; i < rangesForEpoch.size(); i++) + { + if (rangesForEpoch.epochs[i] >= storeIterator.getMinEpoch() && rangesForEpoch.ranges[i].intersects(mapReduceConsume.keys())) + return AsyncChains.failure(new RuntimeException("Tried to query CommandStore that has already been deleted")); + } + } + + if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, snapshot.previouslyOwnedRanges, mapReduceConsume.keys().toRanges(), storeIterator.getMinEpoch())) return AsyncChains.failure(new RuntimeException("Tried to query more than one CommandStore for the same range")); return chain == null ? AsyncChains.success(null) : chain; } - public static boolean checkQueryDisjointRangesAcrossCommandStores(Map overlappingCommandStores, ShardHolder[] shards, LargeBitSet commandStoresSeen, Ranges queryRange, Long minEpoch) + public static boolean checkQueryDisjointRangesAcrossCommandStores(Map overlappingCommandStores, ShardHolder[] shards, LargeBitSet commandStoresSeen, Int2ObjectHashMap previouslyOwnedRanges, Ranges queryRange, Long minEpoch) { // Check that we are not querying two command stores for the same range for (Map.Entry entry : overlappingCommandStores.entrySet()) @@ -1005,7 +1041,7 @@ public static boolean checkQueryDisjointRangesAcrossCommandStores(Map= minEpoch ? 
queryRange.slice(shards[i].ranges().allSince(minEpoch), Minimal) : Ranges.EMPTY; if (!range.slice(touchedKeys, Minimal).isEmpty()) return false; @@ -1013,6 +1049,14 @@ public static boolean checkQueryDisjointRangesAcrossCommandStores(Map e : previouslyOwnedRanges.entrySet()) + { + Ranges touchedKeys = e.getValue().epochs[e.getValue().size()-1] >= minEpoch ? queryRange.slice(e.getValue().allSince(minEpoch), Minimal) : Ranges.EMPTY; + if (!range.slice(touchedKeys, Minimal).isEmpty()) + return false; + } } return true; @@ -1050,7 +1094,7 @@ public synchronized void initializeTopologyUnsafe(Journal.TopologyUpdate update) Arrays.sort(shards, Comparator.comparingInt(shard -> shard.store.id)); nextId = maxId + 1; - loadSnapshot(new Snapshot(shards, update.global.forNode(supplier.node.id()).trim(), update.global)); + loadSnapshot(new Snapshot(shards, update.global.forNode(supplier.node.id()).trim(), update.global, new Int2ObjectHashMap<>())); } public synchronized void resetTopology(Journal.TopologyUpdate update) @@ -1100,7 +1144,7 @@ public void accept(Long epoch, Ranges ranges) } nextId = maxId + 1; - loadSnapshot(new Snapshot(shards, current.local, current.global)); + loadSnapshot(new Snapshot(shards, current.local, current.global, new Int2ObjectHashMap<>())); } public synchronized Supplier updateTopology(Node node, Topology newTopology) diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index 4cde69a4c5..b9487dd4c3 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -18,14 +18,7 @@ package accord.impl.basic; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Queue; +import java.util.*; import 
java.util.concurrent.Callable; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -65,6 +58,7 @@ import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; import accord.utils.async.Cancellable; +import org.agrona.collections.Int2ObjectHashMap; import static accord.api.Journal.CommandUpdate; import static accord.utils.Invariants.Paranoia.LINEAR; @@ -105,7 +99,7 @@ public void validateShardStateForTesting(Journal.TopologyUpdate lastUpdate) } Arrays.sort(shards, Comparator.comparingInt(shard -> shard.store.id())); - loadSnapshot(new Snapshot(shards, lastUpdate.global.forNode(nodeId()).trim(), lastUpdate.global)); + loadSnapshot(new Snapshot(shards, lastUpdate.global.forNode(nodeId()).trim(), lastUpdate.global, new Int2ObjectHashMap<>())); } protected void loadSnapshot(Snapshot nextSnapshot) From f37865549694a458c686932fbbf7c3a0c354aff6 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 11 Mar 2026 12:42:02 -0700 Subject: [PATCH 26/35] fix --- .../main/java/accord/local/CommandStores.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 89406bf9e4..e5f50aade6 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -789,25 +789,6 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top for (ShardHolder shard : prev.shards) { - // Remove shard if RangesForEpoch is fully retired - try - { - if (shard.retirementEpoch != null && node.topology().active().get(shard.retirementEpoch).retired().containsAll(shard.ranges().allAt(shard.retirementEpoch))) - { - previouslyOwnedRanges.put(shard.store.id(), shard.ranges); - continue; - } - } - catch (TopologyRetiredException e) - { - previouslyOwnedRanges.put(shard.store.id(), shard.ranges); - continue; - } - catch 
(TopologyException e) - { - throw new IllegalStateException(); - } - Ranges current = shard.ranges().currentRanges(); Ranges removeRanges = subtracted.slice(current, Minimal); if (!removeRanges.isEmpty()) @@ -816,7 +797,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top shard.ranges = shard.ranges().withRanges(newTopology.epoch(), current.without(subtracted)); if (shard.ranges.ranges[shard.ranges.size() - 1].isEmpty()) { - shard.retirementEpoch = shard.ranges.epochs[shard.ranges.size() - 1]; + shard.retirementEpoch = shard.ranges.epochs[shard.ranges.size() - 1] - 1; } shard.store.epochUpdateHolder.remove(epoch, shard.ranges, removeRanges); bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges)); @@ -837,6 +818,25 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top logger.debug("Epoch {} requires visibility sync for {}", epoch, ranges); bootstrapUpdates.add(shard.store.refreshReadyToCoordinate(node, ranges, epoch)); } + + try + { + if (shard.retirementEpoch != null && node.topology().active().get(shard.retirementEpoch).retired().containsAll(shard.ranges().allAt(shard.retirementEpoch))) + { + previouslyOwnedRanges.put(shard.store.id(), shard.ranges); + continue; + } + } + catch (TopologyRetiredException e) + { + previouslyOwnedRanges.put(shard.store.id(), shard.ranges); + continue; + } + catch (TopologyException e) + { + throw new IllegalStateException(); + } + result.add(shard); } From 2583d632389ea6a76d08a28d23bdccf6147fde01 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 11 Mar 2026 12:54:21 -0700 Subject: [PATCH 27/35] fix --- accord-core/src/main/java/accord/local/CommandStores.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index e5f50aade6..45bf3c66e9 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ 
b/accord-core/src/main/java/accord/local/CommandStores.java @@ -795,9 +795,9 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top { // TODO (required): This is updating the a non-volatile field in the previous Snapshot, why modify it at all, even with volatile the guaranteed visibility is weak even with mutual exclusion shard.ranges = shard.ranges().withRanges(newTopology.epoch(), current.without(subtracted)); - if (shard.ranges.ranges[shard.ranges.size() - 1].isEmpty()) + if (current.without(subtracted).isEmpty()) { - shard.retirementEpoch = shard.ranges.epochs[shard.ranges.size() - 1] - 1; + shard.retirementEpoch = shard.ranges().epochs[shard.ranges.size() - 1] - 1; } shard.store.epochUpdateHolder.remove(epoch, shard.ranges, removeRanges); bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges)); From 986c7f84881b4cc745e9de4d808d161a9e176a46 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 11 Mar 2026 13:46:52 -0700 Subject: [PATCH 28/35] fixes --- .../src/main/java/accord/local/CommandStores.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 45bf3c66e9..3d83953a12 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -1017,11 +1017,8 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { RangesForEpoch rangesForEpoch = e.getValue(); - for (int i = 0; i < rangesForEpoch.size(); i++) - { - if (rangesForEpoch.epochs[i] >= storeIterator.getMinEpoch() && rangesForEpoch.ranges[i].intersects(mapReduceConsume.keys())) - return AsyncChains.failure(new RuntimeException("Tried to query CommandStore that has already been deleted")); - } + if (rangesForEpoch.allSince(storeIterator.getMinEpoch()).intersects(mapReduceConsume.keys())) + return AsyncChains.failure(new 
RuntimeException("Query sent for deleted CommandStore")); } if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, snapshot.previouslyOwnedRanges, mapReduceConsume.keys().toRanges(), storeIterator.getMinEpoch())) @@ -1041,7 +1038,7 @@ public static boolean checkQueryDisjointRangesAcrossCommandStores(Map= minEpoch ? queryRange.slice(shards[i].ranges().allSince(minEpoch), Minimal) : Ranges.EMPTY; + Ranges touchedKeys = queryRange.slice(shards[i].ranges().allSince(minEpoch), Minimal); if (!range.slice(touchedKeys, Minimal).isEmpty()) return false; @@ -1053,7 +1050,7 @@ public static boolean checkQueryDisjointRangesAcrossCommandStores(Map e : previouslyOwnedRanges.entrySet()) { - Ranges touchedKeys = e.getValue().epochs[e.getValue().size()-1] >= minEpoch ? queryRange.slice(e.getValue().allSince(minEpoch), Minimal) : Ranges.EMPTY; + Ranges touchedKeys = queryRange.slice(e.getValue().allSince(minEpoch), Minimal); if (!range.slice(touchedKeys, Minimal).isEmpty()) return false; } From 77d35e698c943b1f08463e51389ac86ad8c77476 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 11 Mar 2026 15:34:03 -0700 Subject: [PATCH 29/35] fix --- accord-core/src/main/java/accord/local/CommandStores.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 3d83953a12..0320a0d2da 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -40,7 +40,6 @@ import accord.api.*; import accord.topology.*; -import accord.utils.btree.BTreeMap; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -785,7 +784,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top List> bootstrapUpdates = new ArrayList<>(); List result = new 
ArrayList<>(prev.shards.length + added.size()); - Int2ObjectHashMap previouslyOwnedRanges = new Int2ObjectHashMap<>(); + Int2ObjectHashMap previouslyOwnedRanges = prev.previouslyOwnedRanges; for (ShardHolder shard : prev.shards) { From e6768bb40a34b5cd912fab378bac242084619bd9 Mon Sep 17 00:00:00 2001 From: Benedict Elliott Smith Date: Fri, 17 Apr 2026 16:40:14 +0100 Subject: [PATCH 30/35] feedback/edits for regaining ranges --- .../src/main/java/accord/api/Journal.java | 10 +- .../accord/impl/AbstractSafeCommandStore.java | 6 + .../main/java/accord/local/CommandStore.java | 17 +- .../main/java/accord/local/CommandStores.java | 313 ++++++++---------- .../src/main/java/accord/local/Node.java | 2 +- .../OverlappingCommandStoresException.java | 39 +++ .../java/accord/local/SafeCommandStore.java | 11 +- .../java/accord/topology/TopologyManager.java | 45 +-- .../impl/basic/DelayedCommandStores.java | 10 +- 9 files changed, 246 insertions(+), 207 deletions(-) create mode 100644 accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java index 381f1be2d6..cf9986c61f 100644 --- a/accord-core/src/main/java/accord/api/Journal.java +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -28,6 +28,7 @@ import accord.impl.CommandChange; import accord.local.Command; import accord.local.CommandStores; +import accord.local.CommandStores.PreviouslyOwned; import accord.local.DurableBefore; import accord.local.Node; import accord.local.RedundantBefore; @@ -85,11 +86,13 @@ class TopologyUpdate { public final Int2ObjectHashMap commandStores; public final Topology global; + public final PreviouslyOwned previouslyOwned; - public TopologyUpdate(@Nonnull Int2ObjectHashMap commandStores, @Nonnull Topology global) + public TopologyUpdate(@Nonnull Int2ObjectHashMap commandStores, @Nonnull Topology global, PreviouslyOwned previouslyOwned) { this.commandStores = 
commandStores; this.global = global; + this.previouslyOwned = previouslyOwned; } public boolean isEquivalent(TopologyUpdate other) @@ -103,7 +106,7 @@ public boolean isEquivalent(TopologyUpdate other) public TopologyUpdate cloneWithEquivalentEpoch(long epoch) { - return new TopologyUpdate(commandStores, global.cloneEquivalentWithEpoch(epoch)); + return new TopologyUpdate(commandStores, global.cloneEquivalentWithEpoch(epoch), previouslyOwned); } @Override @@ -150,6 +153,7 @@ class FieldUpdates public RedundantBefore newRedundantBefore; public NavigableMap newBootstrapBeganAt; public NavigableMap newSafeToRead; + public Ranges newPermanentlyUnsafeToRead; public CommandStores.RangesForEpoch newRangesForEpoch; public String toString() @@ -161,6 +165,8 @@ public String toString() builder.append("newBootstrapBeganAt=").append(newBootstrapBeganAt).append(", "); if (newSafeToRead != null) builder.append("newSafeToRead=").append(newSafeToRead).append(", "); + if (newPermanentlyUnsafeToRead != null) + builder.append("newPermanentlyUnsafeToRead=").append(newPermanentlyUnsafeToRead).append(", "); if (newRangesForEpoch != null) builder.append("newRangesForEpoch=").append(newRangesForEpoch).append(", "); builder.setLength(builder.length() - 2); diff --git a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java index 24d52839d9..b684575126 100644 --- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java +++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java @@ -243,6 +243,12 @@ public final void setSafeToRead(NavigableMap newSafeToRead) ensureFieldUpdates().newSafeToRead = newSafeToRead; } + @Override + public final void setPermanentlyUnsafeToRead(Ranges newPermanentlyUnsafeToRead) + { + ensureFieldUpdates().newPermanentlyUnsafeToRead = newPermanentlyUnsafeToRead; + } + @Override public void setRangesForEpoch(RangesForEpoch rangesForEpoch) { diff --git 
a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index f59be8c237..83966f1928 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -187,7 +187,7 @@ CommandStore create(int id, * But they may still be ordered for other key ranges they participate in. */ private NavigableMap safeToRead = emptySafeToRead(); - private Ranges regainedRanges = Ranges.EMPTY; + private Ranges permanentlyUnsafeToRead = Ranges.EMPTY; private final Set bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>()); @Nullable private RejectBefore rejectBefore; @@ -406,7 +406,7 @@ final void unsafeSetSafeToRead(NavigableMap newSafeToRead) { for (Map.Entry entry : newSafeToRead.entrySet()) { - Ranges rangeExcluded = entry.getValue().without(this.regainedRanges); + Ranges rangeExcluded = entry.getValue().without(this.permanentlyUnsafeToRead); logger.info("{} is excluded from newSafeToRead because it is in the regained ranges", rangeExcluded); } } @@ -415,9 +415,9 @@ final void unsafeSetSafeToRead(NavigableMap newSafeToRead) this.safeToRead = newSafeToRead; } - final void unsafeAddToRegainedRanges(Ranges newRegainedRanges) + final void unsafeSetPermanentlyUnsafeToRead(Ranges newPermanentlyUnsafeToRead) { - this.regainedRanges = newRegainedRanges.union(MERGE_ADJACENT, this.regainedRanges); + this.permanentlyUnsafeToRead = newPermanentlyUnsafeToRead; } protected final void unsafeClearSafeToRead() @@ -1196,11 +1196,12 @@ final void markUnsafeToRead(Ranges ranges) } } - final void markAsRegained(Ranges ranges) + final AsyncChain markPermanentlyUnsafeToRead(Ranges ranges) { - execute((Empty) () -> "Mark Range As Regained", safeStore -> { - safeStore.addToRegainedRanges(ranges); - }, agent); + return chain((Empty) () -> "Mark Range As Regained", safeStore -> { + safeStore.setSafeToRead(purgeHistory(safeToRead, ranges)); + 
safeStore.setPermanentlyUnsafeToRead(permanentlyUnsafeToRead.union(MERGE_ADJACENT, ranges)); + }); } public final DataStore unsafeGetDataStore() diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 0320a0d2da..72aa60131c 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -25,12 +25,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.HashMap; import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -38,13 +34,20 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import accord.api.*; -import accord.topology.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.Agent; +import accord.api.AsyncExecutorFactory; +import accord.api.AsyncExecutor; +import accord.topology.EpochReady; +import accord.api.DataStore; +import accord.api.Journal; +import accord.api.LocalListeners; +import accord.api.ProgressLog; +import accord.api.RoutingKey; import accord.local.CommandStore.EpochUpdateHolder; import accord.primitives.AbstractRanges; import accord.primitives.AbstractUnseekableKeys; @@ -56,6 +59,8 @@ import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.primitives.Unseekables; +import accord.topology.Shard; +import accord.topology.Topology; import accord.utils.IndexedQuadConsumer; import accord.utils.IndexedRangeQuadConsumer; import accord.utils.Invariants; @@ -90,6 +95,12 @@ public abstract class CommandStores implements AsyncExecutorFactory @SuppressWarnings("unused") private 
static final Logger logger = LoggerFactory.getLogger(CommandStores.class); + static final Iterator INVALID = new Iterator<>() + { + @Override public boolean hasNext() { throw new UnsupportedOperationException(); } + @Override public CommandStore next() { throw new UnsupportedOperationException(); } + }; + public interface LatentStoreSelector { StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants participants); @@ -101,9 +112,9 @@ class StandardLatentStoreSelector implements LatentStoreSelector @Override public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants participants) { - return snapshot -> new StoreIterator(StoreFinder.find(snapshot, participants) + return snapshot -> StoreFinder.find(snapshot, participants) .filter(snapshot, participants, txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch()) - .iterator(snapshot), txnId.epoch()); + .iterator(snapshot); } } @@ -116,7 +127,7 @@ static LatentStoreSelector standard() public interface StoreSelector extends LatentStoreSelector { default StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants participants) { return this; } - StoreIterator select(Snapshot snapshot); + Iterator select(Snapshot snapshot); } public static class IncludingSpecificStoreSelector implements StoreSelector @@ -135,35 +146,14 @@ public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Particip StoreFinder finder = StoreFinder.find(snapshot, participants) .filter(snapshot, participants, txnId.epoch(), (executeAt != null ? 
executeAt : txnId).epoch()); finder.set(snapshot.byId.get(storeId)); - return new StoreIterator(finder.iterator(snapshot), txnId.epoch()); + return finder.iterator(snapshot); }; } @Override - public StoreIterator select(Snapshot snapshot) - { - return new StoreIterator(Collections.singletonList(snapshot.byId(storeId)).iterator(), null); - } - } - - public static class StoreIterator - { - public final Iterator storeIterator; - public final Long minEpoch; - - public StoreIterator(Iterator storeIterator, @Nullable Long minEpoch) - { - this.storeIterator = storeIterator; - this.minEpoch = (minEpoch == null) ? 0L : minEpoch; - } - - public Iterator getStoreIterator() + public Iterator select(Snapshot snapshot) { - return storeIterator; - } - - public Long getMinEpoch() { - return minEpoch; + return Collections.singletonList(snapshot.byId(storeId)).iterator(); } } @@ -171,6 +161,7 @@ public Long getMinEpoch() { public static class StoreFinder extends LargeBitSet implements IndexedQuadConsumer, IndexedRangeQuadConsumer { final int[] indexMap; + private boolean invalid; private StoreFinder(int size, int[] indexMap) { @@ -188,7 +179,7 @@ public static StoreSelector selector(Unseekables unseekables, long minEpoch, return snapshot -> { StoreFinder finder = StoreFinder.find(snapshot, unseekables); finder.filter(snapshot, unseekables, minEpoch, maxEpoch); - return new StoreIterator(finder.iterator(snapshot), minEpoch); + return finder.iterator(snapshot); }; } @@ -223,13 +214,23 @@ public StoreFinder filter(Snapshot snapshot, Unseekables unseekables, long mi ShardHolder shard = snapshot.shards[i]; Ranges shardRanges = shard.ranges().allBetween(minEpoch, maxEpoch); if (shardRanges != shard.ranges.all() && !shardRanges.intersects(unseekables)) + { unset(i); + } + else if (unsafelyTouchesRegainedRanges(snapshot, shard, unseekables, minEpoch)) + { + invalid = true; + break; + } } return this; } public Iterator iterator(Snapshot snapshot) { + if (invalid) + return INVALID; + return 
new Iterator<>() { int i = firstSetBit(); @@ -307,24 +308,20 @@ CommandStore create(int id, EpochUpdateHolder rangesForEpoch) public static class ShardHolder { public final CommandStore store; + @Nullable final Ranges regainsRanges; RangesForEpoch ranges; - Long retirementEpoch; - ShardHolder(CommandStore store) + ShardHolder(CommandStore store, @Nullable Ranges regainsRanges) { this.store = store; + this.regainsRanges = regainsRanges; } - public ShardHolder(CommandStore store, RangesForEpoch ranges) + public ShardHolder(CommandStore store, RangesForEpoch ranges, @Nullable Ranges regainsRanges) { this.store = store; + this.regainsRanges = regainsRanges; this.ranges = ranges; - this.retirementEpoch = null; - } - - public ShardHolder withStoreUnsafe(CommandStore store) - { - return new ShardHolder(store, ranges); } public RangesForEpoch ranges() @@ -349,12 +346,77 @@ public interface RangesForEpochSupplier RangesForEpoch ranges(); } + public static final class PreviouslyOwned + { + public static final PreviouslyOwned EMPTY = new PreviouslyOwned(0, RangesForEpoch.EMPTY.epochs, RangesForEpoch.EMPTY.ranges); + final long maxEpoch; + final long[] epochs; // the epoch upon which it was last owned + final Ranges[] ranges; + + public PreviouslyOwned(long maxEpoch, long[] epochs, Ranges[] ranges) + { + this.maxEpoch = maxEpoch; + this.epochs = epochs; + this.ranges = ranges; + } + + PreviouslyOwned prepend(long epoch, Ranges ranges) + { + Invariants.require(epochs.length == 0 || epoch > epochs[0]); + long[] newEpochs = new long[this.epochs.length + 1]; + Ranges[] newRanges = new Ranges[this.ranges.length + 1]; + newEpochs[0] = epoch; + newRanges[0] = ranges; + System.arraycopy(this.epochs, 0, newEpochs, 1, this.epochs.length); + System.arraycopy(this.ranges, 0, newRanges, 1, this.ranges.length); + return new PreviouslyOwned(epoch, newEpochs, newRanges); + } + + public boolean overlaps(long epoch, Unseekables test) + { + if (epoch > maxEpoch) + return false; + + for (int i = 0 ; i <
epochs.length && epoch <= epochs[i] ; ++i) + { + if (this.ranges[i].intersects(test)) + return true; + } + + return false; + } + + public Ranges regains(Ranges overlapping) + { + Ranges regains = Ranges.EMPTY; + for (Ranges rs : this.ranges) + regains = regains.without(rs.slice(overlapping, Minimal)); + return regains; + } + + public int size() + { + return epochs.length; + } + + public long epochs(int i) + { + return epochs[i]; + } + + public Ranges ranges(int i) + { + return ranges[i]; + } + } + // We ONLY remove ranges to keep logic manageable; likely to only merge CommandStores into a new CommandStore via some kind of Bootstrap public static class RangesForEpoch { + public static final RangesForEpoch EMPTY = new RangesForEpoch(new long[0], new Ranges[0]); + final long[] epochs; final Ranges[] ranges; - public static final RangesForEpoch EMPTY = new RangesForEpoch(new long[0], new Ranges[0]); public RangesForEpoch(long epoch, Ranges ranges) { @@ -386,7 +448,7 @@ public boolean equals(Object object) if (this == object) return true; if (object == null || getClass() != object.getClass()) return false; RangesForEpoch that = (RangesForEpoch) object; - return Objects.deepEquals(epochs, that.epochs) && Objects.deepEquals(ranges, that.ranges); + return Arrays.equals(epochs, that.epochs) && Arrays.equals(ranges, that.ranges); } @Override @@ -592,12 +654,10 @@ public static class Snapshot extends Journal.TopologyUpdate implements Iterable< final Int2IntHashMap byId; private final int[] indexForRange; final SearchableRangeList lookupByRange; - final Map overlappingCommandStores; - Int2ObjectHashMap previouslyOwnedRanges; - public Snapshot(ShardHolder[] shards, Topology local, Topology global, Int2ObjectHashMap previouslyOwnedRanges) + public Snapshot(ShardHolder[] shards, Topology local, Topology global, PreviouslyOwned previouslyOwned) { - super(asMap(shards, previouslyOwnedRanges), global); + super(asMap(shards), global, previouslyOwned); this.local = local; this.shards 
= shards; this.byId = new Int2IntHashMap(shards.length, Hashing.DEFAULT_LOAD_FACTOR, -1); @@ -642,38 +702,19 @@ class RangeAndIndex indexForRange[i] = rangesAndIndexes[i].index; } lookupByRange = SearchableRangeList.build(ranges); - - overlappingCommandStores = new HashMap<>(); - for (int i = 0; i < shards.length ; i++) - overlappingCommandStores.put(i, new LargeBitSet(shards.length)); - - for (int i = 0; i < shards.length; i++) - { - for (int j = i; j < shards.length; j++) - { - if (!shards[i].ranges().all().slice(shards[j].ranges().all(), Minimal).isEmpty()) - { - overlappingCommandStores.get(i).set(j); - overlappingCommandStores.get(j).set(i); - } - } - } - - this.previouslyOwnedRanges = previouslyOwnedRanges; } // This method exists to ensure we do not hold references to command stores public Journal.TopologyUpdate asTopologyUpdate() { - return new Journal.TopologyUpdate(commandStores, global); + return new Journal.TopologyUpdate(commandStores, global, previouslyOwned); } - private static Int2ObjectHashMap asMap(ShardHolder[] shards, Int2ObjectHashMap previouslyOwnedRanges) + private static Int2ObjectHashMap asMap(ShardHolder[] shards) { Int2ObjectHashMap commandStores = new Int2ObjectHashMap<>(); for (ShardHolder shard : shards) commandStores.put(shard.store.id, shard.ranges); - commandStores.putAll(previouslyOwnedRanges); return commandStores; } @@ -700,7 +741,7 @@ private CommandStores(StoreSupplier supplier, ShardDistributor shardDistributor, this.supplier = supplier; this.shardDistributor = shardDistributor; - this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, Topology.EMPTY, new Int2ObjectHashMap<>()); + this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, Topology.EMPTY, PreviouslyOwned.EMPTY); this.journal = journal; } @@ -784,7 +825,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top List> bootstrapUpdates = new ArrayList<>(); List result = new ArrayList<>(prev.shards.length + added.size()); - 
Int2ObjectHashMap previouslyOwnedRanges = prev.previouslyOwnedRanges; + PreviouslyOwned previouslyOwned = prev.previouslyOwned; for (ShardHolder shard : prev.shards) { @@ -794,20 +835,14 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top { // TODO (required): This is updating the a non-volatile field in the previous Snapshot, why modify it at all, even with volatile the guaranteed visibility is weak even with mutual exclusion shard.ranges = shard.ranges().withRanges(newTopology.epoch(), current.without(subtracted)); - if (current.without(subtracted).isEmpty()) - { - shard.retirementEpoch = shard.ranges().epochs[shard.ranges.size() - 1] - 1; - } shard.store.epochUpdateHolder.remove(epoch, shard.ranges, removeRanges); + bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges)); } Ranges regainedRanges = shard.ranges().all().slice(added, Minimal); if (!regainedRanges.isEmpty()) - { - shard.store.markUnsafeToRead(regainedRanges); - shard.store.markAsRegained(regainedRanges); - } + bootstrapUpdates.add(() -> EpochReady.all(epoch, shard.store.markPermanentlyUnsafeToRead(regainedRanges).beginAsResult())); // TODO (desired): only sync affected shards Ranges ranges = shard.ranges().currentRanges(); @@ -818,24 +853,6 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top bootstrapUpdates.add(shard.store.refreshReadyToCoordinate(node, ranges, epoch)); } - try - { - if (shard.retirementEpoch != null && node.topology().active().get(shard.retirementEpoch).retired().containsAll(shard.ranges().allAt(shard.retirementEpoch))) - { - previouslyOwnedRanges.put(shard.store.id(), shard.ranges); - continue; - } - } - catch (TopologyRetiredException e) - { - previouslyOwnedRanges.put(shard.store.id(), shard.ranges); - continue; - } - catch (TopologyException e) - { - throw new IllegalStateException(); - } - result.add(shard); } @@ -847,9 +864,8 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot 
prev, Top EpochUpdateHolder updateHolder = new EpochUpdateHolder(); RangesForEpoch rangesForEpoch = new RangesForEpoch(epoch, addRanges); updateHolder.add(epoch, rangesForEpoch, addRanges); - ShardHolder shard = new ShardHolder(supplier.create(nextId++, updateHolder)); + ShardHolder shard = new ShardHolder(supplier.create(nextId++, updateHolder), previouslyOwned.regains(addRanges)); shard.ranges = rangesForEpoch; - shard.retirementEpoch = null; Map partitioned = addRanges.partitioningBy(range -> shouldBootstrap(node, prev.global, newLocalTopology, range), BootstrapRangeAction.class); for (Map.Entry entry : partitioned.entrySet()) @@ -882,7 +898,11 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top ); }; } - return new TopologyUpdate(new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology, previouslyOwnedRanges), bootstrap); + + if (!subtracted.isEmpty()) + previouslyOwned = previouslyOwned.prepend(epoch - 1, subtracted); + + return new TopologyUpdate(new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology, previouslyOwned), bootstrap); } private static boolean requiresSync(Ranges ranges, Topology oldTopology, Topology newTopology) @@ -922,7 +942,7 @@ public void forAllUnsafe(Consumer forEach) public AsyncChain forAll(String reason, Consumer forEach) { - return mapReduce(snapshot -> new StoreIterator(Stream.of(snapshot.shards).map(shard -> shard.store).iterator(), null), new MapReduceCommandStores<>(RoutingKeys.EMPTY) + return mapReduce(snapshot -> Stream.of(snapshot.shards).map(shard -> shard.store).iterator(), new MapReduceCommandStores<>(RoutingKeys.EMPTY) { @Override public Void reduce(Void o1, Void o2) { return null; } @Override public TxnId primaryTxnId() { return null; } @@ -991,83 +1011,38 @@ public Cancellable mapReduceConsume(IntStream commandStoreIds, MapReduceCons public AsyncChain mapReduce(IntStream commandStoreIds, MapReduceCommandStores mapReduce) { - return 
mapReduce(snapshot -> new StoreIterator(commandStoreIds.mapToObj(snapshot::byId).iterator(), null), mapReduce); + return mapReduce(snapshot -> commandStoreIds.mapToObj(snapshot::byId).iterator(), mapReduce); } public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStores mapReduceConsume) { Snapshot snapshot = current; - StoreIterator storeIterator = selector.select(snapshot); - Iterator stores = storeIterator.getStoreIterator(); + Iterator stores = selector.select(snapshot); + if (stores == INVALID) + return AsyncChains.failure(new OverlappingCommandStoresException()); + AsyncChain chain = null; - LargeBitSet bitSet = new LargeBitSet(snapshot.shards.length); while (stores.hasNext()) { CommandStore store = stores.next(); - Invariants.require(!snapshot.previouslyOwnedRanges.containsKey(store.id())); - bitSet.set(snapshot.byId.get(store.id())); AsyncChain next = mapReduceConsume.applyAsync(store); if (next != null) chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } - if (chain == null) - for (Map.Entry e : snapshot.previouslyOwnedRanges.entrySet()) - { - RangesForEpoch rangesForEpoch = e.getValue(); - - if (rangesForEpoch.allSince(storeIterator.getMinEpoch()).intersects(mapReduceConsume.keys())) - return AsyncChains.failure(new RuntimeException("Query sent for deleted CommandStore")); - } - - if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, snapshot.previouslyOwnedRanges, mapReduceConsume.keys().toRanges(), storeIterator.getMinEpoch())) - return AsyncChains.failure(new RuntimeException("Tried to query more than one CommandStore for the same range")); - return chain == null ? 
AsyncChains.success(null) : chain; } - public static boolean checkQueryDisjointRangesAcrossCommandStores(Map overlappingCommandStores, ShardHolder[] shards, LargeBitSet commandStoresSeen, Int2ObjectHashMap previouslyOwnedRanges, Ranges queryRange, Long minEpoch) + private static boolean unsafelyTouchesRegainedRanges(Snapshot snapshot, ShardHolder shard, Unseekables unseekables, long minEpoch) { - // Check that we are not querying two command stores for the same range - for (Map.Entry entry : overlappingCommandStores.entrySet()) - { - Ranges range = Ranges.EMPTY; - LargeBitSet overlappingCommandStore = entry.getValue(); - for (int i = overlappingCommandStore.firstSetBit(); i >= 0 ; i = overlappingCommandStore.nextSetBit(i + 1, -1)) - { - if (commandStoresSeen.get(i)) - { - Ranges touchedKeys = queryRange.slice(shards[i].ranges().allSince(minEpoch), Minimal); - - if (!range.slice(touchedKeys, Minimal).isEmpty()) - return false; + if (shard.regainsRanges == null) + return false; - range = range.with(touchedKeys.toRanges()); - } - } + unseekables = unseekables.slice(shard.regainsRanges, Minimal); + if (unseekables.isEmpty()) + return false; - // Can only overlap with ranges of CommandStores that we traverse - for (Map.Entry e : previouslyOwnedRanges.entrySet()) - { - Ranges touchedKeys = queryRange.slice(e.getValue().allSince(minEpoch), Minimal); - if (!range.slice(touchedKeys, Minimal).isEmpty()) - return false; - } - } - - return true; - } - - public O mapReduceUnsafe(StoreSelector selector, Function map, BiFunction reduce, O accumulator) - { - Snapshot snapshot = current; - Iterator stores = selector.select(snapshot).getStoreIterator(); - while (stores.hasNext()) - { - O next = map.apply(stores.next()); - accumulator = reduce.apply(accumulator, next); - } - return accumulator; + return snapshot.previouslyOwned.overlaps(minEpoch, unseekables); } /** @@ -1081,21 +1056,21 @@ public synchronized void initializeTopologyUnsafe(Journal.TopologyUpdate update) int maxId = 
-1; for (Map.Entry e : update.commandStores.entrySet()) { - Invariants.require(e.getValue() != null); + RangesForEpoch rfe = e.getValue(); + Invariants.require(rfe != null); EpochUpdateHolder holder = new EpochUpdateHolder(); - holder.add(1, e.getValue(), e.getValue().all()); - shards[i++] = new ShardHolder(supplier.create(e.getKey(), holder), e.getValue()); + holder.add(1, rfe, rfe.all()); + shards[i++] = new ShardHolder(supplier.create(e.getKey(), holder), rfe, update.previouslyOwned.regains(rfe.all())); maxId = Math.max(maxId, e.getKey()); } Arrays.sort(shards, Comparator.comparingInt(shard -> shard.store.id)); nextId = maxId + 1; - loadSnapshot(new Snapshot(shards, update.global.forNode(supplier.node.id()).trim(), update.global, new Int2ObjectHashMap<>())); + loadSnapshot(new Snapshot(shards, update.global.forNode(supplier.node.id()).trim(), update.global, update.previouslyOwned)); } public synchronized void resetTopology(Journal.TopologyUpdate update) { - // TODO: assert Snapshot current = this.current; Invariants.require(update.global.epoch() == current.local.epoch()); ShardHolder[] shards = new ShardHolder[current.commandStores.size()]; @@ -1103,11 +1078,11 @@ public synchronized void resetTopology(Journal.TopologyUpdate update) for (Map.Entry e : update.commandStores.entrySet()) { int storeId = e.getKey(); - RangesForEpoch ranges = e.getValue(); - Invariants.require(ranges != null); - ShardHolder shard = new ShardHolder(current.byId(storeId), ranges); + RangesForEpoch rfe = e.getValue(); + Invariants.require(rfe != null); + ShardHolder shard = new ShardHolder(current.byId(storeId), rfe, update.previouslyOwned.regains(rfe.all())); EpochUpdateHolder holder = shard.store.epochUpdateHolder; - ranges.forEach(new BiConsumer<>() + rfe.forEach(new BiConsumer<>() { RangesForEpoch accumulator = null; Ranges prev = null; @@ -1140,7 +1115,7 @@ public void accept(Long epoch, Ranges ranges) } nextId = maxId + 1; - loadSnapshot(new Snapshot(shards, current.local, 
current.global, new Int2ObjectHashMap<>())); + loadSnapshot(new Snapshot(shards, current.local, current.global, update.previouslyOwned)); } public synchronized Supplier updateTopology(Node node, Topology newTopology) diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index b4f57313aa..9c4f1620c4 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -765,7 +765,7 @@ public CoordinationAdapter coordinationAdapter(TxnId txnId, Kind kind) public AsyncChain updateMinHlc(long minHlc) { // TODO (required): command stores that are not ready due to bootstrap need to refresh their min HLC on bootstrap completion - StoreSelector selector = snapshot -> new CommandStores.StoreIterator(Stream.of(snapshot.shards).map(sh -> sh.store).iterator(), null); + StoreSelector selector = snapshot -> Stream.of(snapshot.shards).map(sh -> sh.store).iterator(); return commandStores().mapReduce(selector, new MapReduceCommandStores<>(RoutingKeys.EMPTY) { @Override public Void reduce(Void o1, Void o2) { return null; } diff --git a/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java b/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java new file mode 100644 index 0000000000..f1e358d655 --- /dev/null +++ b/accord-core/src/main/java/accord/local/OverlappingCommandStoresException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.local; + +import accord.utils.Rethrowable; + +public class OverlappingCommandStoresException extends RuntimeException implements Rethrowable +{ + public OverlappingCommandStoresException() + { + } + + private OverlappingCommandStoresException(Throwable cause) + { + super(cause); + } + + @Override + public OverlappingCommandStoresException rethrowable() + { + return new OverlappingCommandStoresException(this); + } +} diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index ba88a5ddf8..8d0b06f6b7 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -29,6 +29,7 @@ import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.api.RoutingKey; +import accord.local.CommandStores.RangesForEpoch; import accord.local.CommandStores.RangesForEpochSupplier; import accord.local.RedundantBefore.RedundantBeforeSupplier; import accord.local.cfk.CommandsForKey; @@ -384,12 +385,12 @@ public void setSafeToRead(NavigableMap newSafeToRead) commandStore().unsafeSetSafeToRead(newSafeToRead); } - public void addToRegainedRanges(Ranges newRegainedRanges) + public void setPermanentlyUnsafeToRead(Ranges newPermanentlyUnsafeToRead) { - commandStore().unsafeAddToRegainedRanges(newRegainedRanges); + commandStore().unsafeSetPermanentlyUnsafeToRead(newPermanentlyUnsafeToRead); } - public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) + public void 
setRangesForEpoch(RangesForEpoch rangesForEpoch) { commandStore().unsafeSetRangesForEpoch(rangesForEpoch); } @@ -576,7 +577,7 @@ private static void registerTransitive(SafeCommandStore safeStore, TxnId txnId, if (safeCommand != null && safeCommand.current().known().route() != MaybeRoute) return; - CommandStores.RangesForEpoch rangesForEpoch = safeStore.ranges(); + RangesForEpoch rangesForEpoch = safeStore.ranges(); // TODO (required): this is incompatible with rebootstrap - we need to use some additional condition witnessedBy = witnessedBy.without(rangesForEpoch.coordinates(txnId)); // already coordinates, no need to replicate if (witnessedBy.isEmpty()) @@ -594,7 +595,7 @@ private static void registerTransitive(SafeCommandStore safeStore, TxnId txnId, public abstract Agent agent(); public abstract ProgressLog progressLog(); public abstract NodeCommandStoreService node(); - public abstract CommandStores.RangesForEpoch ranges(); + public abstract RangesForEpoch ranges(); protected NavigableMap bootstrapBeganAt() { diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 52eedf507d..282ae9ef34 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -28,7 +28,6 @@ import java.util.function.Supplier; import javax.annotation.Nullable; -import accord.primitives.Routables; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +54,7 @@ import accord.utils.async.NestedAsyncResult; import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; +import static accord.primitives.Routables.Slice.Minimal; /** * Manages topology state changes and update bookkeeping @@ -315,12 +315,12 @@ public void reportTopology(Topology topology) public static class RegainingEpochRange { public final long epoch; - public final Ranges range; 
+ public final Ranges ranges; - public RegainingEpochRange(long epoch, Ranges range) + public RegainingEpochRange(long epoch, Ranges ranges) { this.epoch = epoch; - this.range = range; + this.ranges = ranges; } public long epoch() @@ -328,36 +328,43 @@ public long epoch() return epoch; } - public Ranges range() + public Ranges ranges() { - return range; + return ranges; } } @Nullable - public RegainingEpochRange epochAndRangeToBeRetired(Topology curr, Topology next) + public RegainingEpochRange computeRegaining(Topology current, Topology next) { - Map additions = Topology.computeNodeAdditions(curr, next); + Map additions = Topology.computeNodeAdditions(current, next); long greatestEpoch = -1; - Ranges range = Ranges.EMPTY; + Ranges ranges = Ranges.EMPTY; - synchronized (this) + ActiveEpochs active = this.active; + for (Map.Entry entry : additions.entrySet()) { - for (Map.Entry entry : additions.entrySet()) + Ranges addingForNode = entry.getValue(); + for (ActiveEpoch e : active) { - for (ActiveEpoch activeEpoch : this.active) { - if (activeEpoch.all().rangesForNode(entry.getKey()).intersects(entry.getValue()) && greatestEpoch < activeEpoch.epoch()) - { - greatestEpoch = activeEpoch.epoch(); - range = range.union(MERGE_ADJACENT, activeEpoch.all().rangesForNode(entry.getKey()).slice(entry.getValue(), Routables.Slice.Minimal)); - break; - } + addingForNode = addingForNode.without(e.removedRanges).without(e.retired()); + if (addingForNode.isEmpty()) + break; + + Ranges existingForNode = e.all().rangesForNode(entry.getKey()); + Ranges regainingForNode = addingForNode.slice(existingForNode, Minimal); + if (!regainingForNode.isEmpty()) + { + greatestEpoch = Math.max(greatestEpoch, e.epoch()); + ranges = ranges.union(MERGE_ADJACENT, regainingForNode); + addingForNode = addingForNode.without(regainingForNode); } + addingForNode = addingForNode.without(e.addedRanges); } } if (greatestEpoch != -1) - return new RegainingEpochRange(greatestEpoch, range); + return new 
RegainingEpochRange(greatestEpoch, ranges); return null; } diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index b9487dd4c3..a9b47c36ed 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -58,7 +58,6 @@ import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; import accord.utils.async.Cancellable; -import org.agrona.collections.Int2ObjectHashMap; import static accord.api.Journal.CommandUpdate; import static accord.utils.Invariants.Paranoia.LINEAR; @@ -79,6 +78,7 @@ public static CommandStores.Factory factory(PendingQueue pending, CacheLoading i public void validateShardStateForTesting(Journal.TopologyUpdate lastUpdate) { + PreviouslyOwned previouslyOwned = lastUpdate.previouslyOwned; ShardHolder[] shards = new ShardHolder[lastUpdate.commandStores.size()]; int i = 0; for (Map.Entry e : lastUpdate.commandStores.entrySet()) @@ -89,17 +89,21 @@ public void validateShardStateForTesting(Journal.TopologyUpdate lastUpdate) for (ShardHolder shard : current) { if (shard.ranges().equals(ranges)) + { + Invariants.require(commandStore == null); commandStore = shard.store; + } } Invariants.nonNull(commandStore, "Each set of ranges should have a corresponding command store, but %d did not:(%s)", ranges, Arrays.toString(shards)) .restore(); - ShardHolder shard = new ShardHolder(commandStore, e.getValue()); + + ShardHolder shard = new ShardHolder(commandStore, ranges, previouslyOwned.regains(ranges.all())); shards[i++] = shard; } Arrays.sort(shards, Comparator.comparingInt(shard -> shard.store.id())); - loadSnapshot(new Snapshot(shards, lastUpdate.global.forNode(nodeId()).trim(), lastUpdate.global, new Int2ObjectHashMap<>())); + loadSnapshot(new Snapshot(shards, lastUpdate.global.forNode(nodeId()).trim(), lastUpdate.global, 
lastUpdate.previouslyOwned)); } protected void loadSnapshot(Snapshot nextSnapshot) From b93f5ad66427faf1bfa1653364a44451ed4bcf6c Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 20 Apr 2026 14:29:23 -0700 Subject: [PATCH 31/35] fixed indexing bug --- .../src/main/java/accord/local/CommandStore.java | 10 ++++------ .../src/main/java/accord/local/CommandStores.java | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 83966f1928..bf3af79827 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -402,17 +402,15 @@ protected void unsafeAcceptRequests(Ranges accept) */ final void unsafeSetSafeToRead(NavigableMap newSafeToRead) { + NavigableMap newSafeToReadWithoutRegainingRanges = new TreeMap<>(); if (newSafeToRead != null) { for (Map.Entry entry : newSafeToRead.entrySet()) - { - Ranges rangeExcluded = entry.getValue().without(this.permanentlyUnsafeToRead); - logger.info("{} is excluded from newSafeToRead because it is in the regained ranges", rangeExcluded); - } + newSafeToReadWithoutRegainingRanges.put(entry.getKey(), entry.getValue().without(this.permanentlyUnsafeToRead)); } node.updateStamp(); - this.safeToRead = newSafeToRead; + this.safeToRead = newSafeToReadWithoutRegainingRanges; } final void unsafeSetPermanentlyUnsafeToRead(Ranges newPermanentlyUnsafeToRead) @@ -1198,7 +1196,7 @@ final void markUnsafeToRead(Ranges ranges) final AsyncChain markPermanentlyUnsafeToRead(Ranges ranges) { - return chain((Empty) () -> "Mark Range As Regained", safeStore -> { + return chain((Empty) () -> "Mark Range As Permanently Unsafe To Read", safeStore -> { safeStore.setSafeToRead(purgeHistory(safeToRead, ranges)); safeStore.setPermanentlyUnsafeToRead(permanentlyUnsafeToRead.union(MERGE_ADJACENT, ranges)); }); diff --git 
a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 72aa60131c..51eb8bb990 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -364,7 +364,7 @@ PreviouslyOwned prepend(long epoch, Ranges ranges) { Invariants.require(epochs.length == 0 || epoch > epochs[0]); long[] newEpochs = new long[this.epochs.length + 1]; - Ranges[] newRanges = new Ranges[epochs.length]; + Ranges[] newRanges = new Ranges[this.epochs.length + 1]; newEpochs[0] = epoch; newRanges[0] = ranges; System.arraycopy(this.epochs, 0, newEpochs, 1, this.epochs.length); From 4673322bf2c062ef16e2069429eed06bc828ef86 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 22 Apr 2026 10:43:01 -0700 Subject: [PATCH 32/35] delete command stores: --- .../main/java/accord/local/CommandStores.java | 25 +++++++++++++++++++ .../java/accord/topology/TopologyManager.java | 5 ++++ 2 files changed, 30 insertions(+) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 51eb8bb990..1db46522e3 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -1139,6 +1139,31 @@ public synchronized Supplier updateTopology(Node node, Topology newT return update.bootstrap; } + public synchronized void deleteCommandStores(long retiredEpoch) + { + List result = new ArrayList<>(); + boolean shouldSwap = false; + for (ShardHolder shard : current) + { + long lastEpochWithOwnedRange = shard.ranges().epochs[shard.ranges.epochs.length - 1] - 1; + if (shard.ranges().currentRanges().isEmpty() && lastEpochWithOwnedRange < retiredEpoch) + { + shouldSwap = true; + } + else { + result.add(shard); + } + } + + if (shouldSwap) + { + Snapshot snapshot = new Snapshot(result.toArray(new ShardHolder[0]), current().local, current.global, 
current.previouslyOwned); + AsyncResults.SettableResult flush = new AsyncResults.SettableWithDescription<>("Write Topology To Journal"); + journal.saveTopology(snapshot.asTopologyUpdate(), () -> flush.setSuccess(null)); + current = snapshot; + } + } + public void shutdown() { for (ShardHolder shard : current.shards) diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 282ae9ef34..9b06493ec0 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -197,6 +197,11 @@ private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId txnId) if (epoch > active.currentEpoch) ranges = pending.retired(ranges, epoch); ranges = active.retired(ranges, epoch); + if (e != null && e.allRetired() || epoch < minEpoch()) + { + this.active = new ActiveEpochs(this, active().epochs, active.firstNonEmptyEpoch); + node.commandStores().deleteCommandStores(epoch); + } } if (!ranges.isEmpty()) { From af8a2e39d0b3c6afeceb0216bc2560afbf4d5762 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Fri, 24 Apr 2026 14:37:06 -0700 Subject: [PATCH 33/35] fix NPE --- .../src/main/java/accord/local/CommandStore.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index bf3af79827..b7df4d7ab7 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -400,17 +400,17 @@ protected void unsafeAcceptRequests(Ranges accept) /** * This method may be invoked on a non-CommandStore thread */ - final void unsafeSetSafeToRead(NavigableMap newSafeToRead) + final void unsafeSetSafeToRead(@Nullable NavigableMap newSafeToRead) { - NavigableMap newSafeToReadWithoutRegainingRanges = new TreeMap<>(); + 
node.updateStamp(); if (newSafeToRead != null) { + NavigableMap newSafeToReadWithoutRegainingRanges = new TreeMap<>(); for (Map.Entry entry : newSafeToRead.entrySet()) newSafeToReadWithoutRegainingRanges.put(entry.getKey(), entry.getValue().without(this.permanentlyUnsafeToRead)); - } - node.updateStamp(); - this.safeToRead = newSafeToReadWithoutRegainingRanges; + this.safeToRead = newSafeToReadWithoutRegainingRanges; + } } final void unsafeSetPermanentlyUnsafeToRead(Ranges newPermanentlyUnsafeToRead) From 3fb4f5159304470d3a84983bb40b567f4f1955ad Mon Sep 17 00:00:00 2001 From: Benedict Elliott Smith Date: Mon, 27 Apr 2026 11:42:13 +0100 Subject: [PATCH 34/35] suggestions --- .../main/java/accord/local/CommandStore.java | 11 +--- .../main/java/accord/local/CommandStores.java | 21 +++---- .../java/accord/topology/ActiveEpochs.java | 56 +++++++++++-------- .../java/accord/topology/TopologyManager.java | 30 +++------- .../accord/topology/TopologyManagerTest.java | 7 --- 5 files changed, 53 insertions(+), 72 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index b7df4d7ab7..5af1601a25 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -402,15 +402,10 @@ protected void unsafeAcceptRequests(Ranges accept) */ final void unsafeSetSafeToRead(@Nullable NavigableMap newSafeToRead) { - node.updateStamp(); if (newSafeToRead != null) - { - NavigableMap newSafeToReadWithoutRegainingRanges = new TreeMap<>(); - for (Map.Entry entry : newSafeToRead.entrySet()) - newSafeToReadWithoutRegainingRanges.put(entry.getKey(), entry.getValue().without(this.permanentlyUnsafeToRead)); - - this.safeToRead = newSafeToReadWithoutRegainingRanges; - } + newSafeToRead = purgeHistory(newSafeToRead, permanentlyUnsafeToRead); + safeToRead = newSafeToRead; + node.updateStamp(); } final void unsafeSetPermanentlyUnsafeToRead(Ranges 
newPermanentlyUnsafeToRead) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 1db46522e3..9bf4c5e680 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -1139,25 +1139,20 @@ public synchronized Supplier updateTopology(Node node, Topology newT return update.bootstrap; } - public synchronized void deleteCommandStores(long retiredEpoch) + public synchronized void removeCommandStoresBefore(long beforeEpoch) { - List result = new ArrayList<>(); - boolean shouldSwap = false; + List keep = new ArrayList<>(current.shards.length); for (ShardHolder shard : current) { - long lastEpochWithOwnedRange = shard.ranges().epochs[shard.ranges.epochs.length - 1] - 1; - if (shard.ranges().currentRanges().isEmpty() && lastEpochWithOwnedRange < retiredEpoch) - { - shouldSwap = true; - } - else { - result.add(shard); - } + RangesForEpoch ranges = shard.ranges; + int size = ranges.size(); + if (!ranges.rangesAtIndex(size - 1).isEmpty() || ranges.epochAtIndex(size - 2) >= beforeEpoch) + keep.add(shard); } - if (shouldSwap) + if (keep.size() != current.shards.length) { - Snapshot snapshot = new Snapshot(result.toArray(new ShardHolder[0]), current().local, current.global, current.previouslyOwned); + Snapshot snapshot = new Snapshot(keep.toArray(new ShardHolder[0]), current().local, current.global, current.previouslyOwned); AsyncResults.SettableResult flush = new AsyncResults.SettableWithDescription<>("Write Topology To Journal"); journal.saveTopology(snapshot.asTopologyUpdate(), () -> flush.setSuccess(null)); current = snapshot; diff --git a/accord-core/src/main/java/accord/topology/ActiveEpochs.java b/accord-core/src/main/java/accord/topology/ActiveEpochs.java index 5b1677ffba..1ba8a3e477 100644 --- a/accord-core/src/main/java/accord/topology/ActiveEpochs.java +++ 
b/accord-core/src/main/java/accord/topology/ActiveEpochs.java @@ -62,19 +62,31 @@ public final class ActiveEpochs implements Iterable // Epochs are sorted in _descending_ order final ActiveEpoch[] epochs; - ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long prevFirstNonEmptyEpoch) + private ActiveEpochs(TopologyManager manager, ActiveEpoch[] epochs, long firstNonEmptyEpoch) { this.manager = manager; this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0; - if (prevFirstNonEmptyEpoch != -1) - this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; - else if (epochs.length > 0 && !epochs[0].all().isEmpty()) - this.firstNonEmptyEpoch = currentEpoch; - else - this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; - + this.firstNonEmptyEpoch = firstNonEmptyEpoch; + this.epochs = epochs; for (int i = 1; i < epochs.length; i++) Invariants.requireArgument(epochs[i].epoch() == epochs[i - 1].epoch() - 1); + } + + static ActiveEpochs empty(TopologyManager manager) + { + return new ActiveEpochs(manager, new ActiveEpoch[0], -1); + } + + ActiveEpochs withNewEpochs(ActiveEpoch[] epochs) + { + long firstNonEmptyEpoch = this.firstNonEmptyEpoch; + if (firstNonEmptyEpoch == -1 && epochs.length > 0 && !epochs[0].all().isEmpty()) + firstNonEmptyEpoch = currentEpoch; + return new ActiveEpochs(manager, epochs, firstNonEmptyEpoch); + } + + ActiveEpochs maybeTruncate() + { int truncateFrom = -1; // > 0 because we do not want to be left without epochs in case they're all empty for (int i = epochs.length - 1; i > 0; i--) @@ -86,28 +98,26 @@ else if (epochs.length > 0 && !epochs[0].all().isEmpty()) } if (truncateFrom == -1) + return this; + + ActiveEpoch[] newEpochs = Arrays.copyOf(epochs, truncateFrom);; + for (int i = truncateFrom; i < epochs.length; i++) { - this.epochs = epochs; + ActiveEpoch e = epochs[i]; + Invariants.require(epochs[i].isQuorumReady()); + logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. 
Closed: {}", e.epoch(), e.addedRanges, e.removedRanges, e.all.ranges, e.closed()); } - else + if (logger.isTraceEnabled()) { - this.epochs = Arrays.copyOf(epochs, truncateFrom); - for (int i = truncateFrom; i < epochs.length; i++) + for (int i = 0; i < truncateFrom; i++) { ActiveEpoch e = epochs[i]; - Invariants.require(epochs[i].isQuorumReady()); - logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", e.epoch(), e.addedRanges, e.removedRanges, e.all.ranges, e.closed()); - } - if (logger.isTraceEnabled()) - { - for (int i = 0; i < truncateFrom; i++) - { - ActiveEpoch e = epochs[i]; - Invariants.require(e.isQuorumReady()); - logger.trace("Leaving epoch {} with added/removed ranges {}/{}", e.epoch(), e.addedRanges, e.removedRanges); - } + Invariants.require(e.isQuorumReady()); + logger.trace("Leaving epoch {} with added/removed ranges {}/{}", e.epoch(), e.addedRanges, e.removedRanges); } } + + return withNewEpochs(newEpochs); } public boolean isEmpty() diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 9b06493ec0..5f06885b0a 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -103,7 +103,7 @@ public TopologyManager(TopologySorter.Supplier sorter, Node node, TopologyServic this.time = time; this.timeouts = timeouts; this.topologyService = topologyService; - this.active = new ActiveEpochs(this, new ActiveEpoch[0], -1); + this.active = ActiveEpochs.empty(this); this.pending = new PendingEpochs(this); } @@ -173,6 +173,7 @@ public void onEpochRetired(Ranges ranges, TxnId txnId) private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId txnId) { + long removeCommandStoresBefore = 0; Topology topology = null; synchronized (this) { @@ -197,10 +198,11 @@ private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId txnId) if (epoch 
> active.currentEpoch) ranges = pending.retired(ranges, epoch); ranges = active.retired(ranges, epoch); - if (e != null && e.allRetired() || epoch < minEpoch()) + ActiveEpochs truncated = active.maybeTruncate(); + if (truncated != active) { - this.active = new ActiveEpochs(this, active().epochs, active.firstNonEmptyEpoch); - node.commandStores().deleteCommandStores(epoch); + active = truncated; + removeCommandStoresBefore = truncated.minEpoch(); } } if (!ranges.isEmpty()) @@ -208,22 +210,8 @@ private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId txnId) for (TopologyListener listener : listeners) listener.onEpochRetired(ranges, epoch, topology); } - } - - public synchronized void truncateTopologiesUntil(long epoch) - { - ActiveEpochs current = active; - Invariants.requireArgument(current.epoch() >= epoch, "Unable to truncate; epoch %d is > current epoch %d", epoch, current.epoch()); - - if (current.minEpoch() >= epoch) - return; - - int newLen = current.epochs.length - (int) (epoch - current.minEpoch()); - Invariants.require(current.epochs[newLen - 1].isQuorumReady(), "Epoch %d is not ready to coordinate", current.epochs[newLen - 1].epoch()); - - ActiveEpoch[] nextEpochs = new ActiveEpoch[newLen]; - System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen); - active = new ActiveEpochs(this, nextEpochs, current.firstNonEmptyEpoch); + if (removeCommandStoresBefore > 0) + node.commandStores().removeCommandStoresBefore(removeCommandStoresBefore); } public TopologySorter.Supplier sorter() @@ -440,7 +428,7 @@ private void updateActive() } } - this.active = new ActiveEpochs(this, next, prev.firstNonEmptyEpoch); + this.active = prev.withNewEpochs(next); this.pending.removeFirst(topology.epoch); } diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java index 68152ae9e0..30ed0fd414 100644 --- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java +++ 
b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java @@ -283,13 +283,6 @@ void truncateTopologyHistory() Assertions.assertTrue(service.active().hasEpoch(2)); Assertions.assertTrue(service.active().hasEpoch(3)); Assertions.assertTrue(service.active().hasEpoch(4)); - - service.truncateTopologiesUntil(3); - Assertions.assertFalse(service.active().hasEpoch(1)); - Assertions.assertFalse(service.active().hasEpoch(2)); - Assertions.assertTrue(service.active().hasEpoch(3)); - Assertions.assertTrue(service.active().hasEpoch(4)); - } @Test From 9e32dc36bb5481460dd8f1b03e339e6d4823bf20 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 27 Apr 2026 10:08:34 -0700 Subject: [PATCH 35/35] fix --- accord-core/src/main/java/accord/local/CommandStores.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 9bf4c5e680..ef0ca3461c 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -1146,7 +1146,8 @@ public synchronized void removeCommandStoresBefore(long beforeEpoch) { RangesForEpoch ranges = shard.ranges; int size = ranges.size(); - if (!ranges.rangesAtIndex(size - 1).isEmpty() || ranges.epochAtIndex(size - 2) >= beforeEpoch) + long lastEpochWithOwnedRange = ranges.epochAtIndex(size - 1) - 1; + if (!ranges.rangesAtIndex(size - 1).isEmpty() || lastEpochWithOwnedRange >= beforeEpoch) keep.add(shard); }