Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions src/clients/producer/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ export class Producer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
deduplicateCallback => {
this[kPerformWithRetry]<FindCoordinatorResponse>(
'findCoordinator',
retryCallback => {
(retryCallback, attempt) => {
this[kGetBootstrapConnection]((error, connection) => {
if (error) {
retryCallback(error)
Expand All @@ -616,7 +616,7 @@ export class Producer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa

api!(connection!, FindCoordinatorKeyTypes.TRANSACTION, [transactionalId], retryCallback)
})
})
}, attempt)
},
(error, response) => {
if (error) {
Expand Down Expand Up @@ -909,12 +909,8 @@ export class Producer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
deduplicateCallback => {
this[kPerformWithRetry]<InitProducerIdResponse>(
'initProducerId',
retryCallback => {
const connector = transactionalId
? this.#getCoordinatorConnection.bind(this)
: this[kGetBootstrapConnection].bind(this)

connector((error, connection) => {
(retryCallback, attempt) => {
const onConnection: CallbackWithPromise<Connection> = (error, connection) => {
if (error) {
retryCallback(error)
return
Expand All @@ -937,7 +933,16 @@ export class Producer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
}
)
})
})
}

// The transactional path fails over inside #getCoordinatorConnection (via
// #findCoordinator -> kGetBootstrapConnection). The idempotent path must thread
// `attempt` so kGetBootstrapConnection rotates past a reachable-but-hung broker.
if (transactionalId) {
this.#getCoordinatorConnection(onConnection)
} else {
this[kGetBootstrapConnection](onConnection, attempt)
}
},
(error, response) => {
if (error) {
Expand Down
60 changes: 60 additions & 0 deletions test/clients/producer/producer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,66 @@ test('initIdempotentProducer should handle unavailable API errors', async t => {
)
})

test('initIdempotentProducer fails over to a healthy broker when the bootstrap broker is reachable but hung', async t => {
  // Regression scenario: the first bootstrap broker accepts connections but never
  // serves InitProducerId. An idempotent producer is expected to move on to the next
  // bootstrap broker on retry instead of spending its whole retry budget on the
  // stalled one, which requires the retry attempt number to reach
  // kGetBootstrapConnection.
  const stalledHost = 'broker-a'
  const selectedHosts: string[] = []

  const producer = createProducer(t, {
    idempotent: true,
    bootstrapBrokers: ['broker-a:9092', 'broker-b:9093', 'broker-c:9094'],
    retries: 5,
    retryDelay: 0
  })

  // Connection acquisition always succeeds against whichever broker is first in the
  // list it is handed; each selected host is recorded so the rotation can be asserted.
  mockConnectionPoolGetFirstAvailable(producer[kConnections], () => true, null, undefined, (_original, brokers, callback) => {
    const { host } = brokers[0]
    selectedHosts.push(host)
    callback(null, { host } as Connection)
    return true
  })

  // Fake InitProducerId: requests sent to the stalled broker fail with a NetworkError,
  // every other broker answers with a valid producer id.
  const fakeInitProducerId = (
    connection: Connection,
    _transactionalId: string | undefined,
    _timeout: number,
    _producerId: bigint,
    _producerEpoch: number,
    done: Callback<unknown>
  ): void => {
    if (connection.host !== stalledHost) {
      done(null, { throttleTimeMs: 0, errorCode: 0, producerId: 42n, producerEpoch: 0 })
      return
    }

    done(new NetworkError('Connection closed'))
  }

  // Only the InitProducerId lookup is replaced; any other API is resolved normally.
  mockMethod(producer, kGetApi, () => true, null, undefined, (original, name, callback) => {
    if (name !== 'InitProducerId') {
      original(name, callback)
      return true
    }

    callback(null, fakeInitProducerId as any)
    return true
  })

  const { producerId } = await producer.initIdempotentProducer({})
  const [firstHost, secondHost] = selectedHosts

  strictEqual(producerId, 42n)
  // The first attempt lands on the stalled broker, the second one on its successor.
  strictEqual(firstHost, 'broker-a')
  strictEqual(secondHost, 'broker-b')
})

test('send should return ProduceResult with offsets and support diagnostic channels', async t => {
const producer = createProducer(t)
const testTopic = await createTopic(t)
Expand Down
Loading