Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions accord-core/src/main/java/accord/local/CommandStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ CommandStore create(int id,
* But they may still be ordered for other key ranges they participate in.
*/
private NavigableMap<Timestamp, Ranges> safeToRead = emptySafeToRead();
private Ranges regainedRanges = Ranges.EMPTY;
private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>());
@Nullable private RejectBefore rejectBefore;

Expand Down Expand Up @@ -401,10 +402,24 @@ protected void unsafeAcceptRequests(Ranges accept)
*/
final void unsafeSetSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead)
{
if (newSafeToRead != null)
{
for (Map.Entry<Timestamp, Ranges> entry : newSafeToRead.entrySet())
{
Ranges rangeExcluded = entry.getValue().without(this.regainedRanges);
logger.info("{} is excluded from newSafeToRead because it is in the regained ranges", rangeExcluded);
}
}

node.updateStamp();
this.safeToRead = newSafeToRead;
}

final void unsafeAddToRegainedRanges(Ranges newRegainedRanges)
{
this.regainedRanges = newRegainedRanges.union(MERGE_ADJACENT, this.regainedRanges);
}

protected final void unsafeClearSafeToRead()
{
unsafeSetSafeToRead(null);
Expand Down Expand Up @@ -1181,6 +1196,13 @@ final void markUnsafeToRead(Ranges ranges)
}
}

final void markAsRegained(Ranges ranges)
{
execute((Empty) () -> "Mark Range As Regained", safeStore -> {
safeStore.addToRegainedRanges(ranges);
}, agent);
}

public final DataStore unsafeGetDataStore()
{
return dataStore;
Expand Down
104 changes: 92 additions & 12 deletions accord-core/src/main/java/accord/local/CommandStores.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.HashMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand All @@ -37,6 +38,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import accord.topology.TopologyException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
Expand Down Expand Up @@ -109,9 +111,9 @@ class StandardLatentStoreSelector implements LatentStoreSelector
@Override
public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants<?> participants)
{
return snapshot -> StoreFinder.find(snapshot, participants)
return snapshot -> new StoreIterator(StoreFinder.find(snapshot, participants)
.filter(snapshot, participants, txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch())
.iterator(snapshot);
.iterator(snapshot), txnId.epoch());
}
}

Expand All @@ -124,7 +126,7 @@ static LatentStoreSelector standard()
public interface StoreSelector extends LatentStoreSelector
{
default StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Participants<?> participants) { return this; }
Iterator<CommandStore> select(Snapshot snapshot);
StoreIterator select(Snapshot snapshot);
}

public static class IncludingSpecificStoreSelector implements StoreSelector
Expand All @@ -143,14 +145,35 @@ public StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, Particip
StoreFinder finder = StoreFinder.find(snapshot, participants)
.filter(snapshot, participants, txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch());
finder.set(snapshot.byId.get(storeId));
return finder.iterator(snapshot);
return new StoreIterator(finder.iterator(snapshot), txnId.epoch());
};
}

@Override
public Iterator<CommandStore> select(Snapshot snapshot)
public StoreIterator select(Snapshot snapshot)
{
return Collections.singletonList(snapshot.byId(storeId)).iterator();
return new StoreIterator(Collections.singletonList(snapshot.byId(storeId)).iterator(), null);
}
}

public static class StoreIterator
{
public final Iterator<CommandStore> storeIterator;
public final Long minEpoch;

public StoreIterator(Iterator<CommandStore> storeIterator, @Nullable Long minEpoch)
{
this.storeIterator = storeIterator;
this.minEpoch = (minEpoch == null) ? 0L : minEpoch;
}

public Iterator<CommandStore> getStoreIterator()
{
return storeIterator;
}

public Long getMinEpoch() {
return minEpoch;
}
}

Expand All @@ -175,7 +198,7 @@ public static StoreSelector selector(Unseekables<?> unseekables, long minEpoch,
return snapshot -> {
StoreFinder finder = StoreFinder.find(snapshot, unseekables);
finder.filter(snapshot, unseekables, minEpoch, maxEpoch);
return finder.iterator(snapshot);
return new StoreIterator(finder.iterator(snapshot), minEpoch);
};
}

Expand Down Expand Up @@ -577,6 +600,7 @@ public static class Snapshot extends Journal.TopologyUpdate implements Iterable<
final Int2IntHashMap byId;
private final int[] indexForRange;
final SearchableRangeList lookupByRange;
final Map<Integer, LargeBitSet> overlappingCommandStores;

public Snapshot(ShardHolder[] shards, Topology local, Topology global)
{
Expand Down Expand Up @@ -625,6 +649,22 @@ class RangeAndIndex
indexForRange[i] = rangesAndIndexes[i].index;
}
lookupByRange = SearchableRangeList.build(ranges);

overlappingCommandStores = new HashMap<>();
for (int i = 0; i < shards.length ; i++)
overlappingCommandStores.put(i, new LargeBitSet(shards.length));

for (int i = 0; i < shards.length; i++)
{
for (int j = i; j < shards.length; j++)
{
if (!shards[i].ranges().all().slice(shards[j].ranges().all(), Minimal).isEmpty())
{
overlappingCommandStores.get(i).set(j);
overlappingCommandStores.get(j).set(i);
}
}
}
}

// This method exists to ensure we do not hold references to command stores
Expand Down Expand Up @@ -759,6 +799,14 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top
shard.store.epochUpdateHolder.remove(epoch, shard.ranges, removeRanges);
bootstrapUpdates.add(shard.store.unbootstrap(epoch, removeRanges));
}

Ranges regainedRanges = shard.ranges().all().slice(added, Minimal);
if (!regainedRanges.isEmpty())
{
shard.store.markUnsafeToRead(regainedRanges);
shard.store.markAsRegained(regainedRanges);
}

// TODO (desired): only sync affected shards
Ranges ranges = shard.ranges().currentRanges();
// ranges can be empty when ranges are lost or consolidated across epochs.
Expand Down Expand Up @@ -852,7 +900,7 @@ public void forAllUnsafe(Consumer<CommandStore> forEach)

public AsyncChain<Void> forAll(String reason, Consumer<SafeCommandStore> forEach)
{
return mapReduce(snapshot -> Stream.of(snapshot.shards).map(shard -> shard.store).iterator(), new MapReduceCommandStores<>(RoutingKeys.EMPTY)
return mapReduce(snapshot -> new StoreIterator(Stream.of(snapshot.shards).map(shard -> shard.store).iterator(), null), new MapReduceCommandStores<>(RoutingKeys.EMPTY)
{
@Override public Void reduce(Void o1, Void o2) { return null; }
@Override public TxnId primaryTxnId() { return null; }
Expand Down Expand Up @@ -921,27 +969,59 @@ public <O> Cancellable mapReduceConsume(IntStream commandStoreIds, MapReduceCons

public <O> AsyncChain<O> mapReduce(IntStream commandStoreIds, MapReduceCommandStores<?, O> mapReduce)
{
return mapReduce(snapshot -> commandStoreIds.mapToObj(snapshot::byId).iterator(), mapReduce);
return mapReduce(snapshot -> new StoreIterator(commandStoreIds.mapToObj(snapshot::byId).iterator(), null), mapReduce);
}

public <O> AsyncChain<O> mapReduce(StoreSelector selector, MapReduceCommandStores<?, O> mapReduceConsume)
{
Snapshot snapshot = current;
Iterator<CommandStore> stores = selector.select(snapshot);
StoreIterator storeIterator = selector.select(snapshot);
Iterator<CommandStore> stores = storeIterator.getStoreIterator();
AsyncChain<O> chain = null;
LargeBitSet bitSet = new LargeBitSet(snapshot.shards.length);
while (stores.hasNext())
{
AsyncChain<O> next = mapReduceConsume.applyAsync(stores.next());
CommandStore store = stores.next();
bitSet.set(snapshot.byId.get(store.id()));
AsyncChain<O> next = mapReduceConsume.applyAsync(store);
if (next != null)
chain = chain != null ? AsyncChains.reduce(chain, next, mapReduceConsume) : next;
}

if (!checkQueryDisjointRangesAcrossCommandStores(snapshot.overlappingCommandStores, snapshot.shards, bitSet, mapReduceConsume.keys().toRanges(), storeIterator.getMinEpoch()))
return AsyncChains.failure(new RuntimeException("Tried to query more than one CommandStore for the same range"));

return chain == null ? AsyncChains.success(null) : chain;
}

public static boolean checkQueryDisjointRangesAcrossCommandStores(Map<Integer, LargeBitSet> overlappingCommandStores, ShardHolder[] shards, LargeBitSet commandStoresSeen, Ranges queryRange, Long minEpoch)
{
// Check that we are not querying two command stores for the same range
for (Map.Entry<Integer, LargeBitSet> entry : overlappingCommandStores.entrySet())
{
Ranges range = Ranges.EMPTY;
LargeBitSet overlappingCommandStore = entry.getValue();
for (int i = overlappingCommandStore.firstSetBit(); i >= 0 ; i = overlappingCommandStore.nextSetBit(i + 1, -1))
{
if (commandStoresSeen.get(i))
{
Ranges touchedKeys = queryRange.slice(shards[i].ranges().allSince(minEpoch), Minimal);

if (!range.slice(touchedKeys, Minimal).isEmpty())
return false;

range = range.with(touchedKeys.toRanges());
}
}
}

return true;
}

public <O> O mapReduceUnsafe(StoreSelector selector, Function<CommandStore, O> map, BiFunction<O, O, O> reduce, O accumulator)
{
Snapshot snapshot = current;
Iterator<CommandStore> stores = selector.select(snapshot);
Iterator<CommandStore> stores = selector.select(snapshot).getStoreIterator();
while (stores.hasNext())
{
O next = map.apply(stores.next());
Expand Down
2 changes: 1 addition & 1 deletion accord-core/src/main/java/accord/local/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ public <R> CoordinationAdapter<R> coordinationAdapter(TxnId txnId, Kind kind)
public AsyncChain<Void> updateMinHlc(long minHlc)
{
// TODO (required): command stores that are not ready due to bootstrap need to refresh their min HLC on bootstrap completion
StoreSelector selector = snapshot -> Stream.of(snapshot.shards).map(sh -> sh.store).iterator();
StoreSelector selector = snapshot -> new CommandStores.StoreIterator(Stream.of(snapshot.shards).map(sh -> sh.store).iterator(), null);
return commandStores().mapReduce(selector, new MapReduceCommandStores<>(RoutingKeys.EMPTY)
{
@Override public Void reduce(Void o1, Void o2) { return null; }
Expand Down
5 changes: 5 additions & 0 deletions accord-core/src/main/java/accord/local/SafeCommandStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ public void setSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead)
commandStore().unsafeSetSafeToRead(newSafeToRead);
}

public void addToRegainedRanges(Ranges newRegainedRanges)
{
commandStore().unsafeAddToRegainedRanges(newRegainedRanges);
}

public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch)
{
commandStore().unsafeSetRangesForEpoch(rangesForEpoch);
Expand Down
17 changes: 17 additions & 0 deletions accord-core/src/main/java/accord/topology/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,23 @@ public void forEach(Consumer<Shard> forEach)
forEach.accept(shards[i]);
}

public static Map<Id, Ranges> computeNodeAdditions(Topology current, Topology next)
{
Map<Id, Ranges> additions = new HashMap<>();
for (Id node : next.nodes())
{
Ranges prev = current.rangesForNode(node);
if (prev == null) prev = Ranges.EMPTY;

Ranges added = next.rangesForNode(node).without(prev);
if (added.isEmpty())
continue;

additions.put(node, added);
}
return additions;
}

public SortedArrayList<Id> nodes()
{
return nodes;
Expand Down
54 changes: 54 additions & 0 deletions accord-core/src/main/java/accord/topology/TopologyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package accord.topology;

import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,6 +28,7 @@
import java.util.function.Supplier;
import javax.annotation.Nullable;

import accord.primitives.Routables;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,6 +54,8 @@
import accord.utils.async.AsyncResults;
import accord.utils.async.NestedAsyncResult;

import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;

/**
* Manages topology state changes and update bookkeeping
*
Expand Down Expand Up @@ -308,6 +312,56 @@ public void reportTopology(Topology topology)
updateActive();
}

public static class RegainingEpochRange
{
public final long epoch;
public final Ranges range;

public RegainingEpochRange(long epoch, Ranges range)
{
this.epoch = epoch;
this.range = range;
}

public long epoch()
{
return epoch;
}

public Ranges range()
{
return range;
}
}

@Nullable
public RegainingEpochRange epochAndRangeToBeRetired(Topology curr, Topology next)
{
Map<Id, Ranges> additions = Topology.computeNodeAdditions(curr, next);
long greatestEpoch = -1;
Ranges range = Ranges.EMPTY;

synchronized (this)
{
for (Map.Entry<Id, Ranges> entry : additions.entrySet())
{
for (ActiveEpoch activeEpoch : this.active) {
if (activeEpoch.all().rangesForNode(entry.getKey()).intersects(entry.getValue()) && greatestEpoch < activeEpoch.epoch())
{
greatestEpoch = activeEpoch.epoch();
range = range.union(MERGE_ADJACENT, activeEpoch.all().rangesForNode(entry.getKey()).slice(entry.getValue(), Routables.Slice.Minimal));
break;
}
}
}
}

if (greatestEpoch != -1)
return new RegainingEpochRange(greatestEpoch, range);

return null;
}

private final AtomicBoolean updatingActive = new AtomicBoolean();
private void updateActive()
{
Expand Down
Loading