diff --git a/Directory.Packages.props b/Directory.Packages.props index 27c31c9..3d46e46 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -1,7 +1,7 @@ true - 2.8.0-preview.2 + 2.8.0 diff --git a/src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticConsumeContext.cs b/src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticConsumeContext.cs index 03fe0a7..cb4aac2 100644 --- a/src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticConsumeContext.cs +++ b/src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticConsumeContext.cs @@ -223,6 +223,11 @@ private async IAsyncEnumerable> ConsumeAsync([EnumeratorCancellati if (!hasNext) { + if (_js.Connection.Opts.DrainSubscriptionsOnDispose) + { + yield break; + } + break; } diff --git a/src/Synadia.Orbit.PCGroups/PACKAGE.md b/src/Synadia.Orbit.PCGroups/PACKAGE.md index 1919410..ae4b2bd 100644 --- a/src/Synadia.Orbit.PCGroups/PACKAGE.md +++ b/src/Synadia.Orbit.PCGroups/PACKAGE.md @@ -142,6 +142,24 @@ record Event(string EventId, string UserId, string Type); | Stream Setup | Requires SubjectTransform | Auto-creates work-queue stream | | Configuration | Simpler | More flexible | +## Graceful Shutdown + +PCGroups supports NATS .NET drain-on-dispose. Enable it on the connection to let +in-flight JetStream consumer messages already buffered by the client continue to +flow through the PCGroups async enumerable before the connection is closed: + +```csharp +await using var nats = new NatsConnection(new NatsOpts +{ + DrainSubscriptionsOnDispose = true, + ConsumerDrainOnDisposeTimeout = TimeSpan.FromSeconds(30), +}); +``` + +Without `DrainSubscriptionsOnDispose`, NATS .NET closes the socket first during +connection disposal and buffered consumer messages may remain pending until +`AckWait` expires. + ## Custom Partition Mappings For fine-grained control over partition distribution: diff --git a/src/Synadia.Orbit.PCGroups/Static/NatsPcgStaticConsumeContext.cs b/src/Synadia.Orbit.PCGroups/Static/NatsPcgStaticConsumeContext.cs index ed0668f..a6ff637 100644 --- a/src/Synadia.Orbit.PCGroups/Static/NatsPcgStaticConsumeContext.cs +++ b/src/Synadia.Orbit.PCGroups/Static/NatsPcgStaticConsumeContext.cs @@ -178,6 +178,11 @@ private async IAsyncEnumerable> ConsumeAsync([EnumeratorCancellati if (!hasNext) { + if (_js.Connection.Opts.DrainSubscriptionsOnDispose) + { + yield break; + } + break; } diff --git a/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs b/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs index 5fcdae9..eecabd7 100644 --- a/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs +++ b/tests/Synadia.Orbit.PCGroups.Test/Elastic/NatsPcgElasticExtensionsTests.cs @@ -6,6 +6,7 @@ using NATS.Client.JetStream.Models; using NATS.Net; using Synadia.Orbit.PCGroups.Elastic; +using Synadia.Orbit.PCGroups.Test; using Synadia.Orbit.TestUtils; namespace Synadia.Orbit.PCGroups.Test.Elastic; @@ -686,6 +687,104 @@ await js.CreatePcgElasticAsync( } } + [Fact] + public async Task ConsumeElastic_ConnectionDisposeDrainsBufferedMessagesAndCompletes() + { + const int totalMsgs = 30; + const int bailAt = 5; + + await using var setup = new NatsConnection(new NatsOpts { Url = _server.Url }); + var setupJs = setup.CreateJetStreamContext(); + + var id = Guid.NewGuid().ToString("N"); + var streamName = $"test-stream-{id}"; + var subject = $"{id}.orders.*"; + var groupName = $"test-group-{id}"; + var workQueueStreamName = $"{streamName}-{groupName}"; + + await setupJs.CreateStreamAsync(new StreamConfig + { + Name = streamName, + Subjects = [subject], + }); + + await setupJs.CreatePcgElasticAsync( + streamName, + groupName, + maxNumMembers: 1, + partitioningFilters: [new NatsPcgPartitioningFilter(subject, [1])]); + + await setupJs.AddPcgElasticMembersAsync(streamName, groupName, ["worker"]); + + for (var i = 0; i < totalMsgs; i++) + { + await setupJs.PublishAsync($"{id}.orders.{i}", $"payload-{i}"); + } + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var consumed = 0; + var reachedBail = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var releaseAfterDisposeStarts = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var nats = new NatsConnection(new NatsOpts + { + Url = _server.Url, + DrainSubscriptionsOnDispose = true, + ConsumerDrainOnDisposeTimeout = TimeSpan.FromSeconds(30), + }); + var js = nats.CreateJetStreamContext(); + + Task? disposeTask = null; + var consumeTask = Task.Run( + async () => + { + try + { + await foreach (var msg in js.ConsumePcgElasticAsync(streamName, groupName, "worker", cancellationToken: cts.Token)) + { + Interlocked.Increment(ref consumed); + await msg.AckAsync(cancellationToken: cts.Token); + + if (Volatile.Read(ref consumed) == bailAt) + { + reachedBail.TrySetResult(true); + await releaseAfterDisposeStarts.Task.ConfigureAwait(false); + } + } + } + catch (OperationCanceledException) when (cts.IsCancellationRequested) + { + } + }, + cts.Token); + + try + { + await TaskTestHelpers.AssertCompletesWithinAsync(reachedBail.Task, TimeSpan.FromSeconds(10)); + + disposeTask = nats.DisposeAsync().AsTask(); + releaseAfterDisposeStarts.TrySetResult(true); + await TaskTestHelpers.AssertCompletesWithinAsync(disposeTask, TimeSpan.FromSeconds(10)); + + await TaskTestHelpers.AssertCompletesWithinAsync(consumeTask, TimeSpan.FromSeconds(10)); + + Assert.True(Volatile.Read(ref consumed) > bailAt, $"consumed {Volatile.Read(ref consumed)} should be greater than {bailAt}"); + + await using var check = new NatsConnection(new NatsOpts { Url = _server.Url }); + var checkJs = check.CreateJetStreamContext(); + var consumer = await checkJs.GetConsumerAsync(workQueueStreamName, "worker"); + Assert.Equal(0, consumer.Info.NumAckPending); + } + finally + { + releaseAfterDisposeStarts.TrySetResult(true); + cts.Cancel(); + disposeTask ??= nats.DisposeAsync().AsTask(); + await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5))); + await Task.WhenAny(consumeTask, Task.Delay(TimeSpan.FromSeconds(5))); + } + } + [Fact] public async Task CreatePcgElastic_EmptyWildcards_FullSubjectPartition_Success() { diff --git a/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs b/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs index 640326a..3363fa8 100644 --- a/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs +++ b/tests/Synadia.Orbit.PCGroups.Test/Static/NatsPcgStaticExtensionsTests.cs @@ -6,6 +6,7 @@ using NATS.Client.JetStream.Models; using NATS.Net; using Synadia.Orbit.PCGroups.Static; +using Synadia.Orbit.PCGroups.Test; using Synadia.Orbit.TestUtils; namespace Synadia.Orbit.PCGroups.Test.Static; @@ -534,4 +535,105 @@ await js.CreatePcgStaticAsync( await js.DeleteStreamAsync(streamName); } } + + [Fact] + public async Task ConsumeStatic_ConnectionDisposeDrainsBufferedMessagesAndCompletes() + { + const int totalMsgs = 30; + const int bailAt = 5; + + await using var setup = new NatsConnection(new NatsOpts { Url = _server.Url }); + var setupJs = setup.CreateJetStreamContext(); + + var id = Guid.NewGuid().ToString("N"); + var streamName = $"test-stream-{id}"; + var subject = $"{id}.orders.*"; + var groupName = $"test-group-{id}"; + + await setupJs.CreateStreamAsync(new StreamConfig + { + Name = streamName, + Subjects = [subject], + SubjectTransform = new SubjectTransform + { + Src = subject, + Dest = $"{{{{partition(1,1)}}}}.{id}.orders.{{{{wildcard(1)}}}}", + }, + }); + + await setupJs.CreatePcgStaticAsync( + streamName, + groupName, + maxNumMembers: 1, + filters: [subject], + members: ["worker"]); + + for (var i = 0; i < totalMsgs; i++) + { + await setupJs.PublishAsync($"{id}.orders.{i}", $"payload-{i}"); + } + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var consumed = 0; + var reachedBail = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var releaseAfterDisposeStarts = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var nats = new NatsConnection(new NatsOpts + { + Url = _server.Url, + DrainSubscriptionsOnDispose = true, + ConsumerDrainOnDisposeTimeout = TimeSpan.FromSeconds(30), + }); + var js = nats.CreateJetStreamContext(); + + Task? disposeTask = null; + var consumeTask = Task.Run( + async () => + { + try + { + await foreach (var msg in js.ConsumePcgStaticAsync(streamName, groupName, "worker", cancellationToken: cts.Token)) + { + Interlocked.Increment(ref consumed); + await msg.AckAsync(cancellationToken: cts.Token); + + if (Volatile.Read(ref consumed) == bailAt) + { + reachedBail.TrySetResult(true); + await releaseAfterDisposeStarts.Task.ConfigureAwait(false); + } + } + } + catch (OperationCanceledException) when (cts.IsCancellationRequested) + { + } + }, + cts.Token); + + try + { + await TaskTestHelpers.AssertCompletesWithinAsync(reachedBail.Task, TimeSpan.FromSeconds(10)); + + disposeTask = nats.DisposeAsync().AsTask(); + releaseAfterDisposeStarts.TrySetResult(true); + await TaskTestHelpers.AssertCompletesWithinAsync(disposeTask, TimeSpan.FromSeconds(10)); + + await TaskTestHelpers.AssertCompletesWithinAsync(consumeTask, TimeSpan.FromSeconds(10)); + + Assert.True(Volatile.Read(ref consumed) > bailAt, $"consumed {Volatile.Read(ref consumed)} should be greater than {bailAt}"); + + await using var check = new NatsConnection(new NatsOpts { Url = _server.Url }); + var checkJs = check.CreateJetStreamContext(); + var consumer = await checkJs.GetConsumerAsync(streamName, $"{groupName}-worker"); + Assert.Equal(0, consumer.Info.NumAckPending); + } + finally + { + releaseAfterDisposeStarts.TrySetResult(true); + cts.Cancel(); + disposeTask ??= nats.DisposeAsync().AsTask(); + await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5))); + await Task.WhenAny(consumeTask, Task.Delay(TimeSpan.FromSeconds(5))); + } + } } diff --git a/tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs b/tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs new file mode 100644 index 0000000..1da7035 --- /dev/null +++ b/tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs @@ -0,0 +1,14 @@ +// Copyright (c) Synadia Communications, Inc. All rights reserved. +// Licensed under the Apache License, Version 2.0. + +namespace Synadia.Orbit.PCGroups.Test; + +internal static class TaskTestHelpers +{ + public static async Task AssertCompletesWithinAsync(Task task, TimeSpan timeout) + { + var completed = await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false); + Assert.True(ReferenceEquals(task, completed), $"Task did not complete within {timeout}."); + await Task.WhenAll(task).ConfigureAwait(false); + } +}