From 35c43f5581567cdbcf0f5c84e6ef465c303345da Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Wed, 17 Jun 2026 14:51:53 +0000 Subject: [PATCH 1/2] feat: add consumer protocols metadata callback Signed-off-by: Paolo Insogna --- docs/consumer.md | 65 ++++++- src/clients/consumer/consumer.ts | 253 ++++++++++++++++--------- src/clients/consumer/options.ts | 1 + src/clients/consumer/types.ts | 9 + test/clients/consumer/consumer.test.ts | 124 ++++++++++++ 5 files changed, 361 insertions(+), 91 deletions(-) diff --git a/docs/consumer.md b/docs/consumer.md index 5a359028..817271fe 100644 --- a/docs/consumer.md +++ b/docs/consumer.md @@ -47,6 +47,7 @@ Options: | groupProtocol | `'classic' \| 'consumer'` | `'classic'` | Group protocol to use. Use `'classic'` for the original consumer group protocol and `'consumer'` for the new protocol introduced in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol).

The `'consumer'` protocol provides server-side partition assignment and incremental rebalancing behavior. | | groupRemoteAssignor | `string` | `null` | Server-side assignor to use for `groupProtocol=consumer`. Keep it unset to let the server select a suitable assignor for the group. Available assignors: `'uniform'` or `'range'`. | | protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.

Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties.

Not supported for `groupProtocol=consumer`. | +| protocolsMetadata | `GroupProtocolsMetadataCallback` | | Callback used to compute opaque protocol metadata before joining or rejoining a classic consumer group. The returned `Buffer` is sent as the subscription `user_data` for every configured protocol. Not supported for `groupProtocol=consumer`. | | partitionAssigner | `GroupPartitionsAssigner` | | Client-side partition assignment strategy.

Not supported for `groupProtocol=consumer`, use `groupRemoteAssignor` instead. | | streamContext | `unknown` | | Default opaque user data for `MessagesStream` instances created by this consumer. It is forwarded to stream-owned `ConnectionPool` and `Connection` instances. Kafka never reads, mutates, or interprets this value. | @@ -54,6 +55,57 @@ It also supports all the constructor options of `Base`. The readonly `streamContext` getters expose the same opaque values on the consumer instance. +### Protocol metadata callback + +`protocolsMetadata` lets each member attach fresh, opaque metadata to its `JoinGroup` subscription. Kafka forwards these bytes without interpreting them. The group leader can read the bytes from each member in a custom `partitionAssigner` through `member.metadata`. + +The callback is invoked once per join or rebalance, before sending `JoinGroup`. The consumer refreshes metadata for its currently tracked topics before invoking it. + +```typescript +type GroupProtocolsMetadataCallback = ( + protocols: GroupProtocolSubscription[], + topics: string[], + metadata: ClusterMetadata, + callback: (error: Error | null, metadata?: Buffer) => void +) => Buffer | Promise | undefined +``` + +The callback can be used in three styles: + +- Return a `Buffer` directly. +- Return a `Promise`. +- Return `undefined` and invoke the callback with a `Buffer`. + +If the resolved value is not a `Buffer`, `joinGroup()` fails. Do not mix styles; if multiple paths settle, only the first result is used. + +Example: + +```typescript +const consumer = new Consumer({ + groupId: 'my-group', + clientId: 'my-consumer', + bootstrapBrokers: ['localhost:9092'], + protocolsMetadata (_protocols, topics, metadata) { + const userData: Record = {} + + for (const topic of topics) { + userData[topic] = metadata.topics.get(topic)!.partitions.map((_partition, partitionId) => partitionId) + } + + return Buffer.from(JSON.stringify(userData), 'utf8') + }, + partitionAssigner (_current, members, topics, metadata) { + for (const member of members.values()) { + const userData = JSON.parse(member.metadata!.toString('utf8')) + // Use the member-provided opaque metadata while computing assignments. + } + + // Return GroupPartitionsAssignments[]. + return [] + } +}) +``` + ## Basic Methods ### `isActive` @@ -297,12 +349,13 @@ This method is no-op for `groupProtocol=consumer`. Options: -| Property | Type | Default | Description | -| ----------------- | ----------------------------- | ------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.

This is only relevant when Kafka creates a new group. | -| rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.

This is only relevant when Kafka creates a new group. | -| heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats. | -| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.

Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. | +| Property | Type | Default | Description | +| ----------------- | -------------------------------- | ------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.

This is only relevant when Kafka creates a new group. | +| rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.

This is only relevant when Kafka creates a new group. | +| heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats. | +| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.

Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. | +| protocolsMetadata | `GroupProtocolsMetadataCallback` | | Callback used to compute opaque protocol metadata before joining or rejoining a classic consumer group. See [Protocol metadata callback](#protocol-metadata-callback). | ### `leaveGroup([force])` diff --git a/src/clients/consumer/consumer.ts b/src/clients/consumer/consumer.ts index 154db1b1..868b7d32 100644 --- a/src/clients/consumer/consumer.ts +++ b/src/clients/consumer/consumer.ts @@ -38,6 +38,7 @@ import { type SyncGroupRequestAssignment, type SyncGroupResponse } from '../../apis/consumer/sync-group-v5.ts' +import { type Callback } from '../../apis/definitions.ts' import { FetchIsolationLevels, FindCoordinatorKeyTypes } from '../../apis/enumerations.ts' import { type FindCoordinatorRequest, type FindCoordinatorResponse } from '../../apis/metadata/find-coordinator-v6.ts' import { @@ -57,11 +58,7 @@ import { INT32_SIZE } from '../../protocol/definitions.ts' import { Reader } from '../../protocol/reader.ts' import { IS_CONTROL } from '../../protocol/records.ts' import { Writer } from '../../protocol/writer.ts' -import { - kAutocommit, - kGetFetchNode, - kRefreshOffsetsAndFetch -} from '../../symbols.ts' +import { kAutocommit, kGetFetchNode, kRefreshOffsetsAndFetch } from '../../symbols.ts' import { emitExperimentalApiWarning } from '../../utils.ts' import { Base, @@ -115,6 +112,7 @@ import { type GroupAssignment, type GroupOptions, type GroupPartitionsAssigner, + type GroupProtocolsMetadataCallback, type GroupProtocolSubscription, type ListCommitsOptions, type ListOffsetsOptions, @@ -697,6 +695,11 @@ export class Consumer, callback: CallbackWithPromise): void { + const diagnosticOptions = { ...options } + delete diagnosticOptions.protocolsMetadata + consumerGroupChannel.traceCallback( this.#performJoinGroup, 1, - createDiagnosticContext({ client: this, operation: 'joinGroup', options }), + createDiagnosticContext({ client: this, operation: 'joinGroup', options: diagnosticOptions }), this, options, callback @@ -1694,69 +1700,36 @@ export class Consumer( - 'joinGroup', - (connection, groupCallback) => { - this[kGetApi]('JoinGroup', (error, api) => { - if (error) { - groupCallback(error) - return - } - - api!( - connection, - this.groupId, - options.sessionTimeout, - options.rebalanceTimeout, - this.memberId ?? '', - this.groupInstanceId, - 'consumer', - protocols, - '', - groupCallback - ) - }) - }, - (error, response) => { - if (!this.#membershipActive) { - callback(null) - return - } - - if (error) { - if (this.#getRejoinError(error)) { - this.#performJoinGroup(options, callback) - return - } - - callback(error) - return - } + this.#createJoinGroupProtocols(options, (error, protocols) => { + if (error) { + callback(error) + return + } - // This is for Azure Event Hubs compatibility, which does not respond with an error on the first join - this.memberId = response!.memberId! - this.generationId = response!.generationId - this.#isLeader = response!.leader === this.memberId - this.#protocol = response!.protocolName! - - this.#members = new Map() - for (const member of response!.members) { - this.#members.set( - member.memberId, - this.#decodeProtocolSubscriptionMetadata(member.memberId, member.metadata!) - ) - } + this.#performDeduplicateGroupOperaton( + 'joinGroup', + (connection, groupCallback) => { + this[kGetApi]('JoinGroup', (error, api) => { + if (error) { + groupCallback(error) + return + } - // Send a syncGroup request - this.#syncGroup(options.partitionAssigner, (error, response) => { + api!( + connection, + this.groupId, + options.sessionTimeout, + options.rebalanceTimeout, + this.memberId ?? '', + this.groupInstanceId, + 'consumer', + protocols!, + '', + groupCallback + ) + }) + }, + (error, response) => { if (!this.#membershipActive) { callback(null) return @@ -1772,26 +1745,58 @@ export class Consumer { - this.#heartbeat(options) - }, options.heartbeatInterval) - - this.emitWithDebug('consumer', 'group:join', { - groupId: this.groupId, - memberId: this.memberId, - generationId: this.generationId, - isLeader: this.#isLeader, - assignments: this.assignments - }) + // Send a syncGroup request + this.#syncGroup(options.partitionAssigner, (error, response) => { + if (!this.#membershipActive) { + callback(null) + return + } - callback(null, this.memberId!) - }) - } - ) + if (error) { + if (this.#getRejoinError(error)) { + this.#performJoinGroup(options, callback) + return + } + + callback(error) + return + } + + this.assignments = response! + this.#syncPreferredReadReplicas() + + this.#cancelHeartbeat() + this.#heartbeatInterval = setTimeout(() => { + this.#heartbeat(options) + }, options.heartbeatInterval) + + this.emitWithDebug('consumer', 'group:join', { + groupId: this.groupId, + memberId: this.memberId, + generationId: this.generationId, + isLeader: this.#isLeader, + assignments: this.assignments + }) + + callback(null, this.memberId!) + }) + } + ) + }) } #performLeaveGroup (force: boolean, callback: CallbackWithPromise): void { @@ -2018,6 +2023,84 @@ export class Consumer, callback: Callback): void { + if (typeof options.protocolsMetadata !== 'function') { + callback( + null, + options.protocols.map(protocol => ({ + name: protocol.name, + metadata: this.#encodeProtocolSubscriptionMetadata(protocol, this.topics.current) + })) + ) + return + } + + this[kMetadata]({ topics: this.topics.current, forceUpdate: true }, (error, metadata) => { + if (error) { + callback(this.#handleError(error)) + return + } + + this.#resolveProtocolsMetadata(options.protocolsMetadata, options.protocols, this.topics.current, metadata!, ( + error, + userData + ) => { + if (error) { + callback(error) + return + } else if (!Buffer.isBuffer(userData)) { + callback(new UserError('protocolsMetadata must resolve to a Buffer.')) + return + } + + callback( + null, + options.protocols.map(protocol => ({ + name: protocol.name, + metadata: this.#encodeProtocolSubscriptionMetadata({ ...protocol, metadata: userData }, this.topics.current) + })) + ) + }) + }) + } + + #resolveProtocolsMetadata ( + protocolsMetadata: GroupProtocolsMetadataCallback, + protocols: GroupProtocolSubscription[], + topics: string[], + metadata: ClusterMetadata, + callback: Callback + ): void { + let settled = false + + function resolveCallback (error: Error | null, userData?: Buffer) { + if (settled) { + return + } + + settled = true + callback(error, userData) + } + + let result: Buffer | Promise | undefined + try { + result = protocolsMetadata(protocols, topics, metadata, resolveCallback) + + if (typeof result !== 'undefined') { + if (typeof (result as Promise).then === 'function') { + ;(result as Promise).then( + userData => process.nextTick(resolveCallback, null, userData), + error => process.nextTick(resolveCallback, error) + ) + } else { + resolveCallback(null, result as Buffer) + } + } + } catch (error) { + resolveCallback(error as Error) + } + } + /* The following two methods follow: https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json diff --git a/src/clients/consumer/options.ts b/src/clients/consumer/options.ts index 4ed9074f..60943188 100644 --- a/src/clients/consumer/options.ts +++ b/src/clients/consumer/options.ts @@ -26,6 +26,7 @@ export const groupOptionsProperties = { } } }, + protocolsMetadata: { function: true }, partitionAssigner: { function: true } } diff --git a/src/clients/consumer/types.ts b/src/clients/consumer/types.ts index fe7b256a..19d3ca17 100644 --- a/src/clients/consumer/types.ts +++ b/src/clients/consumer/types.ts @@ -1,3 +1,4 @@ +import { type Callback } from '../../apis/definitions.ts' import { type FetchRequestTopic } from '../../apis/consumer/fetch-v17.ts' import { type GroupProtocols } from '../../apis/enumerations.ts' import { type ConnectionPool } from '../../network/connection-pool.ts' @@ -52,6 +53,13 @@ export type GroupPartitionsAssigner = ( metadata: ClusterMetadata ) => GroupPartitionsAssignments[] +export type GroupProtocolsMetadataCallback = ( + protocols: GroupProtocolSubscription[], + topics: string[], + metadata: ClusterMetadata, + callback: Callback +) => Buffer | Promise | undefined + export const MessagesStreamModes = { LATEST: 'latest', EARLIEST: 'earliest', @@ -83,6 +91,7 @@ export interface GroupOptions { rebalanceTimeout?: number heartbeatInterval?: number protocols?: GroupProtocolSubscription[] + protocolsMetadata?: GroupProtocolsMetadataCallback partitionAssigner?: GroupPartitionsAssigner assignmentUserData?: Buffer } diff --git a/test/clients/consumer/consumer.test.ts b/test/clients/consumer/consumer.test.ts index 1a900a16..e307f1e8 100644 --- a/test/clients/consumer/consumer.test.ts +++ b/test/clients/consumer/consumer.test.ts @@ -3780,6 +3780,130 @@ test('joinGroup should not fail when the partition assigner misses a member', as deepStrictEqual(consumer2.assignments, []) }) +test('joinGroup should expose protocolsMetadata to the partition assigner', async t => { + const topic = await createTopic(t, true, 3) + const groupId = createGroupId() + const seenUserData: Record[] = [] + + function protocolsMetadata ( + _protocols: unknown, + topics: string[], + metadata: ClusterMetadata + ): Buffer { + const userData: Record = {} + + for (const topic of topics) { + userData[topic] = metadata.topics.get(topic)!.partitions.map((_partition, partitionId) => partitionId) + } + + return Buffer.from(JSON.stringify(userData), 'utf8') + } + + function partitionAssigner ( + _current: string, + members: Map, + topics: Set, + metadata: ClusterMetadata + ): GroupPartitionsAssignments[] { + const assignments: GroupPartitionsAssignments[] = [] + + seenUserData.length = 0 + for (const [memberId, member] of members) { + seenUserData.push(JSON.parse(member.metadata!.toString('utf8'))) + assignments.push({ memberId, assignments: new Map() }) + } + + for (const topic of topics) { + const partitionsCount = metadata.topics.get(topic)!.partitionsCount + + for (let i = 0; i < partitionsCount; i++) { + const member = assignments[i % assignments.length] + let topicAssignments = member.assignments.get(topic) + + if (!topicAssignments) { + topicAssignments = { topic, partitions: [] } + member.assignments.set(topic, topicAssignments) + } + + topicAssignments.partitions.push(i) + } + } + + return assignments + } + + const consumer1 = createConsumer(t, { groupId, partitionAssigner, protocolsMetadata }) + const consumer2 = createConsumer(t, { groupId, partitionAssigner, protocolsMetadata }) + + await consumer1.topics.trackAll(topic) + await consumer2.topics.trackAll(topic) + + await consumer1.joinGroup() + const rejoinPromise = once(consumer1, 'consumer:group:join') + await consumer2.joinGroup() + await rejoinPromise + + deepStrictEqual(seenUserData, [{ [topic]: [0, 1, 2] }, { [topic]: [0, 1, 2] }]) +}) + +test('joinGroup should support callback style protocolsMetadata', async t => { + const topic = await createTopic(t, true, 3) + const consumer = createConsumer(t) + let sawForceUpdate = false + + await consumer.topics.trackAll(topic) + + mockMetadata(consumer, 1, null, null, (original, options, callback) => { + sawForceUpdate = options.forceUpdate === true + original(options, callback) + }) + + await consumer.joinGroup({ + protocolsMetadata (_protocols, topics, metadata, callback) { + strictEqual(topics[0], topic) + strictEqual(metadata.topics.get(topic)!.partitionsCount, 3) + callback(null, Buffer.from('callback-metadata')) + } + }) + + strictEqual(sawForceUpdate, true) + deepStrictEqual(consumer.assignments, [{ topic, partitions: [0, 1, 2] }]) +}) + +test('joinGroup should support promise style protocolsMetadata', async t => { + const topic = await createTopic(t, true, 3) + const consumer = createConsumer(t) + + await consumer.topics.trackAll(topic) + + await consumer.joinGroup({ + async protocolsMetadata (_protocols, topics, metadata, callback) { + callback(null, Buffer.from('ignored-callback-metadata')) + strictEqual(topics[0], topic) + strictEqual(metadata.topics.get(topic)!.partitionsCount, 3) + return Buffer.from('promise-metadata') + } + }) + + deepStrictEqual(consumer.assignments, [{ topic, partitions: [0, 1, 2] }]) +}) + +test('joinGroup should fail when protocolsMetadata does not resolve to a buffer', async t => { + const topic = await createTopic(t, true, 3) + const consumer = createConsumer(t) + + await consumer.topics.trackAll(topic) + + await rejects( + consumer.joinGroup({ + protocolsMetadata () { + return 'not-a-buffer' as any + } + }), + /protocolsMetadata must resolve to a Buffer\./ + ) +}) + test('joinGroup might receive no assignment', async t => { const topic = await createTopic(t, true) const groupId = createGroupId() From 0314ce9655b06efb56b0deadf4ed6e6671acff89 Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Wed, 17 Jun 2026 15:00:14 +0000 Subject: [PATCH 2/2] fixup Signed-off-by: Paolo Insogna --- src/clients/consumer/consumer.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/clients/consumer/consumer.ts b/src/clients/consumer/consumer.ts index 868b7d32..badc0ebe 100644 --- a/src/clients/consumer/consumer.ts +++ b/src/clients/consumer/consumer.ts @@ -1215,13 +1215,10 @@ export class Consumer, callback: CallbackWithPromise): void { - const diagnosticOptions = { ...options } - delete diagnosticOptions.protocolsMetadata - consumerGroupChannel.traceCallback( this.#performJoinGroup, 1, - createDiagnosticContext({ client: this, operation: 'joinGroup', options: diagnosticOptions }), + createDiagnosticContext({ client: this, operation: 'joinGroup', options }), this, options, callback