Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions accord-core/src/main/java/accord/api/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ default SystemEventListener systemEvents()
*
* The number of entries before the candidate prune point that we require before we try to prune.
* This only works to reduce the time wasted pruning when there is limited benefit.
*
* TODO (desired): come up with a better pruning method
*/
int cfkPruneInterval();

Expand All @@ -95,9 +97,14 @@ default SystemEventListener systemEvents()
* Controls pruning of MaxConflicts
*
* Every n updates, max conflicts is pruned to the delta, where n is the value returned by this method
*
* TODO (expected): this isn't a very clean way to prune max conflicts - should be done by size of collection / update rate
*/
long maxConflictsPruneInterval();

default boolean softReject(long unappliedCount, long maxUnappliedAge, long cumulativeUnappliedAge) { return false; }
default boolean hardReject(int softRejectCount, int totalCount) { return false; }

/**
* Create an empty transaction that Accord can use for its own internal transactions.
*/
Expand Down
2 changes: 2 additions & 0 deletions accord-core/src/main/java/accord/api/AsyncExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import accord.utils.async.AsyncChain;
import accord.utils.async.Cancellable;

// TODO (required): consistent RejectedExecutionException handling
public interface AsyncExecutor extends Executor
{
// unlike execute, throws no exceptions, nor will not wrap the runnable
Expand All @@ -38,6 +39,7 @@ default Cancellable execute(RunOrFail run)

// Depending on this implementation this method may queue-jump, i.e. task submission order is not guaranteed.
// Make sure this is semantically safe at all call-sites.
// TODO (required): RejectedExecutionException?
default void executeMaybeImmediately(Runnable run)
{
if (!tryExecuteImmediately(run))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface CoordinatorEventListener
default void onFailed(Throwable failure, TxnId txnId, Participants<?> participants, Coordination coordination) {}

default void onPreAccepted(TxnId txnId) {}
default void onAccepted(TxnId txnId, Ballot ballot) {}
default void onAccepted(TxnId txnId, Ballot ballot, @Nullable ExecutePath path) {}
default void onExecuting(TxnId txnId, @Nullable Ballot ballot, Deps deps, @Nullable ExecutePath path) {}
default void onExecuted(TxnId txnId, Ballot ballot) {}
default void onDurable(Durability durability, @Nullable Ballot ballot, TxnId txnId) {}
Expand Down
42 changes: 40 additions & 2 deletions accord-core/src/main/java/accord/api/DataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@

package accord.api;

import java.util.Map;

import accord.local.CommandStore;
import accord.local.Node;
import accord.local.RedundantBefore;
import accord.local.SafeCommandStore;
import accord.primitives.Ranges;
import accord.primitives.SyncPoint;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.UnhandledEnum;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;

Expand Down Expand Up @@ -138,12 +142,46 @@ static FetchResult failure(Throwable t)
* If RequestKind#Sync is used, fetches only a minimal subset of data, assuming there has been some data locally
* (it is to up store's implementer to know how to achieve this)
*/
FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback, FetchKind kind);
default FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback, FetchKind kind)
{
switch (kind)
{
default: throw UnhandledEnum.unknown(kind);
case Sync: return sync(node, safeStore, ranges, syncPoint, callback);
case Image: return image(node, safeStore, ranges, syncPoint, callback);
}
}

FetchResult image(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback);

default FetchResult sync(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback)
{
return sync(node, safeStore, Map.of(syncPoint.syncId, ranges), callback);
}

// TODO (desired): standardise on using only TxnId bounds for image/sync, not a full SyncPoint;
// leave it to the durability service to ensure the relevant durability via sync point or otherwise
default FetchResult sync(Node node, SafeCommandStore safeStore, Map<TxnId, Ranges> atLeast, FetchRanges callback)
{
throw new UnsupportedOperationException();
}

/**
* Logical fsync-like operation: anything within the provided ranges written to the store prior to the
* invocation of this method must be durable once the AsyncResult completes successfully. That is, a restart of the node must
* restore the DataStore to a state on or after the point at which snapshot was invoked.
*
* TODO (desired): clunky to pass integer flags around; is there a neater implementation-agnostic alternative?
*/
default void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBefore reportOnSuccess, int flags)
{
ensureDurable(commandStore, reportOnSuccess, flags);
}

/**
* Logical fsync-like operation: anything written to the store prior to the invocation of this method
* must be durable once the AsyncResult completes successfully. That is, a restart of the node must
* restore the DataStore to a state on or after the point at which snapshot was invoked.
*/
void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBefore reportOnSuccess);
void ensureDurable(CommandStore commandStore, RedundantBefore reportOnSuccess, int flags);
}
20 changes: 10 additions & 10 deletions accord-core/src/main/java/accord/api/Journal.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ enum Load
MINIMAL_WITH_DEPS
}

void open(Node node);
void start(Node node);

Command loadCommand(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore);
Expand All @@ -61,25 +62,24 @@ enum Load
Command.MinimalWithDeps loadMinimalWithDeps(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore);
void saveCommand(int store, CommandUpdate value, Runnable onFlush);

List<? extends TopologyUpdate> replayTopologies();
List<? extends TopologyUpdate> loadTopologies();
void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush);

void purge(CommandStores commandStores, EpochSupplier minEpoch);

/**
* Replays all messages from journal to rehydrate CommandStores state. Returns whether it has seen (and ignored)
* any exceptions during replay.
*/
boolean replay(CommandStores commandStores);

RedundantBefore loadRedundantBefore(int store);
NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store);
NavigableMap<Timestamp, Ranges> loadSafeToRead(int store);
CommandStores.RangesForEpoch loadRangesForEpoch(int store);
void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush);

Persister<DurableBefore, DurableBefore> durableBeforePersister();

void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush);
void purge(CommandStores commandStores, EpochSupplier minEpoch);

/**
* Replays all messages from journal to rehydrate CommandStores state. Returns whether it has seen (and ignored)
* any exceptions during replay.
*/
boolean replay(CommandStores commandStores, Object param);

class TopologyUpdate
{
Expand Down
20 changes: 18 additions & 2 deletions accord-core/src/main/java/accord/api/LocalListeners.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,26 @@ public TxnListener(TxnId waiter, TxnId waitingOn, SaveStatus awaitingStatus)
this.waitingOn = waitingOn;
this.awaitingStatus = awaitingStatus;
}

@Override
public boolean equals(Object that)
{
return that instanceof TxnListener && equals((TxnListener) that);
}

public boolean equals(TxnListener that)
{
return this.waiter.equals(that.waiter) && this.waitingOn.equals(that.waitingOn) && this.awaitingStatus == that.awaitingStatus;
}

@Override
public int hashCode()
{
throw new UnsupportedOperationException();
}
}

Iterable<TxnListener> txnListeners();
Iterable<TxnId> txnListenersWaitingOn();
Iterable<Registered> complexListeners();

Iterable<TxnId> txnsWaitingOn(SaveStatus saveStatus);
}
4 changes: 2 additions & 2 deletions accord-core/src/main/java/accord/api/ProgressLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public static BlockedUntil max(BlockedUntil a, BlockedUntil b)
/**
* Record an updated local status for the transaction, to clear any waiting state it satisfies.
*/
void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after, boolean force);
void update(SafeCommandStore safeStore, Command before, Command after, boolean force);

/**
* Process a remote asynchronous callback.
Expand Down Expand Up @@ -232,7 +232,7 @@ default void maybeNotify() {}

class NoOpProgressLog implements ProgressLog
{
@Override public void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after, boolean force) {}
@Override public void update(SafeCommandStore safeStore, Command before, Command after, boolean force) {}
@Override public void remoteCallback(SafeCommandStore safeStore, SafeCommand safeCommand, SaveStatus remoteStatus, int callbackId, Node.Id from) {}
@Override public void waiting(BlockedUntil blockedUntil, SafeCommandStore safeStore, SafeCommand blockedBy, Route<?> blockedOnRoute, Participants<?> blockedOnParticipants, StoreParticipants participants) {}
@Override public void invalidIfUncommitted(TxnId txnId) {}
Expand Down
9 changes: 5 additions & 4 deletions accord-core/src/main/java/accord/api/ProtocolModifiers.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,21 +235,22 @@ public static class Toggles
public static boolean recoveryAwaitsSupersedingSyncPoints() { return recoveryAwaitsSupersedingSyncPoints; }
public static void setRecoveryAwaitsSupersedingSyncPoints(boolean newRecoveryAwaitsSupersedingSyncPoints) { recoveryAwaitsSupersedingSyncPoints = newRecoveryAwaitsSupersedingSyncPoints; }

// TODO (required): default this to false once released support via recoveryAwaitsSupersedingSyncPoints
private static boolean syncPointsTrackUnstableMediumPathDependencies = true;
private static boolean syncPointsTrackUnstableMediumPathDependencies = false;
public static boolean syncPointsTrackUnstableMediumPathDependencies() { return syncPointsTrackUnstableMediumPathDependencies; }
public static void setSyncPointsTrackUnstableMediumPathDependencies(boolean newSyncPointsTrackUnstableMediumPathDependencies) { syncPointsTrackUnstableMediumPathDependencies = newSyncPointsTrackUnstableMediumPathDependencies; }

private static boolean recoverPartialAcceptPhaseIfNoFastPath = false;
public static boolean recoverPartialAcceptPhaseIfNoFastPath() { return recoverPartialAcceptPhaseIfNoFastPath; }
public static void setRecoverPartialAcceptPhaseIfNoFastPath(boolean newSyncPointsRecoverPartialAcceptPhase) {recoverPartialAcceptPhaseIfNoFastPath = newSyncPointsRecoverPartialAcceptPhase; }

private static boolean recoverReads = false;
public static boolean recoverReads() { return recoverReads; }
public static void setRecoverReads(boolean newRecoverReads) { recoverReads = newRecoverReads; }

private static boolean filterDuplicateDependenciesFromAcceptReply = true;
public static boolean filterDuplicateDependenciesFromAcceptReply() { return filterDuplicateDependenciesFromAcceptReply; }
public static void setFilterDuplicateDependenciesFromAcceptReply(boolean newFilterDuplicateDependenciesFromAcceptReply) { filterDuplicateDependenciesFromAcceptReply = newFilterDuplicateDependenciesFromAcceptReply; }



public enum SendStableMessages { TO_ALL, FOR_READS, FOR_READS_OR_NONE_IF_FASTEXEC}
private static SendStableMessages sendStableMessages = FOR_READS_OR_NONE_IF_FASTEXEC;
public static void setSendStableMessages(SendStableMessages newSendStableMessages) { sendStableMessages = newSendStableMessages; }
Expand Down
3 changes: 2 additions & 1 deletion accord-core/src/main/java/accord/api/ViolationHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import accord.primitives.Timestamp;
import accord.primitives.TxnId;

import static accord.utils.Functions.alwaysFalse;
import static accord.utils.Invariants.illegalState;

public interface ViolationHandler
Expand All @@ -57,7 +58,7 @@ static String timestampViolationMessage(@Nullable SafeCommandStore safeStore, Co
{
message += ". RedundantBefore={";
Participants<?> participants = Participants.merge(Participants.merge(otherParticipants, (Participants)otherRoute), command.route());
message += safeStore.redundantBefore().foldlWithBounds(participants, (b, m, s, e) -> (m.isEmpty() ? "[" : ", [") + s + ',' + e + "]:" + b, "", ignore -> false) + '}';
message += safeStore.redundantBefore().foldlWithBounds(participants, (b, m, s, e) -> (m.isEmpty() ? "[" : ", [") + s + ',' + e + "]:" + b, "", alwaysFalse()) + '}';
}
return message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ final void onPreAcceptedInNewEpoch(long latestEpoch)
}
catch (TopologyException t)
{
finishWithFailureOverride(t);
finishOnFailure(t);
return;
}
if (mismatch == null) onPreAccepted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,25 +116,30 @@ void finishWithSuccess(Result success)
finishAndInvokeCallback(success, null);
}

void finishWithFailureOverride(Throwable failure)
protected void finishWithFailure(Throwable failure)
{
finishAndInvokeCallback(null, FailureAccumulator.append(failure, this.failure()));
finishAndInvokeCallback(null, failure);
}

void finishOnExaustion()
protected void finishOnFailure(Throwable failure)
{
finishOnFailure();
finishWithFailure(FailureAccumulator.append(failure, this.failure()));
}

void finishOnFailure()
{
finishAndInvokeCallback(null, FailureAccumulator.fail(node.agent(), this.failure(), txnId, Route.tryCastToRoute(scope())));
finishWithFailure(FailureAccumulator.fail(node.agent(), this.failure(), txnId, Route.tryCastToRoute(scope())));
}

void finishOnExaustion()
{
finishOnFailure();
}

void awaitEpochExactToFinish(long epoch, Runnable runnable)
{
setFinishing();
node.withEpochExact(epoch, executor, (ignore, failure) -> finishWithFailureOverride(failure), Rethrowable::rethrowable, () -> {
node.withEpochExact(epoch, executor, (ignore, failure) -> finishOnFailure(failure), Rethrowable::rethrowable, () -> {
runnable.run();
Invariants.require(isDone(), "%s", this);
});
Expand All @@ -143,7 +148,7 @@ void awaitEpochExactToFinish(long epoch, Runnable runnable)
void awaitEpochAtLeastToFinish(long epoch, Runnable runnable)
{
setFinishing();
node.withEpochAtLeast(epoch, executor, (ignore, failure) -> finishWithFailureOverride(failure), Rethrowable::rethrowable, () -> {
node.withEpochAtLeast(epoch, executor, (ignore, failure) -> finishOnFailure(failure), Rethrowable::rethrowable, () -> {
runnable.run();
Invariants.require(isDone(), "%s", this);
});
Expand All @@ -153,7 +158,7 @@ void awaitToFinish(AsyncChain<?> await)
{
setFinishing();
await.begin((success, fail) -> {
if (fail != null) finishWithFailureOverride(fail);
if (fail != null) finishOnFailure(fail);
else Invariants.require(isDone(), "%s", this);
});
}
Expand Down Expand Up @@ -320,7 +325,7 @@ public boolean abort()
if (isDone())
return false;

finishWithFailureOverride(Aborted.aborted(txnId, Route.tryCastToRoute(scope())));
finishOnFailure(Aborted.aborted(txnId, Route.tryCastToRoute(scope())));
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void retry()
@Override
void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
{
finishWithFailureOverride(mismatch);
finishOnFailure(mismatch);
}

@Override
Expand All @@ -175,7 +175,7 @@ void onPreAccepted()
try { topologies = node.topology().active().reselect(this.topologies, QuorumEpochIntersections.preaccept.include, scope, executeAtEpoch, executeAtEpoch, LIVE, Owned); }
catch (Throwable t)
{
finishWithFailureOverride(t);
finishOnFailure(t);
return;
}
CoordinationFlags flags = oks.foldlNonNull((d, k, v, out) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ void onFailureInternal(Node.Id from, int fromIndex, Throwable failure)
@Override
void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
{
finishWithFailureOverride(mismatch);
finishOnFailure(mismatch);
}

@Override
Expand All @@ -136,7 +136,7 @@ long executeAtEpoch()
@Override
void onPreAccepted()
{
finishAndInvokeCallback(maxConflict, null);
finishWithSuccess(maxConflict);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import accord.topology.TopologyMismatch;
import accord.utils.SortedListMap;

import static accord.coordinate.Propose.NotAccept.proposeInvalidate;
import static accord.coordinate.Propose.NotAccept.proposeAndCommitInvalidate;
import static accord.coordinate.tracking.RequestStatus.Success;

/**
Expand Down Expand Up @@ -95,6 +95,7 @@ public void onFailureInternal(Id from, int fromIndex, Throwable failure)
case NoChange:
break;
case Failed:
proposeAndCommitInvalidate(node, executor, Ballot.ZERO, txnId, scope.homeKey(), scope, txnId, null);
finishOnFailure();
break;
case Success:
Expand All @@ -108,7 +109,7 @@ public void onSuccessInternal(Id from, int fromIndex, PreAcceptReply reply)
if (!reply.isOk())
{
// we've been preempted by a recovery coordinator; defer to it, and wait to hear any result
finishWithFailureOverride(Preempted.preempted(node.agent(), txnId, scope.homeKey()));
finishOnFailure(Preempted.preempted(node.agent(), txnId, scope.homeKey()));
}
else
{
Expand All @@ -135,12 +136,7 @@ void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
* We cannot execute the transaction because the execution epoch's topology no longer contains all of the
* participating keys/ranges, so we propose that the transaction is invalidated in its coordination epoch
*/
BiConsumer<? super T, Throwable> callback = finishAndTakeCallback();
proposeInvalidate(node, executor, node.uniqueTimestamp(Ballot::fromValues), txnId, scope.homeKey(), (outcome, failure) -> {
if (failure != null)
mismatch.addSuppressed(failure);
callback.accept(null, mismatch);
});
finishWithFailure(mismatch);
}

@Override
Expand Down
Loading