Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<NatsVersion>2.8.0-preview.2</NatsVersion>
<NatsVersion>2.8.0</NatsVersion>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="NATS.Net" Version="$(NatsVersion)" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ private async IAsyncEnumerable<NatsPcgMsg<T>> ConsumeAsync([EnumeratorCancellati

if (!hasNext)
{
if (_js.Connection.Opts.DrainSubscriptionsOnDispose)
Comment thread
mtmk marked this conversation as resolved.
{
yield break;
}

break;
}

Expand Down
18 changes: 18 additions & 0 deletions src/Synadia.Orbit.PCGroups/PACKAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,24 @@ record Event(string EventId, string UserId, string Type);
| Stream Setup | Requires SubjectTransform | Auto-creates work-queue stream |
| Configuration | Simpler | More flexible |

## Graceful Shutdown

PCGroups supports NATS .NET drain-on-dispose. Enable it on the connection to let
in-flight JetStream consumer messages already buffered by the client continue to
flow through the PCGroups async enumerable before the connection is closed:

```csharp
await using var nats = new NatsConnection(new NatsOpts
{
DrainSubscriptionsOnDispose = true,
ConsumerDrainOnDisposeTimeout = TimeSpan.FromSeconds(30),
});
```

Without `DrainSubscriptionsOnDispose`, NATS .NET closes the socket first during
connection disposal and buffered consumer messages may remain pending until
`AckWait` expires.

## Custom Partition Mappings

For fine-grained control over partition distribution:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ private async IAsyncEnumerable<NatsPcgMsg<T>> ConsumeAsync([EnumeratorCancellati

if (!hasNext)
{
if (_js.Connection.Opts.DrainSubscriptionsOnDispose)
Comment thread
mtmk marked this conversation as resolved.
{
yield break;
}

break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using NATS.Client.JetStream.Models;
using NATS.Net;
using Synadia.Orbit.PCGroups.Elastic;
using Synadia.Orbit.PCGroups.Test;
using Synadia.Orbit.TestUtils;

namespace Synadia.Orbit.PCGroups.Test.Elastic;
Expand Down Expand Up @@ -686,6 +687,104 @@ await js.CreatePcgElasticAsync(
}
}

[Fact]
public async Task ConsumeElastic_ConnectionDisposeDrainsBufferedMessagesAndCompletes()
{
const int totalMsgs = 30;
const int bailAt = 5;

await using var setup = new NatsConnection(new NatsOpts { Url = _server.Url });
var setupJs = setup.CreateJetStreamContext();

var id = Guid.NewGuid().ToString("N");
var streamName = $"test-stream-{id}";
var subject = $"{id}.orders.*";
var groupName = $"test-group-{id}";
var workQueueStreamName = $"{streamName}-{groupName}";

await setupJs.CreateStreamAsync(new StreamConfig
{
Name = streamName,
Subjects = [subject],
});

await setupJs.CreatePcgElasticAsync(
streamName,
groupName,
maxNumMembers: 1,
partitioningFilters: [new NatsPcgPartitioningFilter(subject, [1])]);

await setupJs.AddPcgElasticMembersAsync(streamName, groupName, ["worker"]);

for (var i = 0; i < totalMsgs; i++)
{
await setupJs.PublishAsync($"{id}.orders.{i}", $"payload-{i}");
}

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var consumed = 0;
var reachedBail = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var releaseAfterDisposeStarts = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var nats = new NatsConnection(new NatsOpts
{
Url = _server.Url,
DrainSubscriptionsOnDispose = true,
ConsumerDrainOnDisposeTimeout = TimeSpan.FromSeconds(30),
});
var js = nats.CreateJetStreamContext();

Task? disposeTask = null;
var consumeTask = Task.Run(
async () =>
{
try
{
await foreach (var msg in js.ConsumePcgElasticAsync<string>(streamName, groupName, "worker", cancellationToken: cts.Token))
{
Interlocked.Increment(ref consumed);
await msg.AckAsync(cancellationToken: cts.Token);

if (Volatile.Read(ref consumed) == bailAt)
{
reachedBail.TrySetResult(true);
await releaseAfterDisposeStarts.Task.ConfigureAwait(false);
}
}
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
}
},
cts.Token);

try
{
await TaskTestHelpers.AssertCompletesWithinAsync(reachedBail.Task, TimeSpan.FromSeconds(10));

disposeTask = nats.DisposeAsync().AsTask();
releaseAfterDisposeStarts.TrySetResult(true);
await TaskTestHelpers.AssertCompletesWithinAsync(disposeTask, TimeSpan.FromSeconds(10));

await TaskTestHelpers.AssertCompletesWithinAsync(consumeTask, TimeSpan.FromSeconds(10));

Assert.True(Volatile.Read(ref consumed) > bailAt, $"consumed {Volatile.Read(ref consumed)} should be greater than {bailAt}");

await using var check = new NatsConnection(new NatsOpts { Url = _server.Url });
var checkJs = check.CreateJetStreamContext();
var consumer = await checkJs.GetConsumerAsync(workQueueStreamName, "worker");
Assert.Equal(0, consumer.Info.NumAckPending);
}
finally
{
releaseAfterDisposeStarts.TrySetResult(true);
cts.Cancel();
disposeTask ??= nats.DisposeAsync().AsTask();
await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5)));
await Task.WhenAny(consumeTask, Task.Delay(TimeSpan.FromSeconds(5)));
Comment thread
colprog marked this conversation as resolved.
}
}

[Fact]
public async Task CreatePcgElastic_EmptyWildcards_FullSubjectPartition_Success()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using NATS.Client.JetStream.Models;
using NATS.Net;
using Synadia.Orbit.PCGroups.Static;
using Synadia.Orbit.PCGroups.Test;
using Synadia.Orbit.TestUtils;

namespace Synadia.Orbit.PCGroups.Test.Static;
Expand Down Expand Up @@ -534,4 +535,105 @@ await js.CreatePcgStaticAsync(
await js.DeleteStreamAsync(streamName);
}
}

[Fact]
public async Task ConsumeStatic_ConnectionDisposeDrainsBufferedMessagesAndCompletes()
{
const int totalMsgs = 30;
const int bailAt = 5;

await using var setup = new NatsConnection(new NatsOpts { Url = _server.Url });
var setupJs = setup.CreateJetStreamContext();

var id = Guid.NewGuid().ToString("N");
var streamName = $"test-stream-{id}";
var subject = $"{id}.orders.*";
var groupName = $"test-group-{id}";

await setupJs.CreateStreamAsync(new StreamConfig
{
Name = streamName,
Subjects = [subject],
SubjectTransform = new SubjectTransform
{
Src = subject,
Dest = $"{{{{partition(1,1)}}}}.{id}.orders.{{{{wildcard(1)}}}}",
},
});

await setupJs.CreatePcgStaticAsync(
streamName,
groupName,
maxNumMembers: 1,
filters: [subject],
members: ["worker"]);

for (var i = 0; i < totalMsgs; i++)
{
await setupJs.PublishAsync($"{id}.orders.{i}", $"payload-{i}");
}

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var consumed = 0;
var reachedBail = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var releaseAfterDisposeStarts = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var nats = new NatsConnection(new NatsOpts
{
Url = _server.Url,
DrainSubscriptionsOnDispose = true,
ConsumerDrainOnDisposeTimeout = TimeSpan.FromSeconds(30),
});
var js = nats.CreateJetStreamContext();

Task? disposeTask = null;
var consumeTask = Task.Run(
async () =>
{
try
{
await foreach (var msg in js.ConsumePcgStaticAsync<string>(streamName, groupName, "worker", cancellationToken: cts.Token))
{
Interlocked.Increment(ref consumed);
await msg.AckAsync(cancellationToken: cts.Token);

if (Volatile.Read(ref consumed) == bailAt)
{
reachedBail.TrySetResult(true);
await releaseAfterDisposeStarts.Task.ConfigureAwait(false);
}
}
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
}
},
cts.Token);

try
{
await TaskTestHelpers.AssertCompletesWithinAsync(reachedBail.Task, TimeSpan.FromSeconds(10));

disposeTask = nats.DisposeAsync().AsTask();
releaseAfterDisposeStarts.TrySetResult(true);
await TaskTestHelpers.AssertCompletesWithinAsync(disposeTask, TimeSpan.FromSeconds(10));

await TaskTestHelpers.AssertCompletesWithinAsync(consumeTask, TimeSpan.FromSeconds(10));

Assert.True(Volatile.Read(ref consumed) > bailAt, $"consumed {Volatile.Read(ref consumed)} should be greater than {bailAt}");

await using var check = new NatsConnection(new NatsOpts { Url = _server.Url });
var checkJs = check.CreateJetStreamContext();
var consumer = await checkJs.GetConsumerAsync(streamName, $"{groupName}-worker");
Assert.Equal(0, consumer.Info.NumAckPending);
}
finally
{
releaseAfterDisposeStarts.TrySetResult(true);
cts.Cancel();
disposeTask ??= nats.DisposeAsync().AsTask();
await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5)));
await Task.WhenAny(consumeTask, Task.Delay(TimeSpan.FromSeconds(5)));
Comment thread
colprog marked this conversation as resolved.
}
}
}
14 changes: 14 additions & 0 deletions tests/Synadia.Orbit.PCGroups.Test/TaskTestHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) Synadia Communications, Inc. All rights reserved.
// Licensed under the Apache License, Version 2.0.

namespace Synadia.Orbit.PCGroups.Test;

internal static class TaskTestHelpers
{
public static async Task AssertCompletesWithinAsync(Task task, TimeSpan timeout)
{
var completed = await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false);
Assert.True(ReferenceEquals(task, completed), $"Task did not complete within {timeout}.");
await Task.WhenAll(task).ConfigureAwait(false);
}
}
Loading