From 4735ca0a6fd7b4946feafef6defc9e659fe85df3 Mon Sep 17 00:00:00 2001 From: Zhen Wu Date: Fri, 29 May 2026 23:13:54 +0800 Subject: [PATCH 1/4] Support drain-on-dispose for PCGroups consumers Upgrade NATS.Net to 2.8.0 and let static and elastic PCGroups consume loops complete when the underlying JetStream consumer finishes during drain-on-dispose. Add regression coverage for buffered-message drain behavior on connection disposal and document the graceful shutdown option. --- Directory.Packages.props | 2 +- .../Elastic/NatsPcgElasticConsumeContext.cs | 5 + src/Synadia.Orbit.PCGroups/PACKAGE.md | 18 ++++ .../Static/NatsPcgStaticConsumeContext.cs | 5 + .../Elastic/NatsPcgElasticExtensionsTests.cs | 94 ++++++++++++++++++ .../Static/NatsPcgStaticExtensionsTests.cs | 97 +++++++++++++++++++ .../TaskTestHelpers.cs | 21 ++++ 7 files changed, 241 insertions(+), 1 deletion(-) create mode 100644 tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index 27c31c9..3d46e46 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -1,7 +1,7 @@ true - 2.8.0-preview.2 + 2.8.0 diff --git a/src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticConsumeContext.cs b/src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticConsumeContext.cs index 03fe0a7..cb4aac2 100644 --- a/src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticConsumeContext.cs +++ b/src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticConsumeContext.cs @@ -223,6 +223,11 @@ private async IAsyncEnumerable> ConsumeAsync([EnumeratorCancellati if (!hasNext) { + if (_js.Connection.Opts.DrainSubscriptionsOnDispose) + { + yield break; + } + break; } diff --git a/src/Synadia.Orbit.PCGroups/PACKAGE.md b/src/Synadia.Orbit.PCGroups/PACKAGE.md index 1919410..ae4b2bd 100644 --- a/src/Synadia.Orbit.PCGroups/PACKAGE.md +++ b/src/Synadia.Orbit.PCGroups/PACKAGE.md @@ -142,6 +142,24 @@ record Event(string EventId, string UserId, string Type); | Stream Setup | Requires SubjectTransform | Auto-creates work-queue stream | | Configuration | Simpler | More flexible | +## Graceful Shutdown + +PCGroups supports NATS .NET drain-on-dispose. Enable it on the connection to let +in-flight JetStream consumer messages already buffered by the client continue to +flow through the PCGroups async enumerable before the connection is closed: + +```csharp +await using var nats = new NatsConnection(new NatsOpts +{ + DrainSubscriptionsOnDispose = true, + ConsumerDrainOnDisposeTimeout = TimeSpan.FromSeconds(30), +}); +``` + +Without `DrainSubscriptionsOnDispose`, NATS .NET closes the socket first during +connection disposal and buffered consumer messages may remain pending until +`AckWait` expires. + ## Custom Partition Mappings For fine-grained control over partition distribution: diff --git a/src/Synadia.Orbit.PCGroups/Static/NatsPcgStaticConsumeContext.cs b/src/Synadia.Orbit.PCGroups/Static/NatsPcgStaticConsumeContext.cs index ed0668f..a6ff637 100644 --- a/src/Synadia.Orbit.PCGroups/Static/NatsPcgStaticConsumeContext.cs +++ b/src/Synadia.Orbit.PCGroups/Static/NatsPcgStaticConsumeContext.cs @@ -178,6 +178,11 @@ private async IAsyncEnumerable> ConsumeAsync([EnumeratorCancellati if (!hasNext) { + if (_js.Connection.Opts.DrainSubscriptionsOnDispose) + { + yield break; + } + break; } diff --git a/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs b/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs index 5fcdae9..48a209f 100644 --- a/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs +++ b/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs @@ -6,6 +6,7 @@ using NATS.Client.JetStream.Models; using NATS.Net; using Synadia.Orbit.PCGroups.Elastic; +using Synadia.Orbit.PCGroups.Test; using Synadia.Orbit.TestUtils; namespace Synadia.Orbit.PCGroups.Test.Elastic; @@ -686,6 +687,99 @@ await js.CreatePcgElasticAsync( } } + [Fact] + public async Task ConsumeElastic_ConnectionDisposeDrainsBufferedMessagesAndCompletes() + { + const int totalMsgs = 100; + const int bailAt = 10; + + await using var setup = new NatsConnection(new NatsOpts { Url = _server.Url }); + var setupJs = setup.CreateJetStreamContext(); + + var id = Guid.NewGuid().ToString("N"); + var streamName = $"test-stream-{id}"; + var subject = $"{id}.orders.*"; + var groupName = $"test-group-{id}"; + var workQueueStreamName = $"{streamName}-{groupName}"; + + await setupJs.CreateStreamAsync(new StreamConfig + { + Name = streamName, + Subjects = [subject], + }); + + await setupJs.CreatePcgElasticAsync( + streamName, + groupName, + maxNumMembers: 1, + partitioningFilters: [new NatsPcgPartitioningFilter(subject, [1])]); + + await setupJs.AddPcgElasticMembersAsync(streamName, groupName, ["worker"]); + + for (var i = 0; i < totalMsgs; i++) + { + await setupJs.PublishAsync($"{id}.orders.{i}", $"payload-{i}"); + } + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var consumed = 0; + var reachedBail = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var nats = new NatsConnection(new NatsOpts + { + Url = _server.Url, + DrainSubscriptionsOnDispose = true, + ConsumerDrainOnDisposeTimeout = TimeSpan.FromSeconds(30), + }); + var js = nats.CreateJetStreamContext(); + + var consumeTask = Task.Run( + async () => + { + try + { + await foreach (var msg in js.ConsumePcgElasticAsync(streamName, groupName, "worker", cancellationToken: cts.Token)) + { + Interlocked.Increment(ref consumed); + await msg.AckAsync(cancellationToken: cts.Token); + + if (Volatile.Read(ref consumed) == bailAt) + { + reachedBail.TrySetResult(); + } + + await Task.Delay(50, cts.Token); + } + } + catch (OperationCanceledException) when (cts.IsCancellationRequested) + { + } + }, + cts.Token); + + try + { + await TaskTestHelpers.AssertCompletesWithinAsync(reachedBail.Task, TimeSpan.FromSeconds(10)); + + var disposeTask = nats.DisposeAsync().AsTask(); + await TaskTestHelpers.AssertCompletesWithinAsync(disposeTask, TimeSpan.FromSeconds(10)); + + await TaskTestHelpers.AssertCompletesWithinAsync(consumeTask, TimeSpan.FromSeconds(10)); + + Assert.True(Volatile.Read(ref consumed) > bailAt, $"consumed {Volatile.Read(ref consumed)} should be greater than {bailAt}"); + + await using var check = new NatsConnection(new NatsOpts { Url = _server.Url }); + var checkJs = check.CreateJetStreamContext(); + var consumer = await checkJs.GetConsumerAsync(workQueueStreamName, "worker"); + Assert.Equal(0, consumer.Info.NumAckPending); + } + finally + { + cts.Cancel(); + await Task.WhenAny(consumeTask, Task.Delay(TimeSpan.FromSeconds(5))); + } + } + [Fact] public async Task CreatePcgElastic_EmptyWildcards_FullSubjectPartition_Success() { diff --git a/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs b/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs index 640326a..2e723cc 100644 --- a/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs +++ b/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs @@ -6,6 +6,7 @@ using NATS.Client.JetStream.Models; using NATS.Net; using Synadia.Orbit.PCGroups.Static; +using Synadia.Orbit.PCGroups.Test; using Synadia.Orbit.TestUtils; namespace Synadia.Orbit.PCGroups.Test.Static; @@ -534,4 +535,100 @@ await js.CreatePcgStaticAsync( await js.DeleteStreamAsync(streamName); } } + + [Fact] + public async Task ConsumeStatic_ConnectionDisposeDrainsBufferedMessagesAndCompletes() + { + const int totalMsgs = 100; + const int bailAt = 10; + + await using var setup = new NatsConnection(new NatsOpts { Url = _server.Url }); + var setupJs = setup.CreateJetStreamContext(); + + var id = Guid.NewGuid().ToString("N"); + var streamName = $"test-stream-{id}"; + var subject = $"{id}.orders.*"; + var groupName = $"test-group-{id}"; + + await setupJs.CreateStreamAsync(new StreamConfig + { + Name = streamName, + Subjects = [subject], + SubjectTransform = new SubjectTransform + { + Src = subject, + Dest = $"{{{{partition(1,1)}}}}.{id}.orders.{{{{wildcard(1)}}}}", + }, + }); + + await setupJs.CreatePcgStaticAsync( + streamName, + groupName, + maxNumMembers: 1, + filters: [subject], + members: ["worker"]); + + for (var i = 0; i < totalMsgs; i++) + { + await setupJs.PublishAsync($"{id}.orders.{i}", $"payload-{i}"); + } + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var consumed = 0; + var reachedBail = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var nats = new NatsConnection(new NatsOpts + { + Url = _server.Url, + DrainSubscriptionsOnDispose = true, + ConsumerDrainOnDisposeTimeout = TimeSpan.FromSeconds(30), + }); + var js = nats.CreateJetStreamContext(); + + var consumeTask = Task.Run( + async () => + { + try + { + await foreach (var msg in js.ConsumePcgStaticAsync(streamName, groupName, "worker", cancellationToken: cts.Token)) + { + Interlocked.Increment(ref consumed); + await msg.AckAsync(cancellationToken: cts.Token); + + if (Volatile.Read(ref consumed) == bailAt) + { + reachedBail.TrySetResult(); + } + + await Task.Delay(50, cts.Token); + } + } + catch (OperationCanceledException) when (cts.IsCancellationRequested) + { + } + }, + cts.Token); + + try + { + await TaskTestHelpers.AssertCompletesWithinAsync(reachedBail.Task, TimeSpan.FromSeconds(10)); + + var disposeTask = nats.DisposeAsync().AsTask(); + await TaskTestHelpers.AssertCompletesWithinAsync(disposeTask, TimeSpan.FromSeconds(10)); + + await TaskTestHelpers.AssertCompletesWithinAsync(consumeTask, TimeSpan.FromSeconds(10)); + + Assert.True(Volatile.Read(ref consumed) > bailAt, $"consumed {Volatile.Read(ref consumed)} should be greater than {bailAt}"); + + await using var check = new NatsConnection(new NatsOpts { Url = _server.Url }); + var checkJs = check.CreateJetStreamContext(); + var consumer = await checkJs.GetConsumerAsync(streamName, $"{groupName}-worker"); + Assert.Equal(0, consumer.Info.NumAckPending); + } + finally + { + cts.Cancel(); + await Task.WhenAny(consumeTask, Task.Delay(TimeSpan.FromSeconds(5))); + } + } } diff --git a/tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs b/tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs new file mode 100644 index 0000000..f2c6e91 --- /dev/null +++ b/tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs @@ -0,0 +1,21 @@ +// Copyright (c) Synadia Communications, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. + +namespace Synadia.Orbit.PCGroups.Test; + +internal static class TaskTestHelpers +{ + public static async Task AssertCompletesWithinAsync(Task task, TimeSpan timeout) + { + var completed = await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false); + Assert.True(ReferenceEquals(task, completed), $"Task did not complete within {timeout}."); + await task.ConfigureAwait(false); + } + + public static async Task AssertCompletesWithinAsync(Task task, TimeSpan timeout) + { + var completed = await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false); + Assert.True(ReferenceEquals(task, completed), $"Task did not complete within {timeout}."); + return await task.ConfigureAwait(false); + } +} From 6187b3dd59b7fe37201ce7b80152b1a1ed0a448d Mon Sep 17 00:00:00 2001 From: Zhen Wu Date: Fri, 29 May 2026 23:37:47 +0800 Subject: [PATCH 2/4] Dispose PCGroups drain test connections on failure Ensure the drain-on-dispose regression tests dispose their NATS connection from finally when an early assertion fails before the explicit disposal step. --- .../Elastic/NatsPcgElasticExtensionsTests.cs | 5 ++++- .../Static/NatsPcgStaticExtensionsTests.cs | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs b/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs index 48a209f..f967b47 100644 --- a/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs +++ b/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs @@ -733,6 +733,7 @@ await setupJs.CreatePcgElasticAsync( }); var js = nats.CreateJetStreamContext(); + Task? disposeTask = null; var consumeTask = Task.Run( async () => { @@ -761,7 +762,7 @@ await setupJs.CreatePcgElasticAsync( { await TaskTestHelpers.AssertCompletesWithinAsync(reachedBail.Task, TimeSpan.FromSeconds(10)); - var disposeTask = nats.DisposeAsync().AsTask(); + disposeTask = nats.DisposeAsync().AsTask(); await TaskTestHelpers.AssertCompletesWithinAsync(disposeTask, TimeSpan.FromSeconds(10)); await TaskTestHelpers.AssertCompletesWithinAsync(consumeTask, TimeSpan.FromSeconds(10)); @@ -776,6 +777,8 @@ await setupJs.CreatePcgElasticAsync( finally { cts.Cancel(); + disposeTask ??= nats.DisposeAsync().AsTask(); + await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5))); await Task.WhenAny(consumeTask, Task.Delay(TimeSpan.FromSeconds(5))); } } diff --git a/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs b/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs index 2e723cc..e8325c6 100644 --- a/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs +++ b/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs @@ -585,6 +585,7 @@ await setupJs.CreatePcgStaticAsync( }); var js = nats.CreateJetStreamContext(); + Task? disposeTask = null; var consumeTask = Task.Run( async () => { @@ -613,7 +614,7 @@ await setupJs.CreatePcgStaticAsync( { await TaskTestHelpers.AssertCompletesWithinAsync(reachedBail.Task, TimeSpan.FromSeconds(10)); - var disposeTask = nats.DisposeAsync().AsTask(); + disposeTask = nats.DisposeAsync().AsTask(); await TaskTestHelpers.AssertCompletesWithinAsync(disposeTask, TimeSpan.FromSeconds(10)); await TaskTestHelpers.AssertCompletesWithinAsync(consumeTask, TimeSpan.FromSeconds(10)); @@ -628,6 +629,8 @@ await setupJs.CreatePcgStaticAsync( finally { cts.Cancel(); + disposeTask ??= nats.DisposeAsync().AsTask(); + await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5))); await Task.WhenAny(consumeTask, Task.Delay(TimeSpan.FromSeconds(5))); } } From b9f1689b5e7a0158db5a103ef31a3496b0c6f09d Mon Sep 17 00:00:00 2001 From: Zhen Wu Date: Fri, 29 May 2026 23:30:53 +0800 Subject: [PATCH 3/4] Make PCGroups drain tests net481-compatible Use generic TaskCompletionSource in the drain-on-dispose regression tests and avoid awaiting externally supplied tasks inside the timeout helper so the net481 build passes analyzers. --- .../Elastic/NatsPcgElasticExtensionsTests.cs | 4 ++-- .../Static/NatsPcgStaticExtensionsTests.cs | 4 ++-- tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs | 9 +-------- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs b/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs index f967b47..cd51a55 100644 --- a/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs +++ b/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs @@ -723,7 +723,7 @@ await setupJs.CreatePcgElasticAsync( using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var consumed = 0; - var reachedBail = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var reachedBail = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var nats = new NatsConnection(new NatsOpts { @@ -746,7 +746,7 @@ await setupJs.CreatePcgElasticAsync( if (Volatile.Read(ref consumed) == bailAt) { - reachedBail.TrySetResult(); + reachedBail.TrySetResult(true); } await Task.Delay(50, cts.Token); diff --git a/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs b/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs index e8325c6..667930b 100644 --- a/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs +++ b/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs @@ -575,7 +575,7 @@ await setupJs.CreatePcgStaticAsync( using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var consumed = 0; - var reachedBail = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var reachedBail = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var nats = new NatsConnection(new NatsOpts { @@ -598,7 +598,7 @@ await setupJs.CreatePcgStaticAsync( if (Volatile.Read(ref consumed) == bailAt) { - reachedBail.TrySetResult(); + reachedBail.TrySetResult(true); } await Task.Delay(50, cts.Token); diff --git a/tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs b/tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs index f2c6e91..1da7035 100644 --- a/tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs +++ b/tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs @@ -9,13 +9,6 @@ public static async Task AssertCompletesWithinAsync(Task task, TimeSpan timeout) { var completed = await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false); Assert.True(ReferenceEquals(task, completed), $"Task did not complete within {timeout}."); - await task.ConfigureAwait(false); - } - - public static async Task AssertCompletesWithinAsync(Task task, TimeSpan timeout) - { - var completed = await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false); - Assert.True(ReferenceEquals(task, completed), $"Task did not complete within {timeout}."); - return await task.ConfigureAwait(false); + await Task.WhenAll(task).ConfigureAwait(false); } } From c86ed2f300e073e791103c94d03a7dad74ce4253 Mon Sep 17 00:00:00 2001 From: Zhen Wu Date: Sat, 30 May 2026 00:11:06 +0800 Subject: [PATCH 4/4] Make PCGroups drain tests more deterministic Replace per-message sleeps in the drain-on-dispose regression tests with an explicit gate that pauses consumption at the bail point until disposal starts. This keeps the 10 second assertions while reducing CI timing variance. --- .../Elastic/NatsPcgElasticExtensionsTests.cs | 10 ++++++---- .../Static/NatsPcgStaticExtensionsTests.cs | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs b/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs index cd51a55..eecabd7 100644 --- a/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs +++ b/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs @@ -690,8 +690,8 @@ await js.CreatePcgElasticAsync( [Fact] public async Task ConsumeElastic_ConnectionDisposeDrainsBufferedMessagesAndCompletes() { - const int totalMsgs = 100; - const int bailAt = 10; + const int totalMsgs = 30; + const int bailAt = 5; await using var setup = new NatsConnection(new NatsOpts { Url = _server.Url }); var setupJs = setup.CreateJetStreamContext(); @@ -724,6 +724,7 @@ await setupJs.CreatePcgElasticAsync( using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var consumed = 0; var reachedBail = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var releaseAfterDisposeStarts = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var nats = new NatsConnection(new NatsOpts { @@ -747,9 +748,8 @@ await setupJs.CreatePcgElasticAsync( if (Volatile.Read(ref consumed) == bailAt) { reachedBail.TrySetResult(true); + await releaseAfterDisposeStarts.Task.ConfigureAwait(false); } - - await Task.Delay(50, cts.Token); } } catch (OperationCanceledException) when (cts.IsCancellationRequested) @@ -763,6 +763,7 @@ await setupJs.CreatePcgElasticAsync( await TaskTestHelpers.AssertCompletesWithinAsync(reachedBail.Task, TimeSpan.FromSeconds(10)); disposeTask = nats.DisposeAsync().AsTask(); + releaseAfterDisposeStarts.TrySetResult(true); await TaskTestHelpers.AssertCompletesWithinAsync(disposeTask, TimeSpan.FromSeconds(10)); await TaskTestHelpers.AssertCompletesWithinAsync(consumeTask, TimeSpan.FromSeconds(10)); @@ -776,6 +777,7 @@ await setupJs.CreatePcgElasticAsync( } finally { + releaseAfterDisposeStarts.TrySetResult(true); cts.Cancel(); disposeTask ??= nats.DisposeAsync().AsTask(); await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5))); diff --git a/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs b/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs index 667930b..3363fa8 100644 --- a/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs +++ b/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs @@ -539,8 +539,8 @@ await js.CreatePcgStaticAsync( [Fact] public async Task ConsumeStatic_ConnectionDisposeDrainsBufferedMessagesAndCompletes() { - const int totalMsgs = 100; - const int bailAt = 10; + const int totalMsgs = 30; + const int bailAt = 5; await using var setup = new NatsConnection(new NatsOpts { Url = _server.Url }); var setupJs = setup.CreateJetStreamContext(); @@ -576,6 +576,7 @@ await setupJs.CreatePcgStaticAsync( using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var consumed = 0; var reachedBail = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var releaseAfterDisposeStarts = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var nats = new NatsConnection(new NatsOpts { @@ -599,9 +600,8 @@ await setupJs.CreatePcgStaticAsync( if (Volatile.Read(ref consumed) == bailAt) { reachedBail.TrySetResult(true); + await releaseAfterDisposeStarts.Task.ConfigureAwait(false); } - - await Task.Delay(50, cts.Token); } } catch (OperationCanceledException) when (cts.IsCancellationRequested) @@ -615,6 +615,7 @@ await setupJs.CreatePcgStaticAsync( await TaskTestHelpers.AssertCompletesWithinAsync(reachedBail.Task, TimeSpan.FromSeconds(10)); disposeTask = nats.DisposeAsync().AsTask(); + releaseAfterDisposeStarts.TrySetResult(true); await TaskTestHelpers.AssertCompletesWithinAsync(disposeTask, TimeSpan.FromSeconds(10)); await TaskTestHelpers.AssertCompletesWithinAsync(consumeTask, TimeSpan.FromSeconds(10)); @@ -628,6 +629,7 @@ await setupJs.CreatePcgStaticAsync( } finally { + releaseAfterDisposeStarts.TrySetResult(true); cts.Cancel(); disposeTask ??= nats.DisposeAsync().AsTask(); await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5)));