From 6d5e859fd40ef790f887e90214a09458d711a343 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 15:39:00 -0800 Subject: [PATCH 01/24] initial --- .../main/java/accord/local/CommandStore.java | 19 +++++ .../main/java/accord/local/CommandStores.java | 74 ++++++++++++++----- .../java/accord/local/SafeCommandStore.java | 5 ++ .../java/accord/topology/TopologyManager.java | 58 ++++++++++++++- .../accord/topology/TopologyRandomizer.java | 44 ++++++----- 5 files changed, 160 insertions(+), 40 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index d321bc6794..4aab5a02df 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -187,6 +187,7 @@ CommandStore create(int id, * But they may still be ordered for other key ranges they participate in. */ private NavigableMap safeToRead = emptySafeToRead(); + private Ranges retiredRanges = Ranges.EMPTY; private final Set bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>()); @Nullable private RejectBefore rejectBefore; @@ -401,10 +402,21 @@ protected void unsafeAcceptRequests(Ranges accept) */ final void unsafeSetSafeToRead(NavigableMap newSafeToRead) { + for (Map.Entry entry : newSafeToRead.entrySet()) + { + Ranges rangeExcluded = entry.getValue().without(this.retiredRanges); + logger.info("{} is excluded from newSafeToRead because it is in the retired range", rangeExcluded); + } + node.updateStamp(); this.safeToRead = newSafeToRead; } + final void unsafeAddToRetiredRanges(Ranges newRetiredRange) + { + this.retiredRanges = newRetiredRange.union(MERGE_ADJACENT, this.retiredRanges); + } + protected final void unsafeClearSafeToRead() { unsafeSetSafeToRead(null); @@ -1181,6 +1193,13 @@ final void markUnsafeToRead(Ranges ranges) } } + final void markAsRetired(Ranges ranges) + { + execute((Empty) () -> "Mark Range As Retired", safeStore -> { + safeStore.addToRetiredRanges(ranges); + }, agent); + } + public final DataStore unsafeGetDataStore() { return dataStore; diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 9a6e2d29af..6dc7a53a53 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -18,14 +18,7 @@ package accord.local; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -37,6 +30,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import accord.primitives.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -52,16 +46,6 @@ import accord.api.ProgressLog; import accord.api.RoutingKey; import accord.local.CommandStore.EpochUpdateHolder; -import accord.primitives.AbstractRanges; -import accord.primitives.AbstractUnseekableKeys; -import accord.primitives.EpochSupplier; -import accord.primitives.Participants; -import accord.primitives.Range; -import accord.primitives.Ranges; -import accord.primitives.RoutingKeys; -import accord.primitives.Timestamp; -import accord.primitives.TxnId; -import accord.primitives.Unseekables; import accord.topology.Shard; import accord.topology.Topology; import accord.utils.IndexedQuadConsumer; @@ -577,6 +561,7 @@ public static class Snapshot extends Journal.TopologyUpdate implements Iterable< final Int2IntHashMap byId; private final int[] indexForRange; final SearchableRangeList lookupByRange; + final Map overlappingCommandStores; public Snapshot(ShardHolder[] shards, Topology local, Topology global) { @@ -625,6 +610,21 @@ class RangeAndIndex indexForRange[i] = rangesAndIndexes[i].index; } lookupByRange = SearchableRangeList.build(ranges); + + overlappingCommandStores = new HashMap<>(); + for (ShardHolder shard : shards) + { + overlappingCommandStores.put(shard.store.id, new LargeBitSet(shards.length)); + for (Map.Entry entry : overlappingCommandStores.entrySet()) + { + Integer id = entry.getKey(); + if (!shard.ranges().all().overlapping(shards[byId.get(id)].ranges.all()).isEmpty()) + { + overlappingCommandStores.get(shard.store.id).set(id); + entry.getValue().set(shard.store.id); + } + } + } } // This method exists to ensure we do not hold references to command stores @@ -759,6 +759,14 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top shard.store.epochUpdateHolder.remove(epoch, shard.ranges, removeRanges); bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges)); } + + Ranges regainedRanges = shard.ranges().all().overlapping(added); + if (!regainedRanges.isEmpty()) + { + shard.store.markUnsafeToRead(regainedRanges); + shard.store.markAsRetired(regainedRanges); + } + // TODO (desired): only sync affected shards Ranges ranges = shard.ranges().currentRanges(); // ranges can be empty when ranges are lost or consolidated across epochs. @@ -924,17 +932,45 @@ public AsyncChain mapReduce(IntStream commandStoreIds, MapReduceCommandSt return mapReduce(snapshot -> commandStoreIds.mapToObj(snapshot::byId).iterator(), mapReduce); } + /* Do all reads and writes go through this? */ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStores mapReduceConsume) { Snapshot snapshot = current; Iterator stores = selector.select(snapshot); AsyncChain chain = null; + LargeBitSet bitSet = new LargeBitSet(snapshot.shards.length); while (stores.hasNext()) { - AsyncChain next = mapReduceConsume.applyAsync(stores.next()); + CommandStore store = stores.next(); + bitSet.set(store.id()); + AsyncChain next = mapReduceConsume.applyAsync(store); if (next != null) chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } + + // Check that we are not querying two command stores for the same range + for (Map.Entry entry : snapshot.overlappingCommandStores.entrySet()) + { + List overlapping = new ArrayList<>(); + LargeBitSet overlappingCommandStores = entry.getValue(); + for (int i = overlappingCommandStores.firstSetBit(); i >= 0 ; i = overlappingCommandStores.nextSetBit(i + 1, -1)) + { + if (bitSet.get(i)) + overlapping.add(i); + } + + Ranges range = Ranges.EMPTY; + for (Integer id : overlapping) + { + Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(id).rangesForEpoch.all()); + if (!range.overlapping(touchedKeys).isEmpty()) + { + throw illegalState("We should not query the same range from two different command stores."); + } + range = range.with(mapReduceConsume.keys().overlapping(snapshot.byId(id).rangesForEpoch.all()).toRanges()); + } + } + return chain == null ? AsyncChains.success(null) : chain; } diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 99faad7a0b..217cd572d2 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -384,6 +384,11 @@ public void setSafeToRead(NavigableMap newSafeToRead) commandStore().unsafeSetSafeToRead(newSafeToRead); } + public void addToRetiredRanges(Ranges newRetiredRange) + { + commandStore().unsafeAddToRetiredRanges(newRetiredRange); + } + public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) { commandStore().unsafeSetRangesForEpoch(rangesForEpoch); diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 4ca397da9f..90f21bd678 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -18,7 +18,7 @@ package accord.topology; -import java.util.IdentityHashMap; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -283,6 +283,62 @@ protected Executor executor() return Runnable::run; } + public static Map getAdditions(Topology current, Topology next) + { + Map additions = new HashMap<>(); + for (Id node : next.nodes()) + { + Ranges prev = current.rangesForNode(node); + if (prev == null) prev = Ranges.EMPTY; + + Ranges added = next.rangesForNode(node).without(prev); + if (added.isEmpty()) + continue; + + additions.put(node, added); + } + return additions; + } + + public List epochsWeAreRegainingRangesFor(Topology curr, Topology next) + { + ArrayList epochs = new ArrayList<>(); + + synchronized (this) + { + Map additions = getAdditions(curr, next); + for (Map.Entry entry : additions.entrySet()) + { + for (ActiveEpoch activeEpoch : this.active) { + if (activeEpoch.all().rangesForNode(entry.getKey()).intersects(entry.getValue())) + epochs.add(activeEpoch.epoch()); + } + } + } + + return epochs; + } + + public void blockUntilAllEpochsRetired(List epochs) + { + try + { + int index = 0; + synchronized (this) + { + while (index != epochs.size()) + { + for (int i = index; i < epochs.size(); i++) + if (this.active.get(epochs.get(i)).allRetired()) + index += 1; + wait(); + } + } + } catch (Throwable t) { + logger.error("Uncaught exception", t); + } + } + public void reportTopology(Topology topology) { PendingEpoch e; diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java index a8f8490acd..2725ab2089 100644 --- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java +++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java @@ -413,33 +413,37 @@ private static int prefix(Shard shard) return ((PrefixedIntHashKey) shard.range.start()).prefix; } - private static Map getAdditions(Topology current, Topology next) + private boolean validToReassignRange(Topology current, Shard[] nextShards, Map previouslyReplicated) { - Map additions = new HashMap<>(); - for (Id node : next.nodes()) - { - Ranges prev = current.rangesForNode(node); - if (prev == null) prev = Ranges.EMPTY; - - Ranges added = next.rangesForNode(node).without(prev); - if (added.isEmpty()) - continue; + Topology next = new Topology(current.epoch + 1, nextShards); + Map additions = TopologyManager.getAdditions(current, next); - additions.put(node, added); + for (Map.Entry entry : additions.entrySet()) + { + if (previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY).intersects(entry.getValue()) + && !(previousEpochForRegainedRangeRetired(current, entry.getValue()))) + return false; } - return additions; + + return true; } - private static boolean reassignsRanges(Topology current, Shard[] nextShards, Map previouslyReplicated) + private boolean previousEpochForRegainedRangeRetired(Topology current, Ranges regainingRanges) { - Topology next = new Topology(current.epoch + 1, nextShards); - Map additions = getAdditions(current, next); - - for (Map.Entry entry : additions.entrySet()) + for (Id id : current.nodes()) { - if (previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY).intersects(entry.getValue())) + Node node = this.nodeLookup.apply(id); + boolean isRetiredForNode = true; + for (ActiveEpoch epoch : node.topology().active()) + { + if (epoch.all().ranges.intersects(regainingRanges) && !epoch.allRetired()) + isRetiredForNode = false; + } + + if (isRetiredForNode) return true; } + return false; } @@ -472,7 +476,7 @@ public synchronized Topology updateTopology() Shard[] testShards = type.apply(state, random); Arrays.sort(testShards, (a, b) -> a.range.compareTo(b.range)); if (!everyShardHasQuorumOverlaps(oldShards, testShards) - || reassignsRanges(current, testShards, previouslyReplicated)) + || !validToReassignRange(current, testShards, previouslyReplicated)) { ++rejectedMutations; } @@ -488,7 +492,7 @@ public synchronized Topology updateTopology() Topology nextTopology = new Topology(current.epoch + 1, newShards); - Map nextAdditions = getAdditions(current, nextTopology); + Map nextAdditions = TopologyManager.getAdditions(current, nextTopology); for (Map.Entry entry : nextAdditions.entrySet()) { previouslyReplicated.merge(entry.getKey(), entry.getValue(), (a, b) -> a.union(MERGE_ADJACENT, b)); From 3241b640e17952ea6f4dfd468f2de45880abad3c Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 15:44:41 -0800 Subject: [PATCH 02/24] formatting --- .../main/java/accord/local/CommandStores.java | 22 ++++++++++++++++--- .../java/accord/topology/TopologyManager.java | 6 ++++- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 6dc7a53a53..be75c17e37 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -18,7 +18,15 @@ package accord.local; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.HashMap; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -30,7 +38,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import accord.primitives.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -46,6 +53,16 @@ import accord.api.ProgressLog; import accord.api.RoutingKey; import accord.local.CommandStore.EpochUpdateHolder; +import accord.primitives.AbstractRanges; +import accord.primitives.AbstractUnseekableKeys; +import accord.primitives.EpochSupplier; +import accord.primitives.Participants; +import accord.primitives.Range; +import accord.primitives.Ranges; +import accord.primitives.RoutingKeys; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; import accord.topology.Shard; import accord.topology.Topology; import accord.utils.IndexedQuadConsumer; @@ -932,7 +949,6 @@ public AsyncChain mapReduce(IntStream commandStoreIds, MapReduceCommandSt return mapReduce(snapshot -> commandStoreIds.mapToObj(snapshot::byId).iterator(), mapReduce); } - /* Do all reads and writes go through this? */ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStores mapReduceConsume) { Snapshot snapshot = current; diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 90f21bd678..57f2656199 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -18,7 +18,11 @@ package accord.topology; -import java.util.*; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; From 53149bb935710a4a7c49277a16dcd15e11f51d8a Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 15:57:14 -0800 Subject: [PATCH 03/24] inlined loop --- .../main/java/accord/local/CommandStores.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index be75c17e37..5a6865db9f 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -967,23 +967,19 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore // Check that we are not querying two command stores for the same range for (Map.Entry entry : snapshot.overlappingCommandStores.entrySet()) { - List overlapping = new ArrayList<>(); + Ranges range = Ranges.EMPTY; LargeBitSet overlappingCommandStores = entry.getValue(); for (int i = overlappingCommandStores.firstSetBit(); i >= 0 ; i = overlappingCommandStores.nextSetBit(i + 1, -1)) { if (bitSet.get(i)) - overlapping.add(i); - } - - Ranges range = Ranges.EMPTY; - for (Integer id : overlapping) - { - Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(id).rangesForEpoch.all()); - if (!range.overlapping(touchedKeys).isEmpty()) { - throw illegalState("We should not query the same range from two different command stores."); + Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()); + + if (!range.overlapping(touchedKeys).isEmpty()) + throw illegalState("We should not query the same range from two different command stores."); + + range = range.with(mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()).toRanges()); } - range = range.with(mapReduceConsume.keys().overlapping(snapshot.byId(id).rangesForEpoch.all()).toRanges()); } } From 97b0e97da99635ddfe9cf1eb4dbd1b1239822e1d Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 16:00:39 -0800 Subject: [PATCH 04/24] null check --- accord-core/src/main/java/accord/local/CommandStore.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 4aab5a02df..9bcd1be480 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -402,10 +402,13 @@ protected void unsafeAcceptRequests(Ranges accept) */ final void unsafeSetSafeToRead(NavigableMap newSafeToRead) { - for (Map.Entry entry : newSafeToRead.entrySet()) + if (newSafeToRead != null) { - Ranges rangeExcluded = entry.getValue().without(this.retiredRanges); - logger.info("{} is excluded from newSafeToRead because it is in the retired range", rangeExcluded); + for (Map.Entry entry : newSafeToRead.entrySet()) + { + Ranges rangeExcluded = entry.getValue().without(this.retiredRanges); + logger.info("{} is excluded from newSafeToRead because it is in the retired range", rangeExcluded); + } } node.updateStamp(); From 2663505ab71eed5170c63343243ec1b0a598ce83 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 20:00:22 -0800 Subject: [PATCH 05/24] inline --- accord-core/src/main/java/accord/local/CommandStores.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 5a6865db9f..4360f1e3e9 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -973,12 +973,12 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { if (bitSet.get(i)) { - Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()); + Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).unsafeGetRangesForEpoch().all()); if (!range.overlapping(touchedKeys).isEmpty()) throw illegalState("We should not query the same range from two different command stores."); - range = range.with(mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()).toRanges()); + range = range.with(touchedKeys.toRanges()); } } } From 1d2dd2793105c120c4ced5fd0f149bc6d092e647 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 21:15:02 -0800 Subject: [PATCH 06/24] fix --- .../main/java/accord/local/CommandStores.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 4360f1e3e9..758b884787 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -629,16 +629,17 @@ class RangeAndIndex lookupByRange = SearchableRangeList.build(ranges); overlappingCommandStores = new HashMap<>(); - for (ShardHolder shard : shards) + for (int i = 0; i < shards.length; i++) + overlappingCommandStores.put(i, new LargeBitSet(shards.length)); + + for (int i = 0; i < shards.length; i++) { - overlappingCommandStores.put(shard.store.id, new LargeBitSet(shards.length)); - for (Map.Entry entry : overlappingCommandStores.entrySet()) + for (int j = i+1; j < shards.length; j++) { - Integer id = entry.getKey(); - if (!shard.ranges().all().overlapping(shards[byId.get(id)].ranges.all()).isEmpty()) + if (!shards[i].ranges().all().overlapping(shards[j].ranges().all()).isEmpty()) { - overlappingCommandStores.get(shard.store.id).set(id); - entry.getValue().set(shard.store.id); + overlappingCommandStores.get(i).set(j); + overlappingCommandStores.get(j).set(i); } } } @@ -973,7 +974,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { if (bitSet.get(i)) { - Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).unsafeGetRangesForEpoch().all()); + Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.shards[i].ranges().all()); if (!range.overlapping(touchedKeys).isEmpty()) throw illegalState("We should not query the same range from two different command stores."); From 2fba86e2233bf3a05ae578086156a9243d556ae5 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 23 Feb 2026 21:39:26 -0800 Subject: [PATCH 07/24] update --- accord-core/src/main/java/accord/topology/TopologyManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 57f2656199..fd6b266ebb 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -197,6 +197,7 @@ private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId txnId) if (epoch > active.currentEpoch) ranges = pending.retired(ranges, epoch); ranges = active.retired(ranges, epoch); + notify(); } if (!ranges.isEmpty()) { From 3538f16b0e1c88396191e47a524c67cac889ddfb Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 24 Feb 2026 11:51:27 -0800 Subject: [PATCH 08/24] fix npe --- .../src/main/java/accord/local/CommandStores.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 758b884787..0963f00c0b 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -629,8 +629,8 @@ class RangeAndIndex lookupByRange = SearchableRangeList.build(ranges); overlappingCommandStores = new HashMap<>(); - for (int i = 0; i < shards.length; i++) - overlappingCommandStores.put(i, new LargeBitSet(shards.length)); + for (ShardHolder shard : shards) + overlappingCommandStores.put(shard.store.id(), new LargeBitSet(shards.length)); for (int i = 0; i < shards.length; i++) { @@ -638,8 +638,8 @@ class RangeAndIndex { if (!shards[i].ranges().all().overlapping(shards[j].ranges().all()).isEmpty()) { - overlappingCommandStores.get(i).set(j); - overlappingCommandStores.get(j).set(i); + overlappingCommandStores.get(shards[i].store.id()).set(shards[j].store.id()); + overlappingCommandStores.get(shards[j].store.id()).set(shards[i].store.id()); } } } @@ -974,7 +974,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { if (bitSet.get(i)) { - Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.shards[i].ranges().all()); + Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()); if (!range.overlapping(touchedKeys).isEmpty()) throw illegalState("We should not query the same range from two different command stores."); From a7bb9fffa56209c8add383b7415a9573c910b4f4 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 24 Feb 2026 15:55:34 -0800 Subject: [PATCH 09/24] fix wrong use of overlapping --- accord-core/src/main/java/accord/local/CommandStores.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 0963f00c0b..11e3be908f 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -636,7 +636,7 @@ class RangeAndIndex { for (int j = i+1; j < shards.length; j++) { - if (!shards[i].ranges().all().overlapping(shards[j].ranges().all()).isEmpty()) + if (!shards[i].ranges().all().slice(shards[j].ranges().all(), Minimal).isEmpty()) { overlappingCommandStores.get(shards[i].store.id()).set(shards[j].store.id()); overlappingCommandStores.get(shards[j].store.id()).set(shards[i].store.id()); @@ -778,7 +778,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges)); } - Ranges regainedRanges = shard.ranges().all().overlapping(added); + Ranges regainedRanges = shard.ranges().all().slice(added, Minimal); if (!regainedRanges.isEmpty()) { shard.store.markUnsafeToRead(regainedRanges); @@ -974,9 +974,9 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { if (bitSet.get(i)) { - Unseekables touchedKeys = mapReduceConsume.keys().overlapping(snapshot.byId(i).rangesForEpoch.all()); + Ranges touchedKeys = mapReduceConsume.keys().toRanges().slice(snapshot.byId(i).rangesForEpoch.all(), Minimal); - if (!range.overlapping(touchedKeys).isEmpty()) + if (!range.slice(touchedKeys, Minimal).isEmpty()) throw illegalState("We should not query the same range from two different command stores."); range = range.with(touchedKeys.toRanges()); From 226ddd0cd4930a4e2d2cff7ddef7215754605054 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 24 Feb 2026 16:22:27 -0800 Subject: [PATCH 10/24] use shard index instead of command store id --- .../src/main/java/accord/local/CommandStores.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 11e3be908f..a8ce9a3329 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -629,8 +629,8 @@ class RangeAndIndex lookupByRange = SearchableRangeList.build(ranges); overlappingCommandStores = new HashMap<>(); - for (ShardHolder shard : shards) - overlappingCommandStores.put(shard.store.id(), new LargeBitSet(shards.length)); + for (int i = 0; i < shards.length ; i++) + overlappingCommandStores.put(i, new LargeBitSet(shards.length)); for (int i = 0; i < shards.length; i++) { @@ -638,8 +638,8 @@ class RangeAndIndex { if (!shards[i].ranges().all().slice(shards[j].ranges().all(), Minimal).isEmpty()) { - overlappingCommandStores.get(shards[i].store.id()).set(shards[j].store.id()); - overlappingCommandStores.get(shards[j].store.id()).set(shards[i].store.id()); + overlappingCommandStores.get(i).set(j); + overlappingCommandStores.get(j).set(i); } } } @@ -959,7 +959,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore while (stores.hasNext()) { CommandStore store = stores.next(); - bitSet.set(store.id()); + bitSet.set(snapshot.byId.get(store.id())); AsyncChain next = mapReduceConsume.applyAsync(store); if (next != null) chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; @@ -974,7 +974,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore { if (bitSet.get(i)) { - Ranges touchedKeys = mapReduceConsume.keys().toRanges().slice(snapshot.byId(i).rangesForEpoch.all(), Minimal); + Ranges touchedKeys = mapReduceConsume.keys().toRanges().slice(snapshot.shards[i].ranges().all(), Minimal); if (!range.slice(touchedKeys, Minimal).isEmpty()) throw illegalState("We should not query the same range from two different command stores."); From aa61f854a32fefbbff75faefcf34d158a224769a Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 12:08:55 -0800 Subject: [PATCH 11/24] refactor --- .../main/java/accord/topology/Topology.java | 17 ++++++ .../java/accord/topology/TopologyManager.java | 57 ------------------- .../accord/topology/TopologyRandomizer.java | 4 +- 3 files changed, 19 insertions(+), 59 deletions(-) diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index cba5a4c20a..b0d9508662 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -765,6 +765,23 @@ public void forEach(Consumer forEach) forEach.accept(shards[i]); } + public static Map computeNodeAdditions(Topology current, Topology next) + { + Map additions = new HashMap<>(); + for (Id node : next.nodes()) + { + Ranges prev = current.rangesForNode(node); + if (prev == null) prev = Ranges.EMPTY; + + Ranges added = next.rangesForNode(node).without(prev); + if (added.isEmpty()) + continue; + + additions.put(node, added); + } + return additions; + } + public SortedArrayList nodes() { return nodes; diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index fd6b266ebb..b97790b350 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -197,7 +197,6 @@ private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId txnId) if (epoch > active.currentEpoch) ranges = pending.retired(ranges, epoch); ranges = active.retired(ranges, epoch); - notify(); } if (!ranges.isEmpty()) { @@ -288,62 +287,6 @@ protected Executor executor() return Runnable::run; } - public static Map getAdditions(Topology current, Topology next) - { - Map additions = new HashMap<>(); - for (Id node : next.nodes()) - { - Ranges prev = current.rangesForNode(node); - if (prev == null) prev = Ranges.EMPTY; - - Ranges added = next.rangesForNode(node).without(prev); - if (added.isEmpty()) - continue; - - additions.put(node, added); - } - return additions; - } - - public List epochsWeAreRegainingRangesFor(Topology curr, Topology next) - { - ArrayList epochs = new ArrayList<>(); - - synchronized (this) - { - Map additions = getAdditions(curr, next); - for (Map.Entry entry : additions.entrySet()) - { - for (ActiveEpoch activeEpoch : this.active) { - if (activeEpoch.all().rangesForNode(entry.getKey()).intersects(entry.getValue())) - epochs.add(activeEpoch.epoch()); - } - } - } - - return epochs; - } - - public void blockUntilAllEpochsRetired(List epochs) - { - try - { - int index = 0; - synchronized (this) - { - while (index != epochs.size()) - { - for (int i = index; i < epochs.size(); i++) - if (this.active.get(epochs.get(i)).allRetired()) - index += 1; - wait(); - } - } - } catch (Throwable t) { - logger.error("Uncaught exception", t); - } - } - public void reportTopology(Topology topology) { PendingEpoch e; diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java index 2725ab2089..c778e707c1 100644 --- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java +++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java @@ -416,7 +416,7 @@ private static int prefix(Shard shard) private boolean validToReassignRange(Topology current, Shard[] nextShards, Map previouslyReplicated) { Topology next = new Topology(current.epoch + 1, nextShards); - Map additions = TopologyManager.getAdditions(current, next); + Map additions = Topology.computeNodeAdditions(current, next); for (Map.Entry entry : additions.entrySet()) { @@ -492,7 +492,7 @@ public synchronized Topology updateTopology() Topology nextTopology = new Topology(current.epoch + 1, newShards); - Map nextAdditions = TopologyManager.getAdditions(current, nextTopology); + Map nextAdditions = Topology.computeNodeAdditions(current, nextTopology); for (Map.Entry entry : nextAdditions.entrySet()) { previouslyReplicated.merge(entry.getKey(), entry.getValue(), (a, b) -> a.union(MERGE_ADJACENT, b)); From 4b5b2a967f07b8f1533c5f8907176f41a9c0f090 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 14:37:55 -0800 Subject: [PATCH 12/24] added method for greatest epoch and range that needs to be retired --- .../java/accord/topology/TopologyManager.java | 56 ++++++++++++++++++- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index b97790b350..4511ee95b5 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -18,11 +18,8 @@ package accord.topology; -import java.util.ArrayList; import java.util.IdentityHashMap; import java.util.Map; -import java.util.HashMap; -import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -31,6 +28,7 @@ import java.util.function.Supplier; import javax.annotation.Nullable; +import accord.primitives.Routables; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +54,8 @@ import accord.utils.async.AsyncResults; import accord.utils.async.NestedAsyncResult; +import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; + /** * Manages topology state changes and update bookkeeping * @@ -312,6 +312,56 @@ public void reportTopology(Topology topology) updateActive(); } + public static class regainingEpochRange + { + public final long epoch; + public final Ranges range; + + public regainingEpochRange(long epoch, Ranges range) + { + this.epoch = epoch; + this.range = range; + } + + public long epoch() + { + return epoch; + } + + public Ranges range() + { + return range; + } + } + + @Nullable + public regainingEpochRange epochAndRangeToBeRetired(Topology curr, Topology next) + { + Map additions = Topology.computeNodeAdditions(curr, next); + long greatestEpoch = -1; + Ranges range = Ranges.EMPTY; + + synchronized (this) + { + for (Map.Entry entry : additions.entrySet()) + { + for (ActiveEpoch activeEpoch : this.active) { + if (activeEpoch.all().rangesForNode(entry.getKey()).intersects(entry.getValue()) && greatestEpoch < activeEpoch.epoch()) + { + greatestEpoch = activeEpoch.epoch(); + range = range.union(MERGE_ADJACENT, activeEpoch.all().rangesForNode(entry.getKey()).slice(entry.getValue(), Routables.Slice.Minimal)); + break; + } + } + } + } + + if (greatestEpoch != -1) + return new regainingEpochRange(greatestEpoch, range); + + return null; + } + private final AtomicBoolean updatingActive = new AtomicBoolean(); private void updateActive() { From 356c17352393d01dbd44e06f51b05161840c234f Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 16:31:19 -0800 Subject: [PATCH 13/24] fix class name --- .../src/main/java/accord/topology/TopologyManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 4511ee95b5..126e10078a 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -312,12 +312,12 @@ public void reportTopology(Topology topology) updateActive(); } - public static class regainingEpochRange + public static class RegainingEpochRange { public final long epoch; public final Ranges range; - public regainingEpochRange(long epoch, Ranges range) + public RegainingEpochRange(long epoch, Ranges range) { this.epoch = epoch; this.range = range; @@ -335,7 +335,7 @@ public Ranges range() } @Nullable - public regainingEpochRange epochAndRangeToBeRetired(Topology curr, Topology next) + public RegainingEpochRange epochAndRangeToBeRetired(Topology curr, Topology next) { Map additions = Topology.computeNodeAdditions(curr, next); long greatestEpoch = -1; From c5962adac56fbba5916d07b5f6c1bc6e95d6a409 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 16:31:50 -0800 Subject: [PATCH 14/24] fix --- accord-core/src/main/java/accord/topology/TopologyManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 126e10078a..52eedf507d 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -357,7 +357,7 @@ public RegainingEpochRange epochAndRangeToBeRetired(Topology curr, Topology next } if (greatestEpoch != -1) - return new regainingEpochRange(greatestEpoch, range); + return new RegainingEpochRange(greatestEpoch, range); return null; } From 8dbd41dbd424762172c15878cf4c26e41f158e8f Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 21:23:22 -0800 Subject: [PATCH 15/24] factor out method to test --- .../src/main/java/accord/local/CommandStores.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index a8ce9a3329..9a472e1f6f 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -965,6 +965,13 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } + Invariants.require(checkQueryDisjointRangesAcrossCommandStores(snapshot, bitSet, mapReduceConsume.keys().toRanges())); + + return chain == null ? AsyncChains.success(null) : chain; + } + + public boolean checkQueryDisjointRangesAcrossCommandStores(Snapshot snapshot, LargeBitSet commandStoresSeen, Ranges queryRange) + { // Check that we are not querying two command stores for the same range for (Map.Entry entry : snapshot.overlappingCommandStores.entrySet()) { @@ -972,19 +979,19 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore LargeBitSet overlappingCommandStores = entry.getValue(); for (int i = overlappingCommandStores.firstSetBit(); i >= 0 ; i = overlappingCommandStores.nextSetBit(i + 1, -1)) { - if (bitSet.get(i)) + if (commandStoresSeen.get(i)) { - Ranges touchedKeys = mapReduceConsume.keys().toRanges().slice(snapshot.shards[i].ranges().all(), Minimal); + Ranges touchedKeys = queryRange.slice(snapshot.shards[i].ranges().all(), Minimal); if (!range.slice(touchedKeys, Minimal).isEmpty()) - throw illegalState("We should not query the same range from two different command stores."); + return false; range = range.with(touchedKeys.toRanges()); } } } - return chain == null ? AsyncChains.success(null) : chain; + return true; } public O mapReduceUnsafe(StoreSelector selector, Function map, BiFunction reduce, O accumulator) From 8d930b759f4b6db56e14177598b63e264894f9ef Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 21:28:05 -0800 Subject: [PATCH 16/24] refactor to test --- .../src/main/java/accord/local/CommandStores.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 9a472e1f6f..efdcd4aedf 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -965,23 +965,23 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } - Invariants.require(checkQueryDisjointRangesAcrossCommandStores(snapshot, bitSet, mapReduceConsume.keys().toRanges())); + Invariants.require(checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges())); return chain == null ? AsyncChains.success(null) : chain; } - public boolean checkQueryDisjointRangesAcrossCommandStores(Snapshot snapshot, LargeBitSet commandStoresSeen, Ranges queryRange) + public static boolean checkQueryDisjointRangesAcrossCommandStores(Map overlappingCommandStores, ShardHolder[] shards, LargeBitSet commandStoresSeen, Ranges queryRange) { // Check that we are not querying two command stores for the same range - for (Map.Entry entry : snapshot.overlappingCommandStores.entrySet()) + for (Map.Entry entry : overlappingCommandStores.entrySet()) { Ranges range = Ranges.EMPTY; - LargeBitSet overlappingCommandStores = entry.getValue(); - for (int i = overlappingCommandStores.firstSetBit(); i >= 0 ; i = overlappingCommandStores.nextSetBit(i + 1, -1)) + LargeBitSet overlappingCommandStore = entry.getValue(); + for (int i = overlappingCommandStore.firstSetBit(); i >= 0 ; i = overlappingCommandStore.nextSetBit(i + 1, -1)) { if (commandStoresSeen.get(i)) { - Ranges touchedKeys = queryRange.slice(snapshot.shards[i].ranges().all(), Minimal); + Ranges touchedKeys = queryRange.slice(shards[i].ranges().all(), Minimal); if (!range.slice(touchedKeys, Minimal).isEmpty()) return false; From a06d79e8176fe0db470717cf0c7e9ad52441a422 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Thu, 26 Feb 2026 23:12:18 -0800 Subject: [PATCH 17/24] fix bug --- accord-core/src/main/java/accord/local/CommandStores.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index efdcd4aedf..0f65493577 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -634,7 +634,7 @@ class RangeAndIndex for (int i = 0; i < shards.length; i++) { - for (int j = i+1; j < shards.length; j++) + for (int j = i; j < shards.length; j++) { if (!shards[i].ranges().all().slice(shards[j].ranges().all(), Minimal).isEmpty()) { From 78fe590c2d40df1493e29a5cca9af860e1355d08 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 3 Mar 2026 15:06:26 -0800 Subject: [PATCH 18/24] Refined check for querying > 1 command store for same range --- .../main/java/accord/local/CommandStores.java | 51 ++++++++++++++----- .../src/main/java/accord/local/Node.java | 2 +- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 0f65493577..14fccd56ed 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -110,9 +110,9 @@ class StandardLatentStoreSelector implements LatentStoreSelector @Override public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants participants) { - return snapshot -> StoreFinder.find(snapshot, participants) + return snapshot -> new StoreIterator(StoreFinder.find(snapshot, participants) .filter(snapshot, participants, txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch()) - .iterator(snapshot); + .iterator(snapshot), txnId.epoch()); } } @@ -125,7 +125,7 @@ static LatentStoreSelector standard() public interface StoreSelector extends LatentStoreSelector { default StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants participants) { return this; } - Iterator select(Snapshot snapshot); + StoreIterator select(Snapshot snapshot); } public static class IncludingSpecificStoreSelector implements StoreSelector @@ -144,14 +144,35 @@ public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Particip StoreFinder finder = StoreFinder.find(snapshot, participants) .filter(snapshot, participants, txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch()); finder.set(snapshot.byId.get(storeId)); - return finder.iterator(snapshot); + return new StoreIterator(finder.iterator(snapshot), txnId.epoch()); }; } @Override - public Iterator select(Snapshot snapshot) + public StoreIterator select(Snapshot snapshot) { - return Collections.singletonList(snapshot.byId(storeId)).iterator(); + return new StoreIterator(Collections.singletonList(snapshot.byId(storeId)).iterator(), null); + } + } + + public static class StoreIterator + { + public final Iterator StoreIterator; + public final Long minEpoch; + + public StoreIterator(Iterator StoreIterator, @Nullable Long minEpoch) + { + this.StoreIterator = StoreIterator; + this.minEpoch = minEpoch; + } + + public Iterator getStoreIterator() + { + return StoreIterator; + } + + public Long getMinEpoch() { + return minEpoch; } } @@ -176,7 +197,7 @@ public static StoreSelector selector(Unseekables unseekables, long minEpoch, return snapshot -> { StoreFinder finder = StoreFinder.find(snapshot, unseekables); finder.filter(snapshot, unseekables, minEpoch, maxEpoch); - return finder.iterator(snapshot); + return new StoreIterator(finder.iterator(snapshot), minEpoch); }; } @@ -878,7 +899,7 @@ public void forAllUnsafe(Consumer forEach) public AsyncChain forAll(String reason, Consumer forEach) { - return mapReduce(snapshot -> Stream.of(snapshot.shards).map(shard -> shard.store).iterator(), new MapReduceCommandStores<>(RoutingKeys.EMPTY) + return mapReduce(snapshot -> new StoreIterator(Stream.of(snapshot.shards).map(shard -> shard.store).iterator(), null), new MapReduceCommandStores<>(RoutingKeys.EMPTY) { @Override public Void reduce(Void o1, Void o2) { return null; } @Override public TxnId primaryTxnId() { return null; } @@ -947,13 +968,14 @@ public Cancellable mapReduceConsume(IntStream commandStoreIds, MapReduceCons public AsyncChain mapReduce(IntStream commandStoreIds, MapReduceCommandStores mapReduce) { - return mapReduce(snapshot -> commandStoreIds.mapToObj(snapshot::byId).iterator(), mapReduce); + return mapReduce(snapshot -> new StoreIterator(commandStoreIds.mapToObj(snapshot::byId).iterator(), null), mapReduce); } public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStores mapReduceConsume) { Snapshot snapshot = current; - Iterator stores = selector.select(snapshot); + StoreIterator StoreIterator = selector.select(snapshot); + Iterator stores = StoreIterator.getStoreIterator(); AsyncChain chain = null; LargeBitSet bitSet = new LargeBitSet(snapshot.shards.length); while (stores.hasNext()) @@ -965,12 +987,13 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } - Invariants.require(checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges())); + if (checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) + throw new IllegalStateException(); return chain == null ? AsyncChains.success(null) : chain; } - public static boolean checkQueryDisjointRangesAcrossCommandStores(Map overlappingCommandStores, ShardHolder[] shards, LargeBitSet commandStoresSeen, Ranges queryRange) + public static boolean checkQueryDisjointRangesAcrossCommandStores(Map overlappingCommandStores, ShardHolder[] shards, LargeBitSet commandStoresSeen, Ranges queryRange, Long minEpoch) { // Check that we are not querying two command stores for the same range for (Map.Entry entry : overlappingCommandStores.entrySet()) @@ -981,7 +1004,7 @@ public static boolean checkQueryDisjointRangesAcrossCommandStores(Map O mapReduceUnsafe(StoreSelector selector, Function map, BiFunction reduce, O accumulator) { Snapshot snapshot = current; - Iterator stores = selector.select(snapshot); + Iterator stores = selector.select(snapshot).getStoreIterator(); while (stores.hasNext()) { O next = map.apply(stores.next()); diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 9c4f1620c4..b4f57313aa 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -765,7 +765,7 @@ public CoordinationAdapter coordinationAdapter(TxnId txnId, Kind kind) public AsyncChain updateMinHlc(long minHlc) { // TODO (required): command stores that are not ready due to bootstrap need to refresh their min HLC on bootstrap completion - StoreSelector selector = snapshot -> Stream.of(snapshot.shards).map(sh -> sh.store).iterator(); + StoreSelector selector = snapshot -> new CommandStores.StoreIterator(Stream.of(snapshot.shards).map(sh -> sh.store).iterator(), null); return commandStores().mapReduce(selector, new MapReduceCommandStores<>(RoutingKeys.EMPTY) { @Override public Void reduce(Void o1, Void o2) { return null; } From 00e9136b0d4ef15f62af3088f950dfd2ac3ff7c6 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 3 Mar 2026 15:15:21 -0800 Subject: [PATCH 19/24] null check --- accord-core/src/main/java/accord/local/CommandStores.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 14fccd56ed..9730a293e1 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -163,7 +163,7 @@ public static class StoreIterator public StoreIterator(Iterator StoreIterator, @Nullable Long minEpoch) { this.StoreIterator = StoreIterator; - this.minEpoch = minEpoch; + this.minEpoch = (minEpoch == null) ? 0L : minEpoch; } public Iterator getStoreIterator() From c72e9b25fdaedce2604b70d46578cbdba6c0f6f9 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 3 Mar 2026 15:48:21 -0800 Subject: [PATCH 20/24] fix --- accord-core/src/main/java/accord/local/CommandStores.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 9730a293e1..579c9254f7 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -987,7 +987,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } - if (checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) + if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) throw new IllegalStateException(); return chain == null ? AsyncChains.success(null) : chain; From 6873064de2ae8d4f90ecce86f1cbe1ae35428095 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 3 Mar 2026 16:30:33 -0800 Subject: [PATCH 21/24] name changes --- .../src/main/java/accord/local/CommandStore.java | 16 ++++++++-------- .../main/java/accord/local/CommandStores.java | 2 +- .../main/java/accord/local/SafeCommandStore.java | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 9bcd1be480..f59be8c237 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -187,7 +187,7 @@ CommandStore create(int id, * But they may still be ordered for other key ranges they participate in. */ private NavigableMap safeToRead = emptySafeToRead(); - private Ranges retiredRanges = Ranges.EMPTY; + private Ranges regainedRanges = Ranges.EMPTY; private final Set bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>()); @Nullable private RejectBefore rejectBefore; @@ -406,8 +406,8 @@ final void unsafeSetSafeToRead(NavigableMap newSafeToRead) { for (Map.Entry entry : newSafeToRead.entrySet()) { - Ranges rangeExcluded = entry.getValue().without(this.retiredRanges); - logger.info("{} is excluded from newSafeToRead because it is in the retired range", rangeExcluded); + Ranges rangeExcluded = entry.getValue().without(this.regainedRanges); + logger.info("{} is excluded from newSafeToRead because it is in the regained ranges", rangeExcluded); } } @@ -415,9 +415,9 @@ final void unsafeSetSafeToRead(NavigableMap newSafeToRead) this.safeToRead = newSafeToRead; } - final void unsafeAddToRetiredRanges(Ranges newRetiredRange) + final void unsafeAddToRegainedRanges(Ranges newRegainedRanges) { - this.retiredRanges = newRetiredRange.union(MERGE_ADJACENT, this.retiredRanges); + this.regainedRanges = newRegainedRanges.union(MERGE_ADJACENT, this.regainedRanges); } protected final void unsafeClearSafeToRead() @@ -1196,10 +1196,10 @@ final void markUnsafeToRead(Ranges ranges) } } - final void markAsRetired(Ranges ranges) + final void markAsRegained(Ranges ranges) { - execute((Empty) () -> "Mark Range As Retired", safeStore -> { - safeStore.addToRetiredRanges(ranges); + execute((Empty) () -> "Mark Range As Regained", safeStore -> { + safeStore.addToRegainedRanges(ranges); }, agent); } diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 579c9254f7..e2e6a39c6f 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -803,7 +803,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top if (!regainedRanges.isEmpty()) { shard.store.markUnsafeToRead(regainedRanges); - shard.store.markAsRetired(regainedRanges); + shard.store.markAsRegained(regainedRanges); } // TODO (desired): only sync affected shards diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 217cd572d2..ba88a5ddf8 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -384,9 +384,9 @@ public void setSafeToRead(NavigableMap newSafeToRead) commandStore().unsafeSetSafeToRead(newSafeToRead); } - public void addToRetiredRanges(Ranges newRetiredRange) + public void addToRegainedRanges(Ranges newRegainedRanges) { - commandStore().unsafeAddToRetiredRanges(newRetiredRange); + commandStore().unsafeAddToRegainedRanges(newRegainedRanges); } public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) From 3c710de7e5fe08e43aac967f4e999b16b2421d29 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 4 Mar 2026 10:49:54 -0800 Subject: [PATCH 22/24] change failure mode --- accord-core/src/main/java/accord/local/CommandStores.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index e2e6a39c6f..4b91c2cafe 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -988,7 +988,10 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore } if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) - throw new IllegalStateException(); + { + logger.info("Reject query as it tries to query {} in more than one CommandStore", mapReduceConsume.keys().toRanges().toString()); + return AsyncChains.failure(null); + } return chain == null ? AsyncChains.success(null) : chain; } From d5db5f5ee4f6f3f4f6842944c1e83cb247bc413d Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 4 Mar 2026 17:15:02 -0800 Subject: [PATCH 23/24] changed error handling --- accord-core/src/main/java/accord/local/CommandStores.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 4b91c2cafe..5a38f326cd 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -38,6 +38,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import accord.topology.TopologyException; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -988,10 +989,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore } if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) - { - logger.info("Reject query as it tries to query {} in more than one CommandStore", mapReduceConsume.keys().toRanges().toString()); - return AsyncChains.failure(null); - } + return AsyncChains.failure(new RuntimeException("Tried to query more than one CommandStore for the same range")); return chain == null ? AsyncChains.success(null) : chain; } From fb12db945457ac1683933d3f12c6d975ac83f620 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Mon, 9 Mar 2026 10:45:11 -0700 Subject: [PATCH 24/24] change casing --- .../src/main/java/accord/local/CommandStores.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 5a38f326cd..d073992e38 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -158,18 +158,18 @@ public StoreIterator select(Snapshot snapshot) public static class StoreIterator { - public final Iterator StoreIterator; + public final Iterator storeIterator; public final Long minEpoch; - public StoreIterator(Iterator StoreIterator, @Nullable Long minEpoch) + public StoreIterator(Iterator storeIterator, @Nullable Long minEpoch) { - this.StoreIterator = StoreIterator; + this.storeIterator = storeIterator; this.minEpoch = (minEpoch == null) ? 0L : minEpoch; } public Iterator getStoreIterator() { - return StoreIterator; + return storeIterator; } public Long getMinEpoch() { @@ -975,8 +975,8 @@ public AsyncChain mapReduce(IntStream commandStoreIds, MapReduceCommandSt public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStores mapReduceConsume) { Snapshot snapshot = current; - StoreIterator StoreIterator = selector.select(snapshot); - Iterator stores = StoreIterator.getStoreIterator(); + StoreIterator storeIterator = selector.select(snapshot); + Iterator stores = storeIterator.getStoreIterator(); AsyncChain chain = null; LargeBitSet bitSet = new LargeBitSet(snapshot.shards.length); while (stores.hasNext()) @@ -988,7 +988,7 @@ public AsyncChain mapReduce(StoreSelector selector, MapReduceCommandStore chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next; } - if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), StoreIterator.getMinEpoch())) + if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), storeIterator.getMinEpoch())) return AsyncChains.failure(new RuntimeException("Tried to query more than one CommandStore for the same range")); return chain == null ? AsyncChains.success(null) : chain;