Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/B3.Exchange.Gateway/EntryPointListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,8 @@ private void ConstructAndStartSession(NetworkStream stream, Socket sock, byte[]?
persistedState: persistedState,
resumeAsNegotiated: resumeAsNegotiated,
persistedMaxOrderRatePerSecond: persistedMaxOrderRatePerSecond,
onIdentityChanged: onIdentityChanged);
onIdentityChanged: onIdentityChanged,
onTakeOverRollback: s => _registry.Register(s));
_registry.Register(session);
lock (_lock) _sessions.Add(session);
session.Start();
Expand Down
7 changes: 6 additions & 1 deletion src/B3.Exchange.Gateway/FixpSession.Lifecycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -402,11 +402,16 @@ private void CloseLocked(string reason, CloseKind kind)
}
}
}
else
else if (kind != CloseKind.SessionTakeOver)
{
// Host shutdown / transport error: keep journal + ring;
// persist final snapshot so the reconnecting peer sees the
// correct LastIncomingSeqNo / OutboundMsgSeqNum.
// SessionTakeOver is intentionally excluded: the new session
// already persisted its snapshot (with the higher SessionVerID)
// before calling Close on this evicted session. Saving here
// would overwrite the new snapshot with this session's older
// SessionVerID, rolling durable state backwards.
SaveStateSnapshotSafe();
}
try { _onClosed?.Invoke(this, reason); }
Expand Down
26 changes: 26 additions & 0 deletions src/B3.Exchange.Gateway/FixpSession.Negotiate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ private NegotiateStep ProcessNegotiate(ReadOnlySpan<byte> fixedBlock, ReadOnlySp
// no live session to own it — the old session's TCP will time out
// naturally and call OnSessionClosed via the TransportError path.
FixpSession? evictedByTakeOver = null;
ulong evictedVerId = 0UL;
if (claim == SessionClaimRegistry.ClaimResult.DuplicateConnection)
{
// Session takeover — the peer crashed and reconnected before our
Expand All @@ -146,6 +147,7 @@ private NegotiateStep ProcessNegotiate(ReadOnlySpan<byte> fixedBlock, ReadOnlySp
"session {ConnectionId} taking over sessionId={SessionId} from {OldConnectionId} (new verId={NewVerId})",
ConnectionId, req.SessionId, oldSession.ConnectionId, req.SessionVerId);
evictedByTakeOver = oldSession;
evictedVerId = oldSession.SessionVerId;
}
}
if (claim != SessionClaimRegistry.ClaimResult.Accepted)
Expand Down Expand Up @@ -186,6 +188,19 @@ private NegotiateStep ProcessNegotiate(ReadOnlySpan<byte> fixedBlock, ReadOnlySp
// can be retried by the peer (same SessionVerID, same TCP
// connection or a new one); without this the second attempt
// would hit DUPLICATE_SESSION_CONNECTION.
//
// For a takeover, also restore the evicted session's claim so
// the old TCP (still alive) remains the authoritative owner
// rather than being left unclaimed. TryRestoreTakeOver is a
// no-op (returns false) if a concurrent racing takeover has
// already won the registry in the window between
// TryForceTakeOver and here.
var takeOverRolledBack = false;
if (evictedByTakeOver is not null)
{
takeOverRolledBack = _claims.TryRestoreTakeOver(req.SessionId,
this, evictedByTakeOver, evictedVerId);
}
_claims.Release(req.SessionId, this);
_claimedSessionId = 0;
SessionId = 0;
Expand All @@ -194,6 +209,17 @@ private NegotiateStep ProcessNegotiate(ReadOnlySpan<byte> fixedBlock, ReadOnlySp
// Issue #485: roll back Identity to the pending format so
// OnSessionClosed doesn't evict ownership for a real session.
RollbackIdentity(pendingIdentity);
// Issue #492: UpdateIdentityAfterNegotiate above overwrote the
// evicted session's SessionRegistry entry with this (failed) new
// session; RollbackIdentity then removed it, leaving the old
// owner unroutable. Re-register the evicted session AFTER the
// rollback so routing is restored — but only when the claim was
// actually handed back (takeOverRolledBack), to stay in lock-step
// with the claim registry and avoid clobbering a racing takeover.
if (takeOverRolledBack && evictedByTakeOver is not null)
{
_onTakeOverRollback?.Invoke(evictedByTakeOver);
}
// No spec-defined "internal error" reject code; UNSPECIFIED
// (0) is the closest match. The peer will retry.
var rejectFrame = new byte[NegotiateRejectEncoder.Total];
Expand Down
15 changes: 14 additions & 1 deletion src/B3.Exchange.Gateway/FixpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,17 @@ public sealed partial class FixpSession : IAsyncDisposable
/// </summary>
private readonly Action<FixpSession, ContractsSessionId, ContractsSessionId>? _onIdentityChanged;

/// <summary>
/// Issue #492: re-registers an evicted-by-takeover session in the
/// <see cref="SessionRegistry"/> when a takeover is rolled back after
/// the new session already overwrote the old session's registry entry
/// (e.g. snapshot persistence failed mid-Negotiate). Wired to
/// <c>SessionRegistry.Register</c> so the old, still-live owner keeps
/// receiving routed execution reports. Invoked only in lock-step with a
/// successful <see cref="SessionClaimRegistry.TryRestoreTakeOver"/>.
/// </summary>
private readonly Action<FixpSession>? _onTakeOverRollback;

public long ConnectionId { get; }

/// <summary>
Expand Down Expand Up @@ -369,7 +380,8 @@ public FixpSession(long connectionId, uint enteringFirm, uint sessionId,
B3.Exchange.Gateway.Persistence.FixpSessionStateSnapshot? persistedState = null,
bool resumeAsNegotiated = false,
int? persistedMaxOrderRatePerSecond = null,
Action<FixpSession, ContractsSessionId, ContractsSessionId>? onIdentityChanged = null)
Action<FixpSession, ContractsSessionId, ContractsSessionId>? onIdentityChanged = null,
Action<FixpSession>? onTakeOverRollback = null)
{
ArgumentNullException.ThrowIfNull(logger);
ConnectionId = connectionId;
Expand All @@ -392,6 +404,7 @@ public FixpSession(long connectionId, uint enteringFirm, uint sessionId,
_options = options ?? FixpSessionOptions.Default;
_options.Validate();
_onIdentityChanged = onIdentityChanged;
_onTakeOverRollback = onTakeOverRollback;
if (persistedMaxOrderRatePerSecond is < 0)
throw new ArgumentOutOfRangeException(nameof(persistedMaxOrderRatePerSecond));
_outboundJournal = outboundJournal;
Expand Down
32 changes: 32 additions & 0 deletions src/B3.Exchange.Gateway/SessionClaimRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,36 @@ public void Release(uint sessionId, object claimToken, bool forgetLastVersion =
}
}
}

/// <summary>
/// Rolls back a failed <see cref="TryForceTakeOver"/> by atomically
/// restoring the evicted session's claim and reverting
/// <c>_lastSessionVerId</c> to the pre-takeover value. Only acts
/// when <paramref name="newToken"/> still holds the active claim
/// (guards against a concurrent takeover winning the registry
/// between the failed persist and this restore call). Safe to call
/// multiple times; idempotent when the claim has already moved on.
/// Returns <c>true</c> when the claim was actually restored to
/// <paramref name="oldToken"/>, so the caller can keep the
/// <see cref="SessionRegistry"/> entry in lock-step; returns
/// <c>false</c> when a concurrent takeover already moved the claim on
/// (in which case the caller must not roll the registry back either).
/// </summary>
public bool TryRestoreTakeOver(uint sessionId, object newToken,
object oldToken, ulong oldVerId)
{
ArgumentNullException.ThrowIfNull(newToken);
ArgumentNullException.ThrowIfNull(oldToken);
lock (_lock)
{
if (_activeClaims.TryGetValue(sessionId, out var current) &&
ReferenceEquals(current, newToken))
{
_activeClaims[sessionId] = oldToken;
_lastSessionVerId[sessionId] = oldVerId;
return true;
}
}
return false;
}
}
Loading