diff --git a/docs/consumer.md b/docs/consumer.md
index 5a359028..817271fe 100644
--- a/docs/consumer.md
+++ b/docs/consumer.md
@@ -47,6 +47,7 @@ Options:
| groupProtocol | `'classic' \| 'consumer'` | `'classic'` | Group protocol to use. Use `'classic'` for the original consumer group protocol and `'consumer'` for the new protocol introduced in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol).
The `'consumer'` protocol provides server-side partition assignment and incremental rebalancing behavior. |
| groupRemoteAssignor | `string` | `null` | Server-side assignor to use for `groupProtocol=consumer`. Keep it unset to let the server select a suitable assignor for the group. Available assignors: `'uniform'` or `'range'`. |
| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.
Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties.
Not supported for `groupProtocol=consumer`. |
+| protocolsMetadata | `GroupProtocolsMetadataCallback` | | Callback used to compute opaque protocol metadata before joining or rejoining a classic consumer group. The returned `Buffer` is sent as the subscription `user_data` for every configured protocol. Not supported for `groupProtocol=consumer`. |
| partitionAssigner | `GroupPartitionsAssigner` | | Client-side partition assignment strategy.
Not supported for `groupProtocol=consumer`, use `groupRemoteAssignor` instead. |
| streamContext | `unknown` | | Default opaque user data for `MessagesStream` instances created by this consumer. It is forwarded to stream-owned `ConnectionPool` and `Connection` instances. Kafka never reads, mutates, or interprets this value. |
@@ -54,6 +55,57 @@ It also supports all the constructor options of `Base`.
The readonly `streamContext` getters expose the same opaque values on the consumer instance.
+### Protocol metadata callback
+
+`protocolsMetadata` lets each member attach fresh, opaque metadata to its `JoinGroup` subscription. Kafka forwards these bytes without interpreting them. The group leader can read the bytes from each member in a custom `partitionAssigner` through `member.metadata`.
+
+The callback is invoked once per join or rebalance, before sending `JoinGroup`. The consumer refreshes metadata for its currently tracked topics before invoking it.
+
+```typescript
+type GroupProtocolsMetadataCallback = (
+ protocols: GroupProtocolSubscription[],
+ topics: string[],
+ metadata: ClusterMetadata,
+ callback: (error: Error | null, metadata?: Buffer) => void
+) => Buffer | Promise | undefined
+```
+
+The callback can be used in three styles:
+
+- Return a `Buffer` directly.
+- Return a `Promise`.
+- Return `undefined` and invoke the callback with a `Buffer`.
+
+If the resolved value is not a `Buffer`, `joinGroup()` fails. Do not mix styles; if multiple paths settle, only the first result is used.
+
+Example:
+
+```typescript
+const consumer = new Consumer({
+ groupId: 'my-group',
+ clientId: 'my-consumer',
+ bootstrapBrokers: ['localhost:9092'],
+ protocolsMetadata (_protocols, topics, metadata) {
+ const userData: Record = {}
+
+ for (const topic of topics) {
+ userData[topic] = metadata.topics.get(topic)!.partitions.map((_partition, partitionId) => partitionId)
+ }
+
+ return Buffer.from(JSON.stringify(userData), 'utf8')
+ },
+ partitionAssigner (_current, members, topics, metadata) {
+ for (const member of members.values()) {
+ const userData = JSON.parse(member.metadata!.toString('utf8'))
+ // Use the member-provided opaque metadata while computing assignments.
+ }
+
+ // Return GroupPartitionsAssignments[].
+ return []
+ }
+})
+```
+
## Basic Methods
### `isActive`
@@ -297,12 +349,13 @@ This method is no-op for `groupProtocol=consumer`.
Options:
-| Property | Type | Default | Description |
-| ----------------- | ----------------------------- | ------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.
This is only relevant when Kafka creates a new group. |
-| rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.
This is only relevant when Kafka creates a new group. |
-| heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats. |
-| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.
Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. |
+| Property | Type | Default | Description |
+| ----------------- | -------------------------------- | ------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.
This is only relevant when Kafka creates a new group. |
+| rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.
This is only relevant when Kafka creates a new group. |
+| heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats. |
+| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.
Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. |
+| protocolsMetadata | `GroupProtocolsMetadataCallback` | | Callback used to compute opaque protocol metadata before joining or rejoining a classic consumer group. See [Protocol metadata callback](#protocol-metadata-callback). |
### `leaveGroup([force])`
diff --git a/src/clients/consumer/consumer.ts b/src/clients/consumer/consumer.ts
index 154db1b1..badc0ebe 100644
--- a/src/clients/consumer/consumer.ts
+++ b/src/clients/consumer/consumer.ts
@@ -38,6 +38,7 @@ import {
type SyncGroupRequestAssignment,
type SyncGroupResponse
} from '../../apis/consumer/sync-group-v5.ts'
+import { type Callback } from '../../apis/definitions.ts'
import { FetchIsolationLevels, FindCoordinatorKeyTypes } from '../../apis/enumerations.ts'
import { type FindCoordinatorRequest, type FindCoordinatorResponse } from '../../apis/metadata/find-coordinator-v6.ts'
import {
@@ -57,11 +58,7 @@ import { INT32_SIZE } from '../../protocol/definitions.ts'
import { Reader } from '../../protocol/reader.ts'
import { IS_CONTROL } from '../../protocol/records.ts'
import { Writer } from '../../protocol/writer.ts'
-import {
- kAutocommit,
- kGetFetchNode,
- kRefreshOffsetsAndFetch
-} from '../../symbols.ts'
+import { kAutocommit, kGetFetchNode, kRefreshOffsetsAndFetch } from '../../symbols.ts'
import { emitExperimentalApiWarning } from '../../utils.ts'
import {
Base,
@@ -115,6 +112,7 @@ import {
type GroupAssignment,
type GroupOptions,
type GroupPartitionsAssigner,
+ type GroupProtocolsMetadataCallback,
type GroupProtocolSubscription,
type ListCommitsOptions,
type ListOffsetsOptions,
@@ -697,6 +695,11 @@ export class Consumer(
- 'joinGroup',
- (connection, groupCallback) => {
- this[kGetApi]('JoinGroup', (error, api) => {
- if (error) {
- groupCallback(error)
- return
- }
-
- api!(
- connection,
- this.groupId,
- options.sessionTimeout,
- options.rebalanceTimeout,
- this.memberId ?? '',
- this.groupInstanceId,
- 'consumer',
- protocols,
- '',
- groupCallback
- )
- })
- },
- (error, response) => {
- if (!this.#membershipActive) {
- callback(null)
- return
- }
-
- if (error) {
- if (this.#getRejoinError(error)) {
- this.#performJoinGroup(options, callback)
- return
- }
-
- callback(error)
- return
- }
+ this.#createJoinGroupProtocols(options, (error, protocols) => {
+ if (error) {
+ callback(error)
+ return
+ }
- // This is for Azure Event Hubs compatibility, which does not respond with an error on the first join
- this.memberId = response!.memberId!
- this.generationId = response!.generationId
- this.#isLeader = response!.leader === this.memberId
- this.#protocol = response!.protocolName!
-
- this.#members = new Map()
- for (const member of response!.members) {
- this.#members.set(
- member.memberId,
- this.#decodeProtocolSubscriptionMetadata(member.memberId, member.metadata!)
- )
- }
+ this.#performDeduplicateGroupOperaton(
+ 'joinGroup',
+ (connection, groupCallback) => {
+ this[kGetApi]('JoinGroup', (error, api) => {
+ if (error) {
+ groupCallback(error)
+ return
+ }
- // Send a syncGroup request
- this.#syncGroup(options.partitionAssigner, (error, response) => {
+ api!(
+ connection,
+ this.groupId,
+ options.sessionTimeout,
+ options.rebalanceTimeout,
+ this.memberId ?? '',
+ this.groupInstanceId,
+ 'consumer',
+ protocols!,
+ '',
+ groupCallback
+ )
+ })
+ },
+ (error, response) => {
if (!this.#membershipActive) {
callback(null)
return
@@ -1772,26 +1742,58 @@ export class Consumer {
- this.#heartbeat(options)
- }, options.heartbeatInterval)
-
- this.emitWithDebug('consumer', 'group:join', {
- groupId: this.groupId,
- memberId: this.memberId,
- generationId: this.generationId,
- isLeader: this.#isLeader,
- assignments: this.assignments
- })
+ // Send a syncGroup request
+ this.#syncGroup(options.partitionAssigner, (error, response) => {
+ if (!this.#membershipActive) {
+ callback(null)
+ return
+ }
- callback(null, this.memberId!)
- })
- }
- )
+ if (error) {
+ if (this.#getRejoinError(error)) {
+ this.#performJoinGroup(options, callback)
+ return
+ }
+
+ callback(error)
+ return
+ }
+
+ this.assignments = response!
+ this.#syncPreferredReadReplicas()
+
+ this.#cancelHeartbeat()
+ this.#heartbeatInterval = setTimeout(() => {
+ this.#heartbeat(options)
+ }, options.heartbeatInterval)
+
+ this.emitWithDebug('consumer', 'group:join', {
+ groupId: this.groupId,
+ memberId: this.memberId,
+ generationId: this.generationId,
+ isLeader: this.#isLeader,
+ assignments: this.assignments
+ })
+
+ callback(null, this.memberId!)
+ })
+ }
+ )
+ })
}
#performLeaveGroup (force: boolean, callback: CallbackWithPromise): void {
@@ -2018,6 +2020,84 @@ export class Consumer, callback: Callback): void {
+ if (typeof options.protocolsMetadata !== 'function') {
+ callback(
+ null,
+ options.protocols.map(protocol => ({
+ name: protocol.name,
+ metadata: this.#encodeProtocolSubscriptionMetadata(protocol, this.topics.current)
+ }))
+ )
+ return
+ }
+
+ this[kMetadata]({ topics: this.topics.current, forceUpdate: true }, (error, metadata) => {
+ if (error) {
+ callback(this.#handleError(error))
+ return
+ }
+
+ this.#resolveProtocolsMetadata(options.protocolsMetadata, options.protocols, this.topics.current, metadata!, (
+ error,
+ userData
+ ) => {
+ if (error) {
+ callback(error)
+ return
+ } else if (!Buffer.isBuffer(userData)) {
+ callback(new UserError('protocolsMetadata must resolve to a Buffer.'))
+ return
+ }
+
+ callback(
+ null,
+ options.protocols.map(protocol => ({
+ name: protocol.name,
+ metadata: this.#encodeProtocolSubscriptionMetadata({ ...protocol, metadata: userData }, this.topics.current)
+ }))
+ )
+ })
+ })
+ }
+
+ #resolveProtocolsMetadata (
+ protocolsMetadata: GroupProtocolsMetadataCallback,
+ protocols: GroupProtocolSubscription[],
+ topics: string[],
+ metadata: ClusterMetadata,
+ callback: Callback
+ ): void {
+ let settled = false
+
+ function resolveCallback (error: Error | null, userData?: Buffer) {
+ if (settled) {
+ return
+ }
+
+ settled = true
+ callback(error, userData)
+ }
+
+ let result: Buffer | Promise | undefined
+ try {
+ result = protocolsMetadata(protocols, topics, metadata, resolveCallback)
+
+ if (typeof result !== 'undefined') {
+ if (typeof (result as Promise).then === 'function') {
+ ;(result as Promise).then(
+ userData => process.nextTick(resolveCallback, null, userData),
+ error => process.nextTick(resolveCallback, error)
+ )
+ } else {
+ resolveCallback(null, result as Buffer)
+ }
+ }
+ } catch (error) {
+ resolveCallback(error as Error)
+ }
+ }
+
/*
The following two methods follow:
https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
diff --git a/src/clients/consumer/options.ts b/src/clients/consumer/options.ts
index 4ed9074f..60943188 100644
--- a/src/clients/consumer/options.ts
+++ b/src/clients/consumer/options.ts
@@ -26,6 +26,7 @@ export const groupOptionsProperties = {
}
}
},
+ protocolsMetadata: { function: true },
partitionAssigner: { function: true }
}
diff --git a/src/clients/consumer/types.ts b/src/clients/consumer/types.ts
index fe7b256a..19d3ca17 100644
--- a/src/clients/consumer/types.ts
+++ b/src/clients/consumer/types.ts
@@ -1,3 +1,4 @@
+import { type Callback } from '../../apis/definitions.ts'
import { type FetchRequestTopic } from '../../apis/consumer/fetch-v17.ts'
import { type GroupProtocols } from '../../apis/enumerations.ts'
import { type ConnectionPool } from '../../network/connection-pool.ts'
@@ -52,6 +53,13 @@ export type GroupPartitionsAssigner = (
metadata: ClusterMetadata
) => GroupPartitionsAssignments[]
+export type GroupProtocolsMetadataCallback = (
+ protocols: GroupProtocolSubscription[],
+ topics: string[],
+ metadata: ClusterMetadata,
+ callback: Callback
+) => Buffer | Promise | undefined
+
export const MessagesStreamModes = {
LATEST: 'latest',
EARLIEST: 'earliest',
@@ -83,6 +91,7 @@ export interface GroupOptions {
rebalanceTimeout?: number
heartbeatInterval?: number
protocols?: GroupProtocolSubscription[]
+ protocolsMetadata?: GroupProtocolsMetadataCallback
partitionAssigner?: GroupPartitionsAssigner
assignmentUserData?: Buffer
}
diff --git a/test/clients/consumer/consumer.test.ts b/test/clients/consumer/consumer.test.ts
index 1a900a16..e307f1e8 100644
--- a/test/clients/consumer/consumer.test.ts
+++ b/test/clients/consumer/consumer.test.ts
@@ -3780,6 +3780,130 @@ test('joinGroup should not fail when the partition assigner misses a member', as
deepStrictEqual(consumer2.assignments, [])
})
+test('joinGroup should expose protocolsMetadata to the partition assigner', async t => {
+ const topic = await createTopic(t, true, 3)
+ const groupId = createGroupId()
+ const seenUserData: Record[] = []
+
+ function protocolsMetadata (
+ _protocols: unknown,
+ topics: string[],
+ metadata: ClusterMetadata
+ ): Buffer {
+ const userData: Record = {}
+
+ for (const topic of topics) {
+ userData[topic] = metadata.topics.get(topic)!.partitions.map((_partition, partitionId) => partitionId)
+ }
+
+ return Buffer.from(JSON.stringify(userData), 'utf8')
+ }
+
+ function partitionAssigner (
+ _current: string,
+ members: Map,
+ topics: Set,
+ metadata: ClusterMetadata
+ ): GroupPartitionsAssignments[] {
+ const assignments: GroupPartitionsAssignments[] = []
+
+ seenUserData.length = 0
+ for (const [memberId, member] of members) {
+ seenUserData.push(JSON.parse(member.metadata!.toString('utf8')))
+ assignments.push({ memberId, assignments: new Map() })
+ }
+
+ for (const topic of topics) {
+ const partitionsCount = metadata.topics.get(topic)!.partitionsCount
+
+ for (let i = 0; i < partitionsCount; i++) {
+ const member = assignments[i % assignments.length]
+ let topicAssignments = member.assignments.get(topic)
+
+ if (!topicAssignments) {
+ topicAssignments = { topic, partitions: [] }
+ member.assignments.set(topic, topicAssignments)
+ }
+
+ topicAssignments.partitions.push(i)
+ }
+ }
+
+ return assignments
+ }
+
+ const consumer1 = createConsumer(t, { groupId, partitionAssigner, protocolsMetadata })
+ const consumer2 = createConsumer(t, { groupId, partitionAssigner, protocolsMetadata })
+
+ await consumer1.topics.trackAll(topic)
+ await consumer2.topics.trackAll(topic)
+
+ await consumer1.joinGroup()
+ const rejoinPromise = once(consumer1, 'consumer:group:join')
+ await consumer2.joinGroup()
+ await rejoinPromise
+
+ deepStrictEqual(seenUserData, [{ [topic]: [0, 1, 2] }, { [topic]: [0, 1, 2] }])
+})
+
+test('joinGroup should support callback style protocolsMetadata', async t => {
+ const topic = await createTopic(t, true, 3)
+ const consumer = createConsumer(t)
+ let sawForceUpdate = false
+
+ await consumer.topics.trackAll(topic)
+
+ mockMetadata(consumer, 1, null, null, (original, options, callback) => {
+ sawForceUpdate = options.forceUpdate === true
+ original(options, callback)
+ })
+
+ await consumer.joinGroup({
+ protocolsMetadata (_protocols, topics, metadata, callback) {
+ strictEqual(topics[0], topic)
+ strictEqual(metadata.topics.get(topic)!.partitionsCount, 3)
+ callback(null, Buffer.from('callback-metadata'))
+ }
+ })
+
+ strictEqual(sawForceUpdate, true)
+ deepStrictEqual(consumer.assignments, [{ topic, partitions: [0, 1, 2] }])
+})
+
+test('joinGroup should support promise style protocolsMetadata', async t => {
+ const topic = await createTopic(t, true, 3)
+ const consumer = createConsumer(t)
+
+ await consumer.topics.trackAll(topic)
+
+ await consumer.joinGroup({
+ async protocolsMetadata (_protocols, topics, metadata, callback) {
+ callback(null, Buffer.from('ignored-callback-metadata'))
+ strictEqual(topics[0], topic)
+ strictEqual(metadata.topics.get(topic)!.partitionsCount, 3)
+ return Buffer.from('promise-metadata')
+ }
+ })
+
+ deepStrictEqual(consumer.assignments, [{ topic, partitions: [0, 1, 2] }])
+})
+
+test('joinGroup should fail when protocolsMetadata does not resolve to a buffer', async t => {
+ const topic = await createTopic(t, true, 3)
+ const consumer = createConsumer(t)
+
+ await consumer.topics.trackAll(topic)
+
+ await rejects(
+ consumer.joinGroup({
+ protocolsMetadata () {
+ return 'not-a-buffer' as any
+ }
+ }),
+ /protocolsMetadata must resolve to a Buffer\./
+ )
+})
+
test('joinGroup might receive no assignment', async t => {
const topic = await createTopic(t, true)
const groupId = createGroupId()