[fix][consumer] Add reconnect failure listener and auto-close on max retry exhaustion #1490
Conversation
Pull request overview
Adds an opt-in notification/auto-close mechanism for consumers that can no longer reconnect to a broker, addressing the “silent broken consumer” failure mode described in #1481.
Changes:
- Extends `ConsumerOptions` with `MaxReconnectToBrokerListener` (callback) and `CloseConsumerOnMaxReconnectToBroker` (auto-close).
- Wires the new options into `partitionConsumer.reconnectToBroker()` and `newPartitionConsumerOpts()`.
- Adds integration-style tests that validate listener invocation and auto-close behavior when reconnect retries are exhausted.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `pulsar/consumer.go` | Adds new public `ConsumerOptions` fields + GoDoc for reconnect exhaustion notification and optional auto-close. |
| `pulsar/consumer_impl.go` | Propagates the new `ConsumerOptions` fields into `partitionConsumerOpts`. |
| `pulsar/consumer_partition.go` | Invokes the listener / triggers async close when reconnect attempts are exhausted. |
| `pulsar/consumer_test.go` | Adds tests using testcontainers to validate listener firing and auto-close on retry exhaustion. |
```go
if maxRetry == 0 || bo.IsMaxBackoffReached() {
	pc.metrics.ConsumersReconnectMaxRetry.Inc()
	if pc.options.maxReconnectToBrokerListener != nil {
		pc.options.maxReconnectToBrokerListener(pc.parentConsumer, err)
	}
```
I confirmed the bug. With `MaxReconnectToBroker` unset, `bo.IsMaxBackoffReached()` stays true and the branch returns `err`, so the retry loop re-enters indefinitely: the listener fires on every iteration and close goroutines pile up.

I didn't take the "make it terminal" option because it would silently change the default contract of retrying forever at the backoff ceiling.

Fix: a closure-local `maxRetryNotified` bool gates only the listener and the close goroutine; the retry flow and the metric are untouched. A plain bool is fine since `opFn` runs sequentially.

Added `TestConsumerMaxReconnectToBrokerListenerFiresOnceWhenBackoffMaxed`: `MaxReconnectToBroker` unset plus an always-maxed backoff; it kills the broker, waits 3 s past the first invocation, and asserts the counter == 1. Passes.
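For readers following along, a standalone sketch of the gating pattern described above (self-contained, not the actual patch; `internal.Retry` re-entering the closure is simulated by a plain loop):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	// In the patch, maxRetryNotified lives in the reconnectToBroker closure.
	// A plain bool suffices: internal.Retry calls opFn sequentially, never
	// concurrently, so there is no race to protect against.
	maxRetryNotified := false
	listener := func(err error) { fmt.Println("listener fired:", err) }

	for i := 0; i < 5; i++ { // stands in for internal.Retry re-entering opFn
		err := errors.New("connection refused")
		backoffMaxed := true // models bo.IsMaxBackoffReached() staying true

		if backoffMaxed {
			// The metric increment stays here, ungated, as in the patch.
			if !maxRetryNotified {
				maxRetryNotified = true
				listener(err)
				// go pc.parentConsumer.Close() would be gated here as well.
			}
		}
		time.Sleep(10 * time.Millisecond)
	}
	// Output shows a single "listener fired" line despite five iterations.
}
```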
```go
if pc.options.closeConsumerOnMaxReconnectToBroker {
	go pc.parentConsumer.Close()
}
```
Addressed by the same patch as the previous thread: `go pc.parentConsumer.Close()` and `maxReconnectToBrokerListener` are both inside one `if !maxRetryNotified { … }` block, so they fire together exactly once per reconnect cycle. I used a plain bool rather than `sync.Once`/atomic because `opFn` is invoked sequentially by `internal.Retry`; there is no concurrent entry into the closure and no race to protect against. Happy to swap to `sync.Once` if you'd prefer the explicit signal.
```go
// (e.g. recreate the consumer). Only fires when MaxReconnectToBroker is set to a finite value
// or when the backoff policy signals IsMaxBackoffReached.
MaxReconnectToBrokerListener func(consumer Consumer, err error)

// CloseConsumerOnMaxReconnectToBroker, when true, automatically closes the consumer after
// exhausting all reconnect attempts. The close happens asynchronously after
// MaxReconnectToBrokerListener (if set) returns. Default: false.
```
Addressed in a new commit instead of committing the suggestion directly, because the suggestion was outdated by the previous commit.
```go
req := testcontainers.ContainerRequest{
	Image:        getPulsarTestImage(),
	ExposedPorts: []string{"6650/tcp", "8080/tcp"},
	WaitingFor:   wait.ForExposedPort(),
	Cmd:          []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
}
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
	ContainerRequest: req,
	Started:          true,
})
require.NoError(t, err)
endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
require.NoError(t, err)
```
Addressed in the latest commit: registered `t.Cleanup` right after container creation in all three tests as a best-effort safety net (logs on error), and changed the in-body `_ = c.Terminate(...)` to `require.NoError(t, c.Terminate(...))`, since that call is part of the scenario.
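For reference, a sketch of that pattern against the snippet above (the log wording is illustrative):

```go
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
	ContainerRequest: req,
	Started:          true,
})
require.NoError(t, err)

// Best-effort safety net registered immediately after creation: it runs even
// if the test fails before the in-body terminate, and only logs on error
// because the container may already have been terminated as part of the scenario.
t.Cleanup(func() {
	if err := c.Terminate(context.Background()); err != nil {
		t.Logf("cleanup: failed to terminate container: %v", err)
	}
})
```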
```go
MaxReconnectToBroker: &maxRetry,
BackOffPolicyFunc: func() backoff.Policy {
	return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second)
},
```
Already addressed in a follow-up commit: added `TestConsumerMaxReconnectToBrokerListenerFiresOnceWhenBackoffMaxed` with a `maxBackoffReachedPolicy` whose `IsMaxBackoffReached()` always returns true. It asserts the listener fires exactly once even after many additional retry iterations, covering the previously untested exhaustion path.
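A sketch of what such a policy can look like (the actual `maxBackoffReachedPolicy` in the commit may differ; `backoff.Policy` is assumed to be the two-method interface the test hunk above plugs into):

```go
// maxBackoffReachedPolicy reports the backoff ceiling as already reached on
// every call, forcing the reconnect loop into its exhaustion branch
// immediately while still retrying at a short, fixed interval.
type maxBackoffReachedPolicy struct{}

func (maxBackoffReachedPolicy) Next() time.Duration       { return 100 * time.Millisecond }
func (maxBackoffReachedPolicy) IsMaxBackoffReached() bool { return true }
```

It would then be wired in via `BackOffPolicyFunc: func() backoff.Policy { return maxBackoffReachedPolicy{} }`.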
```go
	ContainerRequest: req,
	Started:          true,
})
require.NoError(t, err)
```
Already addressed in the latest commit: `t.Cleanup` is registered immediately after container creation in `TestConsumerMaxReconnectToBrokerAutoClose` (and the other two reconnect tests) as a best-effort terminate that logs on error, with the in-body terminate now checked via `require.NoError`.
Hi @PavelZeger, could you follow up on the Java client logic?
Sure, @nodece.

What's done now

What I left for later

Does this work for you? If you want me to go further on 3 or 4 in this PR, let me know; otherwise I'll open a follow-up for 4 after this PR.

Sources:
Fixes #1481
Motivation
When a `partitionConsumer` exhausts all broker reconnection attempts (controlled by `MaxReconnectToBroker`), the client silently increments a metric and exits the retry loop, leaving the consumer alive but unable to receive messages. There is no way for application code to detect this failure or react to it (e.g. recreate the consumer or alert on-call).
Modifications
Added two opt-in fields to `ConsumerOptions`:

- `MaxReconnectToBrokerListener func(consumer Consumer, err error)`: a callback invoked exactly once, on the same internal goroutine, immediately after the last reconnect attempt fails. The `consumer` argument is the parent `Consumer` the application holds, and `err` is the last connection error. The listener fires whenever `MaxReconnectToBroker` retries are exhausted or when the configured backoff policy signals `IsMaxBackoffReached`.
- `CloseConsumerOnMaxReconnectToBroker bool`: when `true`, automatically closes the consumer after exhausting reconnect attempts. The close runs asynchronously after `MaxReconnectToBrokerListener` (if set) returns. Internally `parentConsumer.Close()` is launched in a goroutine; this cancels the consumer's context, which unblocks the `internal.Retry` loop, allowing `runEventsLoop` to process the close request without deadlocking.

Both fields default to their zero values (`nil`/`false`), so there is no behaviour change for existing consumers. A usage sketch follows.
Why points 3 and 4 from the issue are not implemented in this PR
The original issue suggested two additional fixes:
3. Propagate the error to the consumer's error channel

The `Consumer` interface does not expose an error channel. Adding one would be a breaking API change: every implementation (`consumer`, `consumer_multitopic`, `consumer_regex`, `consumer_zero_queue`) would need a new method, and all existing callers that perform a type assertion or embed the interface would break. This is a larger design decision that warrants its own issue and a deprecation/migration path. The `MaxReconnectToBrokerListener` callback achieves the same observable outcome (application code is notified of the failure) without modifying the public interface.
4. Update consumer state to a terminal "failed" state
There is currently no `consumerFailed` state in the internal state machine (`consumerInit → consumerReady → consumerClosing → consumerClosed`). Introducing a new terminal state would require updating every state guard in `consumer_partition.go` (there are more than a dozen) as well as the multi-topic and regex consumer wrappers. In practice, enabling `CloseConsumerOnMaxReconnectToBroker` already transitions the consumer through `consumerClosing → consumerClosed`, which is the correct terminal state and prevents any further operations on a dead consumer. A separate "failed" state that carries an error cause can be considered as a follow-up if observability tooling needs to distinguish a failed-closed consumer from a normally-closed one.
Verifying this change
This change adds behaviour that requires a running broker to test end-to-end. Unit-level verification can be done by constructing a `partitionConsumerOpts` directly with a `maxReconnectToBroker` of 1 and asserting that the listener fires and the consumer closes. Integration test coverage is tracked as a follow-up.
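A sketch of what such an end-to-end check can look like, with the container creation elided (it yields `c` and `endpoint` as in the snippet quoted in the review thread); the timeout is illustrative:

```go
func TestConsumerListenerFiresOnReconnectExhaustion(t *testing.T) {
	// ... start the standalone broker container as shown above, yielding c and endpoint ...

	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: endpoint})
	require.NoError(t, err)
	defer client.Close()

	fired := make(chan error, 1)
	maxRetry := uint(1)
	_, err = client.Subscribe(pulsar.ConsumerOptions{
		Topic:                "test-reconnect-listener",
		SubscriptionName:     "sub",
		MaxReconnectToBroker: &maxRetry,
		MaxReconnectToBrokerListener: func(_ pulsar.Consumer, err error) {
			fired <- err
		},
		// The consumer closes itself after the listener returns.
		CloseConsumerOnMaxReconnectToBroker: true,
	})
	require.NoError(t, err)

	// Kill the broker so the single reconnect attempt is exhausted.
	require.NoError(t, c.Terminate(context.Background()))

	select {
	case lastErr := <-fired:
		require.Error(t, lastErr) // the last connection error is surfaced
	case <-time.After(30 * time.Second):
		t.Fatal("listener did not fire after reconnect exhaustion")
	}
}
```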
Does this pull request potentially affect one of the following parts:
- The public API: yes, two new `ConsumerOptions` fields.

Documentation
- GoDoc on the new `ConsumerOptions` fields.