From c1079ff6c89fd5793d285c6aeeeb7fce5fc2e782 Mon Sep 17 00:00:00 2001 From: pedrotravi Date: Wed, 27 May 2026 23:07:34 +0000 Subject: [PATCH 1/3] test(gateway): add E2E test for session takeover (issue #492) Adds FixpSessionTakeoverTests with an end-to-end integration test that exercises the exact scenario described in issue #492: - Client 1 establishes a session (sessionId=1, verId=2) over TCP and remains connected. - Client 2 connects on a new transport and sends Negotiate with the same sessionId but a strictly-greater verId=3, simulating a fast reconnect after a crash before the exchange idle-timeout fires. - Asserts the exchange accepts the takeover (NegotiateResponse, not NegotiateReject) and evicts the stale session from ActiveSessions. - Asserts the claim registry records verId=3 for sessionId=1. The test confirms that PR #491 code (TryForceTakeOver path) is correct. The issue was a log-level or deployment concern on the reporter's side, not a code defect. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../FixpSessionTakeoverTests.cs | 165 ++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 tests/B3.Exchange.Gateway.Tests/FixpSessionTakeoverTests.cs diff --git a/tests/B3.Exchange.Gateway.Tests/FixpSessionTakeoverTests.cs b/tests/B3.Exchange.Gateway.Tests/FixpSessionTakeoverTests.cs new file mode 100644 index 0000000..c86c614 --- /dev/null +++ b/tests/B3.Exchange.Gateway.Tests/FixpSessionTakeoverTests.cs @@ -0,0 +1,165 @@ +using B3.Exchange.Contracts; +using B3.EntryPoint.Wire; +using B3.Exchange.Gateway; +using B3.Exchange.Matching; +using Microsoft.Extensions.Logging.Abstractions; +using System.Net; +using System.Net.Sockets; +using System.Text; + +namespace B3.Exchange.Gateway.Tests; + +/// +/// Issue #492 — Session takeover end-to-end. 
Validates that when a live +/// session (old TCP still connected) is targeted by a Negotiate from a +/// new transport carrying a strictly-greater sessionVerId, the exchange +/// accepts the new session via +/// rather than rejecting with DUPLICATE_SESSION_CONNECTION. +/// +public class FixpSessionTakeoverTests +{ + private sealed class NoOpEngineSink : IInboundCommandSink + { + public bool EnqueueNewOrder(in NewOrderCommand cmd, SessionId session, uint enteringFirm, ulong clOrdIdValue) => true; + public bool EnqueueCancel(in CancelOrderCommand cmd, SessionId session, uint enteringFirm, ulong clOrdIdValue, ulong origClOrdIdValue) => true; + public bool EnqueueReplace(in ReplaceOrderCommand cmd, SessionId session, uint enteringFirm, ulong clOrdIdValue, ulong origClOrdIdValue) => true; + public bool EnqueueCross(in CrossOrderCommand cmd, SessionId session, uint enteringFirm) => true; + public bool EnqueueMassCancel(in MassCancelCommand cmd, SessionId session, uint enteringFirm) => true; + public void OnDecodeError(SessionId session, string error) { } + public void OnSessionClosed(SessionId session) { } + } + + private static async Task<(ushort TemplateId, byte[] Frame)> ReadOneFrameAsync( + NetworkStream stream, CancellationToken ct) + { + var hdr = new byte[EntryPointFrameReader.WireHeaderSize]; + int read = 0; + while (read < hdr.Length) + { + int n = await stream.ReadAsync(hdr.AsMemory(read), ct); + if (n == 0) throw new EndOfStreamException("connection closed"); + read += n; + } + ushort msgLen = System.Buffers.Binary.BinaryPrimitives.ReadUInt16LittleEndian(hdr.AsSpan(0, 2)); + ushort tid = System.Buffers.Binary.BinaryPrimitives.ReadUInt16LittleEndian(hdr.AsSpan(6, 2)); + int bodyLen = msgLen - EntryPointFrameReader.WireHeaderSize; + var body = new byte[bodyLen]; + int btotal = 0; + while (btotal < bodyLen) + { + int n = await stream.ReadAsync(body.AsMemory(btotal), ct); + if (n == 0) throw new EndOfStreamException("connection closed"); + btotal += n; + } + 
return (tid, body); + } + + private static EntryPointListener BuildListener( + FirmRegistry firms, SessionClaimRegistry claims, + NegotiationValidator negValidator, EstablishValidator estValidator) + => new( + new IPEndPoint(IPAddress.Loopback, 0), + new NoOpEngineSink(), + new SessionRegistry(), + NullLoggerFactory.Instance, + sessionOptions: new FixpSessionOptions + { + HeartbeatIntervalMs = 60_000, + IdleTimeoutMs = 60_000, + TestRequestGraceMs = 60_000, + SuspendedTimeoutMs = 0, + FirstFrameTimeoutMs = 5_000, + }, + negotiationValidator: negValidator, + sessionClaims: claims, + establishValidator: estValidator); + + /// + /// Issue #492: trading-host crashes (old TCP still alive from exchange POV), + /// restarts with an incremented sessionVerId. The exchange must accept the + /// new Negotiate via TryForceTakeOver and close the stale old session. + /// Before PR #491 this would hang on DUPLICATE_SESSION_CONNECTION. + /// + [Fact] + public async Task Negotiate_HigherVerid_WhileOldSessionStillConnected_AcceptsViaTakeOver() + { + var firms = new FirmRegistry( + new[] { new Firm(Id: "F1", Name: "Firm 1", EnteringFirmCode: 42u) }, + new[] { new SessionCredential(SessionId: "1", FirmId: "F1", AccessKey: "", + AllowedSourceCidrs: null, Policy: SessionPolicy.Default) }); + var claims = new SessionClaimRegistry(); + var negValidator = new NegotiationValidator(firms, claims, devMode: true, + timestampSkewToleranceNs: 0); + var estValidator = new EstablishValidator(timestampSkewToleranceNs: 0); + + await using var listener = BuildListener(firms, claims, negValidator, estValidator); + listener.Start(); + + var creds = Encoding.UTF8.GetBytes("{\"auth_type\":\"basic\",\"username\":\"1\",\"access_key\":\"\"}"); + var buf = new byte[512]; + + // --- Phase 1: establish the old session (verId=2) and keep it alive --- + using var client1 = new TcpClient(); + await client1.ConnectAsync(IPAddress.Loopback, listener.LocalEndpoint!.Port); + var stream1 = client1.GetStream(); + + int 
len = EntryPointFixpFrameCodec.EncodeNegotiate(buf, + sessionId: 1, sessionVerId: 2UL, timestampNanos: 0UL, enteringFirm: 42u, + onBehalfFirm: null, credentials: creds, + clientIp: ReadOnlySpan.Empty, + clientAppName: ReadOnlySpan.Empty, + clientAppVersion: ReadOnlySpan.Empty); + await stream1.WriteAsync(buf.AsMemory(0, len)); + using var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var (tid1, _) = await ReadOneFrameAsync(stream1, cts1.Token); + Assert.Equal(EntryPointFrameReader.TidNegotiateResponse, tid1); + + len = EntryPointFixpFrameCodec.EncodeEstablish(buf, + sessionId: 1, sessionVerId: 2UL, timestampNanos: 0UL, + keepAliveIntervalMillis: 10_000, nextSeqNo: 1, + cancelOnDisconnectType: 0, codTimeoutWindowMillis: 0, + credentials: ReadOnlySpan.Empty); + await stream1.WriteAsync(buf.AsMemory(0, len)); + var (tid2, _) = await ReadOneFrameAsync(stream1, cts1.Token); + Assert.Equal(EntryPointFrameReader.TidEstablishAck, tid2); + + var oldEstablished = await TestUtil.WaitUntilAsync( + () => listener.ActiveSessions.Any(s => s.SessionId == 1 && s.State == FixpState.Established), + TimeSpan.FromSeconds(5)); + Assert.True(oldEstablished, "old session (verId=2) must reach Established state"); + + // --- Phase 2: reconnect with incremented verId (old TCP still alive) --- + // This simulates: trading-host crashes, restarts fast (before exchange + // detects dead TCP), and sends Negotiate with verId=3. 
+ using var client2 = new TcpClient(); + await client2.ConnectAsync(IPAddress.Loopback, listener.LocalEndpoint!.Port); + var stream2 = client2.GetStream(); + + len = EntryPointFixpFrameCodec.EncodeNegotiate(buf, + sessionId: 1, sessionVerId: 3UL, timestampNanos: 0UL, enteringFirm: 42u, + onBehalfFirm: null, credentials: creds, + clientIp: ReadOnlySpan.Empty, + clientAppName: ReadOnlySpan.Empty, + clientAppVersion: ReadOnlySpan.Empty); + await stream2.WriteAsync(buf.AsMemory(0, len)); + using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var (tid3, _) = await ReadOneFrameAsync(stream2, cts2.Token); + + // This is the core assertion of issue #492: the exchange MUST accept + // the reconnect via TryForceTakeOver, not reject with + // DUPLICATE_SESSION_CONNECTION (which would give TidNegotiateReject). + Assert.Equal(EntryPointFrameReader.TidNegotiateResponse, tid3); + + // Old session (verId=2) must be evicted from ActiveSessions after the + // takeover. The new session (verId=3) has the same SessionId=1 and + // remains open — so we filter by SessionVerId to tell them apart. + var oldEvicted = await TestUtil.WaitUntilAsync( + () => !listener.ActiveSessions.Any(s => s.SessionId == 1 && s.SessionVerId == 2UL), + TimeSpan.FromSeconds(5)); + Assert.True(oldEvicted, "old session (verId=2) must be removed from ActiveSessions after the takeover"); + + // New session (verId=3) claim must be held in the registry. + Assert.True(claims.TryGetActiveClaim(1u, out _, out var claimedVerId)); + Assert.Equal(3UL, claimedVerId); + } +} From 298cf92b95efa4db2b98c79f395b7979caf37b72 Mon Sep 17 00:00:00 2001 From: pedrotravi Date: Wed, 27 May 2026 23:14:02 +0000 Subject: [PATCH 2/3] fix(gateway): prevent snapshot rollback on session takeover MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs found by code review on PR #493: 1. 
(High) CloseLocked called SaveStateSnapshotSafe() for the evicted session (kind=SessionTakeOver), overwriting the snapshot that the new session had just persisted with the higher sessionVerId. Fixed by excluding SessionTakeOver from the 'else' persist branch — the new session already owns the durable state; the evicted session must not touch it. 2. (Medium) On TrySaveStateSnapshot() failure in the takeover path, the rollback only released the new session's claim but did not restore the evicted session's claim. This left the old live TCP connection unclaimed with _lastSessionVerId advanced to the new verId, making the old session unable to re-negotiate. Fixed by adding SessionClaimRegistry.TryRestoreTakeOver() and calling it from the rollback path before Release(), atomically reinstating the evicted session's claim and reverting _lastSessionVerId. Adds two tests: - Negotiate_HigherVerid_WhileOldSessionStillConnected_AcceptsViaTakeOver (E2E, no persister — covers the core #492 takeover acceptance path) - TakeOver_WithStatePersister_FinalSnapshotHasNewVerid (persister-wired — catches the snapshot overwrite regression) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../FixpSession.Lifecycle.cs | 7 +- .../FixpSession.Negotiate.cs | 13 +++ .../SessionClaimRegistry.cs | 25 +++++ .../FixpSessionTakeoverTests.cs | 98 ++++++++++++++++++- 4 files changed, 140 insertions(+), 3 deletions(-) diff --git a/src/B3.Exchange.Gateway/FixpSession.Lifecycle.cs b/src/B3.Exchange.Gateway/FixpSession.Lifecycle.cs index 14c0e18..8af3810 100644 --- a/src/B3.Exchange.Gateway/FixpSession.Lifecycle.cs +++ b/src/B3.Exchange.Gateway/FixpSession.Lifecycle.cs @@ -402,11 +402,16 @@ private void CloseLocked(string reason, CloseKind kind) } } } - else + else if (kind != CloseKind.SessionTakeOver) { // Host shutdown / transport error: keep journal + ring; // persist final snapshot so the reconnecting peer sees the // correct LastIncomingSeqNo / OutboundMsgSeqNum.
+ // SessionTakeOver is intentionally excluded: the new session + // already persisted its snapshot (with the higher SessionVerID) + // before calling Close on this evicted session. Saving here + // would overwrite the new snapshot with this session's older + // SessionVerID, rolling durable state backwards. SaveStateSnapshotSafe(); } try { _onClosed?.Invoke(this, reason); } diff --git a/src/B3.Exchange.Gateway/FixpSession.Negotiate.cs b/src/B3.Exchange.Gateway/FixpSession.Negotiate.cs index 8848b5b..78ccffa 100644 --- a/src/B3.Exchange.Gateway/FixpSession.Negotiate.cs +++ b/src/B3.Exchange.Gateway/FixpSession.Negotiate.cs @@ -132,6 +132,7 @@ private NegotiateStep ProcessNegotiate(ReadOnlySpan fixedBlock, ReadOnlySp // no live session to own it — the old session's TCP will time out // naturally and call OnSessionClosed via the TransportError path. FixpSession? evictedByTakeOver = null; + ulong evictedVerId = 0UL; if (claim == SessionClaimRegistry.ClaimResult.DuplicateConnection) { // Session takeover — the peer crashed and reconnected before our @@ -146,6 +147,7 @@ private NegotiateStep ProcessNegotiate(ReadOnlySpan fixedBlock, ReadOnlySp "session {ConnectionId} taking over sessionId={SessionId} from {OldConnectionId} (new verId={NewVerId})", ConnectionId, req.SessionId, oldSession.ConnectionId, req.SessionVerId); evictedByTakeOver = oldSession; + evictedVerId = oldSession.SessionVerId; } } if (claim != SessionClaimRegistry.ClaimResult.Accepted) @@ -186,6 +188,17 @@ private NegotiateStep ProcessNegotiate(ReadOnlySpan fixedBlock, ReadOnlySp // can be retried by the peer (same SessionVerID, same TCP // connection or a new one); without this the second attempt // would hit DUPLICATE_SESSION_CONNECTION. + // + // For a takeover, also restore the evicted session's claim so + // the old TCP (still alive) remains the authoritative owner + // rather than being left unclaimed. 
TryRestoreTakeOver is a + // no-op if a concurrent racing takeover has already won the + // registry in the window between TryForceTakeOver and here. + if (evictedByTakeOver is not null) + { + _claims.TryRestoreTakeOver(req.SessionId, this, + evictedByTakeOver, evictedVerId); + } _claims.Release(req.SessionId, this); _claimedSessionId = 0; SessionId = 0; diff --git a/src/B3.Exchange.Gateway/SessionClaimRegistry.cs b/src/B3.Exchange.Gateway/SessionClaimRegistry.cs index 2c02856..51e5507 100644 --- a/src/B3.Exchange.Gateway/SessionClaimRegistry.cs +++ b/src/B3.Exchange.Gateway/SessionClaimRegistry.cs @@ -242,4 +242,29 @@ public void Release(uint sessionId, object claimToken, bool forgetLastVersion = } } } + + /// + /// Rolls back a failed by atomically + /// restoring the evicted session's claim and reverting + /// _lastSessionVerId to the pre-takeover value. Only acts + /// when still holds the active claim + /// (guards against a concurrent takeover winning the registry + /// between the failed persist and this restore call). Safe to call + /// multiple times; idempotent when the claim has already moved on. 
+ /// + public void TryRestoreTakeOver(uint sessionId, object newToken, + object oldToken, ulong oldVerId) + { + ArgumentNullException.ThrowIfNull(newToken); + ArgumentNullException.ThrowIfNull(oldToken); + lock (_lock) + { + if (_activeClaims.TryGetValue(sessionId, out var current) && + ReferenceEquals(current, newToken)) + { + _activeClaims[sessionId] = oldToken; + _lastSessionVerId[sessionId] = oldVerId; + } + } + } } diff --git a/tests/B3.Exchange.Gateway.Tests/FixpSessionTakeoverTests.cs b/tests/B3.Exchange.Gateway.Tests/FixpSessionTakeoverTests.cs index c86c614..d750b8e 100644 --- a/tests/B3.Exchange.Gateway.Tests/FixpSessionTakeoverTests.cs +++ b/tests/B3.Exchange.Gateway.Tests/FixpSessionTakeoverTests.cs @@ -1,6 +1,7 @@ using B3.Exchange.Contracts; using B3.EntryPoint.Wire; using B3.Exchange.Gateway; +using B3.Exchange.Gateway.Persistence; using B3.Exchange.Matching; using Microsoft.Extensions.Logging.Abstractions; using System.Net; @@ -56,7 +57,8 @@ public void OnSessionClosed(SessionId session) { } private static EntryPointListener BuildListener( FirmRegistry firms, SessionClaimRegistry claims, - NegotiationValidator negValidator, EstablishValidator estValidator) + NegotiationValidator negValidator, EstablishValidator estValidator, + IFixpSessionStatePersister? statePersister = null) => new( new IPEndPoint(IPAddress.Loopback, 0), new NoOpEngineSink(), @@ -72,7 +74,30 @@ private static EntryPointListener BuildListener( }, negotiationValidator: negValidator, sessionClaims: claims, - establishValidator: estValidator); + establishValidator: estValidator, + statePersister: statePersister); + + private sealed class RecordingStatePersister : IFixpSessionStatePersister + { + private readonly object _lock = new(); + private FixpSessionStateSnapshot? _last; + + public FixpSessionStateSnapshot? 
LastSaved + { + get { lock (_lock) return _last; } + } + + public void Save(in FixpSessionStateSnapshot snapshot) + { + lock (_lock) _last = snapshot; + } + + public FixpSessionStateSnapshot? Load(uint sessionId) => null; + public IReadOnlyCollection LoadAll() + => Array.Empty(); + public void Remove(uint sessionId) { } + public void Dispose() { } + } /// /// Issue #492: trading-host crashes (old TCP still alive from exchange POV), @@ -162,4 +187,73 @@ public async Task Negotiate_HigherVerid_WhileOldSessionStillConnected_AcceptsVia Assert.True(claims.TryGetActiveClaim(1u, out _, out var claimedVerId)); Assert.Equal(3UL, claimedVerId); } + + /// + /// After a successful takeover, the persisted snapshot must reflect the + /// new session's verId (3), not the evicted session's verId (2). + /// Bug: before the fix, CloseLocked called SaveStateSnapshotSafe() for + /// the evicted session (kind=SessionTakeOver), overwriting the new + /// session's snapshot with the old verId. + /// + [Fact] + public async Task TakeOver_WithStatePersister_FinalSnapshotHasNewVerid() + { + var firms = new FirmRegistry( + new[] { new Firm(Id: "F1", Name: "Firm 1", EnteringFirmCode: 42u) }, + new[] { new SessionCredential(SessionId: "1", FirmId: "F1", AccessKey: "", + AllowedSourceCidrs: null, Policy: SessionPolicy.Default) }); + var claims = new SessionClaimRegistry(); + var negValidator = new NegotiationValidator(firms, claims, devMode: true, + timestampSkewToleranceNs: 0); + var estValidator = new EstablishValidator(timestampSkewToleranceNs: 0); + var persister = new RecordingStatePersister(); + + await using var listener = BuildListener(firms, claims, negValidator, estValidator, + statePersister: persister); + listener.Start(); + + var creds = Encoding.UTF8.GetBytes("{\"auth_type\":\"basic\",\"username\":\"1\",\"access_key\":\"\"}"); + var buf = new byte[512]; + + // Establish old session (verId=2). 
+ using var client1 = new TcpClient(); + await client1.ConnectAsync(IPAddress.Loopback, listener.LocalEndpoint!.Port); + var stream1 = client1.GetStream(); + int len = EntryPointFixpFrameCodec.EncodeNegotiate(buf, + sessionId: 1, sessionVerId: 2UL, timestampNanos: 0UL, enteringFirm: 42u, + onBehalfFirm: null, credentials: creds, + clientIp: ReadOnlySpan.Empty, + clientAppName: ReadOnlySpan.Empty, + clientAppVersion: ReadOnlySpan.Empty); + await stream1.WriteAsync(buf.AsMemory(0, len)); + using var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var (tid1, _) = await ReadOneFrameAsync(stream1, cts1.Token); + Assert.Equal(EntryPointFrameReader.TidNegotiateResponse, tid1); + + // Take over with verId=3. + using var client2 = new TcpClient(); + await client2.ConnectAsync(IPAddress.Loopback, listener.LocalEndpoint!.Port); + var stream2 = client2.GetStream(); + len = EntryPointFixpFrameCodec.EncodeNegotiate(buf, + sessionId: 1, sessionVerId: 3UL, timestampNanos: 0UL, enteringFirm: 42u, + onBehalfFirm: null, credentials: creds, + clientIp: ReadOnlySpan.Empty, + clientAppName: ReadOnlySpan.Empty, + clientAppVersion: ReadOnlySpan.Empty); + await stream2.WriteAsync(buf.AsMemory(0, len)); + using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var (tid2, _) = await ReadOneFrameAsync(stream2, cts2.Token); + Assert.Equal(EntryPointFrameReader.TidNegotiateResponse, tid2); + + // Wait for the old session to be evicted. + await TestUtil.WaitUntilAsync( + () => !listener.ActiveSessions.Any(s => s.SessionId == 1 && s.SessionVerId == 2UL), + TimeSpan.FromSeconds(5)); + + // The persisted snapshot MUST carry verId=3 (the new session), not + // verId=2 (the evicted session). Before the fix, the evicted session's + // CloseLocked called SaveStateSnapshotSafe() and overwrote the file. 
+ Assert.NotNull(persister.LastSaved); + Assert.Equal(3UL, persister.LastSaved!.Value.SessionVerId); + } } From 77230e8ba1fa34b61d9c662a1e390a7ca19d2fc5 Mon Sep 17 00:00:00 2001 From: Pedro Sakuma Travi <39205549+pedrosakuma@users.noreply.github.com> Date: Tue, 23 Jun 2026 19:10:45 +0000 Subject: [PATCH 3/3] fix(gateway): restore evicted session's registry entry on takeover rollback When a session takeover's Negotiate persist fails, the rollback restored the SessionClaimRegistry claim to the evicted old session but left it unregistered in SessionRegistry (UpdateIdentityAfterNegotiate had overwritten its entry with the failed new session, and RollbackIdentity then removed it). The old, still-live owner therefore stopped receiving routed execution reports. Re-register the evicted session via a new onTakeOverRollback hook, gated on TryRestoreTakeOver (now bool) so the registry stays in lock-step with the claim registry and a racing concurrent takeover is not clobbered. Adds unit coverage for TryRestoreTakeOver's restore and racing-loser paths. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/B3.Exchange.Gateway/EntryPointListener.cs | 3 +- .../FixpSession.Negotiate.cs | 21 ++++++-- src/B3.Exchange.Gateway/FixpSession.cs | 15 +++++- .../SessionClaimRegistry.cs | 9 +++- .../SessionClaimRegistryTests.cs | 48 +++++++++++++++++++ 5 files changed, 89 insertions(+), 7 deletions(-) diff --git a/src/B3.Exchange.Gateway/EntryPointListener.cs b/src/B3.Exchange.Gateway/EntryPointListener.cs index 4e2efba..eeeb6ad 100644 --- a/src/B3.Exchange.Gateway/EntryPointListener.cs +++ b/src/B3.Exchange.Gateway/EntryPointListener.cs @@ -620,7 +620,8 @@ private void ConstructAndStartSession(NetworkStream stream, Socket sock, byte[]? 
persistedState: persistedState, resumeAsNegotiated: resumeAsNegotiated, persistedMaxOrderRatePerSecond: persistedMaxOrderRatePerSecond, - onIdentityChanged: onIdentityChanged); + onIdentityChanged: onIdentityChanged, + onTakeOverRollback: s => _registry.Register(s)); _registry.Register(session); lock (_lock) _sessions.Add(session); session.Start(); diff --git a/src/B3.Exchange.Gateway/FixpSession.Negotiate.cs b/src/B3.Exchange.Gateway/FixpSession.Negotiate.cs index 78ccffa..e25b669 100644 --- a/src/B3.Exchange.Gateway/FixpSession.Negotiate.cs +++ b/src/B3.Exchange.Gateway/FixpSession.Negotiate.cs @@ -192,12 +192,14 @@ private NegotiateStep ProcessNegotiate(ReadOnlySpan fixedBlock, ReadOnlySp // For a takeover, also restore the evicted session's claim so // the old TCP (still alive) remains the authoritative owner // rather than being left unclaimed. TryRestoreTakeOver is a - // no-op if a concurrent racing takeover has already won the - // registry in the window between TryForceTakeOver and here. + // no-op (returns false) if a concurrent racing takeover has + // already won the registry in the window between + // TryForceTakeOver and here. + var takeOverRolledBack = false; if (evictedByTakeOver is not null) { - _claims.TryRestoreTakeOver(req.SessionId, this, - evictedByTakeOver, evictedVerId); + takeOverRolledBack = _claims.TryRestoreTakeOver(req.SessionId, + this, evictedByTakeOver, evictedVerId); } _claims.Release(req.SessionId, this); _claimedSessionId = 0; @@ -207,6 +209,17 @@ private NegotiateStep ProcessNegotiate(ReadOnlySpan fixedBlock, ReadOnlySp // Issue #485: roll back Identity to the pending format so // OnSessionClosed doesn't evict ownership for a real session. RollbackIdentity(pendingIdentity); + // Issue #492: UpdateIdentityAfterNegotiate above overwrote the + // evicted session's SessionRegistry entry with this (failed) new + // session; RollbackIdentity then removed it, leaving the old + // owner unroutable. 
Re-register the evicted session AFTER the + // rollback so routing is restored — but only when the claim was + // actually handed back (takeOverRolledBack), to stay in lock-step + // with the claim registry and avoid clobbering a racing takeover. + if (takeOverRolledBack && evictedByTakeOver is not null) + { + _onTakeOverRollback?.Invoke(evictedByTakeOver); + } // No spec-defined "internal error" reject code; UNSPECIFIED // (0) is the closest match. The peer will retry. var rejectFrame = new byte[NegotiateRejectEncoder.Total]; diff --git a/src/B3.Exchange.Gateway/FixpSession.cs b/src/B3.Exchange.Gateway/FixpSession.cs index fe14db5..4562817 100644 --- a/src/B3.Exchange.Gateway/FixpSession.cs +++ b/src/B3.Exchange.Gateway/FixpSession.cs @@ -142,6 +142,17 @@ public sealed partial class FixpSession : IAsyncDisposable /// private readonly Action? _onIdentityChanged; + /// + /// Issue #492: re-registers an evicted-by-takeover session in the + /// when a takeover is rolled back after + /// the new session already overwrote the old session's registry entry + /// (e.g. snapshot persistence failed mid-Negotiate). Wired to + /// SessionRegistry.Register so the old, still-live owner keeps + /// receiving routed execution reports. Invoked only in lock-step with a + /// successful . + /// + private readonly Action? _onTakeOverRollback; + public long ConnectionId { get; } /// @@ -369,7 +380,8 @@ public FixpSession(long connectionId, uint enteringFirm, uint sessionId, B3.Exchange.Gateway.Persistence.FixpSessionStateSnapshot? persistedState = null, bool resumeAsNegotiated = false, int? persistedMaxOrderRatePerSecond = null, - Action? onIdentityChanged = null) + Action? onIdentityChanged = null, + Action? onTakeOverRollback = null) { ArgumentNullException.ThrowIfNull(logger); ConnectionId = connectionId; @@ -392,6 +404,7 @@ public FixpSession(long connectionId, uint enteringFirm, uint sessionId, _options = options ?? 
FixpSessionOptions.Default; _options.Validate(); _onIdentityChanged = onIdentityChanged; + _onTakeOverRollback = onTakeOverRollback; if (persistedMaxOrderRatePerSecond is < 0) throw new ArgumentOutOfRangeException(nameof(persistedMaxOrderRatePerSecond)); _outboundJournal = outboundJournal; diff --git a/src/B3.Exchange.Gateway/SessionClaimRegistry.cs b/src/B3.Exchange.Gateway/SessionClaimRegistry.cs index 51e5507..4296368 100644 --- a/src/B3.Exchange.Gateway/SessionClaimRegistry.cs +++ b/src/B3.Exchange.Gateway/SessionClaimRegistry.cs @@ -251,8 +251,13 @@ public void Release(uint sessionId, object claimToken, bool forgetLastVersion = /// (guards against a concurrent takeover winning the registry /// between the failed persist and this restore call). Safe to call /// multiple times; idempotent when the claim has already moved on. + /// Returns true when the claim was actually restored to + /// , so the caller can keep the + /// entry in lock-step; returns + /// false when a concurrent takeover already moved the claim on + /// (in which case the caller must not roll the registry back either). 
/// - public void TryRestoreTakeOver(uint sessionId, object newToken, + public bool TryRestoreTakeOver(uint sessionId, object newToken, object oldToken, ulong oldVerId) { ArgumentNullException.ThrowIfNull(newToken); @@ -264,7 +269,9 @@ public void TryRestoreTakeOver(uint sessionId, object newToken, { _activeClaims[sessionId] = oldToken; _lastSessionVerId[sessionId] = oldVerId; + return true; } } + return false; } } diff --git a/tests/B3.Exchange.Gateway.Tests/SessionClaimRegistryTests.cs b/tests/B3.Exchange.Gateway.Tests/SessionClaimRegistryTests.cs index 7a805e5..ebd7e9a 100644 --- a/tests/B3.Exchange.Gateway.Tests/SessionClaimRegistryTests.cs +++ b/tests/B3.Exchange.Gateway.Tests/SessionClaimRegistryTests.cs @@ -236,4 +236,52 @@ public void TryForceTakeOver_OldSessionRelease_IsNoopAfterTakeover() Assert.True(r.TryGetActiveClaim(42, out var holder, out _)); Assert.Same(newToken, holder); } + + // ===== TryRestoreTakeOver (issue #492) ===== + + [Fact] + public void TryRestoreTakeOver_WhenNewTokenStillActive_RestoresOldClaimAndVerId() + { + // A takeover was force-applied, then its Negotiate persist failed. + // Rolling it back must hand the claim (and verId) back to the + // evicted old session and report success. + var r = new SessionClaimRegistry(); + var oldToken = new object(); + r.TryClaim(42, 5, oldToken); + + var newToken = new object(); + r.TryForceTakeOver(42, 6, newToken, out _); + Assert.Equal(6UL, r.CurrentSessionVerId(42)); + + var restored = r.TryRestoreTakeOver(42, newToken, oldToken, oldVerId: 5); + + Assert.True(restored); + Assert.True(r.TryGetActiveClaim(42, out var holder, out _)); + Assert.Same(oldToken, holder); + Assert.Equal(5UL, r.CurrentSessionVerId(42)); + } + + [Fact] + public void TryRestoreTakeOver_WhenRacingTakeoverWon_DoesNotClobberAndReturnsFalse() + { + // While the failed takeover was rolling back, a second takeover won + // the registry. 
Restore must be a no-op (return false) so the caller + // does not roll the SessionRegistry back over the racing winner. + var r = new SessionClaimRegistry(); + var oldToken = new object(); + r.TryClaim(42, 5, oldToken); + + var newToken = new object(); + r.TryForceTakeOver(42, 6, newToken, out _); + + var racingToken = new object(); + r.TryForceTakeOver(42, 7, racingToken, out _); + + var restored = r.TryRestoreTakeOver(42, newToken, oldToken, oldVerId: 5); + + Assert.False(restored); + Assert.True(r.TryGetActiveClaim(42, out var holder, out _)); + Assert.Same(racingToken, holder); + Assert.Equal(7UL, r.CurrentSessionVerId(42)); + } }