Skip to content

Support drain-on-dispose for PCGroups consumers#66

Merged
mtmk merged 4 commits into
synadia-io:mainfrom
colprog:drain_on_close
May 30, 2026
Merged

Support drain-on-dispose for PCGroups consumers#66
mtmk merged 4 commits into
synadia-io:mainfrom
colprog:drain_on_close

Conversation

@colprog
Copy link
Copy Markdown
Contributor

@colprog colprog commented May 29, 2026

Attempt to fix #65

Upgrade NATS.Net to 2.8.0 and let static and elastic PCGroups consume loops complete when the underlying JetStream consumer finishes during drain-on-dispose.

Add regression coverage for buffered-message drain behavior on connection disposal and document the graceful shutdown option.

Upgrade NATS.Net to 2.8.0 and let static and elastic PCGroups consume loops complete when the underlying JetStream consumer finishes during drain-on-dispose.

Add regression coverage for buffered-message drain behavior on connection disposal and document the graceful shutdown option.
@synadia-claude-reviewer
Copy link
Copy Markdown

synadia-claude-reviewer Bot commented May 29, 2026

Claude finished @colprog's task in 7m 30s —— View job


PR Review: Support drain-on-dispose for PCGroups consumers

  • Gather context (CLAUDE.md, PR diff)
  • Review static/elastic consume context changes
  • Review test coverage
  • Review documentation changes
  • Post inline comments for specific issues
  • Post overall review summary

Summary

The approach is correct. When nats.DisposeAsync() triggers drain, NATS.Net 2.8.0 completes the ConsumeAsync enumerable cleanly (hasNext == false) rather than throwing. Without the fix the outer loop restarts consume, hitting errors during connection teardown. The yield break terminates the iterator, the extension method's finally then calls context.DisposeAsync() and the watch task is cancelled cleanly. End-to-end lifecycle is sound.

Issues found

Medium — implicit NATS.Net contract (both contexts)

_js.Connection.Opts.DrainSubscriptionsOnDispose is a static option used as a runtime proxy for "drain is in progress". This is only safe if NATS.Net guarantees that ConsumeAsync never completes with false during ordinary fetch-window expiration when this option is set — only during drain. If that invariant ever changes, consumers configured with DrainSubscriptionsOnDispose = true would silently stop after the first fetch round. The assumption is reasonable given the 2.8.0 behaviour, but it isn't documented. See inline comments on NatsPcgElasticConsumeContext.cs:226 and NatsPcgStaticConsumeContext.cs:181.

Low — test resource leak (both test files)

nats is not await using (intentionally, to test manual disposal), but it is also not disposed in the finally block. If AssertCompletesWithinAsync(reachedBail.Task, ...) times out before nats.DisposeAsync() is reached, the connection leaks for the test run. Adding await nats.DisposeAsync() to the finally block is the minimal fix. See inline comments on NatsPcgElasticExtensionsTests.cs:779 and NatsPcgStaticExtensionsTests.cs:631.

No other issues

  • The NATS.Net version bump (preview.2 → 2.8.0) is straightforward.
  • TaskTestHelpers.AssertCompletesWithinAsync is implemented correctly (ReferenceEquals identity check, exception re-thrown from the original task).
  • Documentation in PACKAGE.md is accurate and well-placed.
  • The yield break vs break distinction is correct: break exits to the outer restart loop while yield break terminates the whole iterator, which is exactly what drain requires.
  • The extension method's finally (context.DisposeAsync()) ensures the KV watch task is cancelled cleanly after the drain path exits, so there is no watch-task leak.

Comment thread src/Synadia.Orbit.PCGroups/Elastic/NatsPcgElasticConsumeContext.cs
Comment thread src/Synadia.Orbit.PCGroups/Static/NatsPcgStaticConsumeContext.cs
colprog added 2 commits May 29, 2026 23:54
Ensure the drain-on-dispose regression tests dispose their NATS connection from finally when an early assertion fails before the explicit disposal step.
Use generic TaskCompletionSource in the drain-on-dispose regression tests and avoid awaiting externally supplied tasks inside the timeout helper so the net481 build passes analyzers.
Replace per-message sleeps in the drain-on-dispose regression
tests with an explicit gate that pauses consumption at the bail
point until disposal starts. This keeps the 10 second assertions
while reducing CI timing variance.
Copy link
Copy Markdown
Collaborator

@mtmk mtmk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM thanks @colprog

@mtmk mtmk merged commit bc72447 into synadia-io:main May 30, 2026
15 of 16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

PCGroups Elastic consume uses the same cancellation token for lifecycle shutdown and JetStream consume, bypassing NATS.Net drain-on-dispose

2 participants