Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 59 additions & 6 deletions docs/consumer.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,65 @@ Options:
| groupProtocol | `'classic' \| 'consumer'` | `'classic'` | Group protocol to use. Use `'classic'` for the original consumer group protocol and `'consumer'` for the new protocol introduced in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol).<br/><br/> The `'consumer'` protocol provides server-side partition assignment and incremental rebalancing behavior. |
| groupRemoteAssignor | `string` | `null` | Server-side assignor to use for `groupProtocol=consumer`. Keep it unset to let the server select a suitable assignor for the group. Available assignors: `'uniform'` or `'range'`. |
| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.<br/><br/> Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. <br/><br/> Not supported for `groupProtocol=consumer`. |
| protocolsMetadata | `GroupProtocolsMetadataCallback` | | Callback used to compute opaque protocol metadata before joining or rejoining a classic consumer group. The returned `Buffer` is sent as the subscription `user_data` for every configured protocol. Not supported for `groupProtocol=consumer`. |
| partitionAssigner | `GroupPartitionsAssigner` | | Client-side partition assignment strategy.<br/><br/> Not supported for `groupProtocol=consumer`, use `groupRemoteAssignor` instead. |
| streamContext | `unknown` | | Default opaque user data for `MessagesStream` instances created by this consumer. It is forwarded to stream-owned `ConnectionPool` and `Connection` instances. Kafka never reads, mutates, or interprets this value. |

It also supports all the constructor options of `Base`.

The readonly `streamContext` getters expose the same opaque values on the consumer instance.

### Protocol metadata callback

`protocolsMetadata` lets each member attach fresh, opaque metadata to its `JoinGroup` subscription. Kafka forwards these bytes without interpreting them. The group leader can read the bytes from each member in a custom `partitionAssigner` through `member.metadata`.

The callback is invoked once per join or rebalance, before sending `JoinGroup`. The consumer refreshes metadata for its currently tracked topics before invoking it.

```typescript
type GroupProtocolsMetadataCallback = (
protocols: GroupProtocolSubscription[],
topics: string[],
metadata: ClusterMetadata,
callback: (error: Error | null, metadata?: Buffer) => void
) => Buffer | Promise<Buffer> | undefined
```

The callback can be used in three styles:

- Return a `Buffer` directly.
- Return a `Promise<Buffer>`.
- Return `undefined` and invoke the callback with a `Buffer`.

If the resolved value is not a `Buffer`, `joinGroup()` fails. Do not mix styles; if multiple paths settle, only the first result is used.

Example:

```typescript
const consumer = new Consumer({
groupId: 'my-group',
clientId: 'my-consumer',
bootstrapBrokers: ['localhost:9092'],
protocolsMetadata (_protocols, topics, metadata) {
const userData: Record<string, number[]> = {}

for (const topic of topics) {
userData[topic] = metadata.topics.get(topic)!.partitions.map((_partition, partitionId) => partitionId)
}

return Buffer.from(JSON.stringify(userData), 'utf8')
},
partitionAssigner (_current, members, topics, metadata) {
for (const member of members.values()) {
const userData = JSON.parse(member.metadata!.toString('utf8'))
// Use the member-provided opaque metadata while computing assignments.
}

// Return GroupPartitionsAssignments[].
return []
}
})
```

## Basic Methods

### `isActive`
Expand Down Expand Up @@ -297,12 +349,13 @@ This method is no-op for `groupProtocol=consumer`.

Options:

| Property | Type | Default | Description |
| ----------------- | ----------------------------- | ------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.<br/><br/> This is only relevant when Kafka creates a new group. |
| rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.<br/><br/> This is only relevant when Kafka creates a new group. |
| heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats. |
| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.<br/><br/> Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. |
| Property | Type | Default | Description |
| ----------------- | -------------------------------- | ------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.<br/><br/> This is only relevant when Kafka creates a new group. |
| rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.<br/><br/> This is only relevant when Kafka creates a new group. |
| heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats. |
| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.<br/><br/> Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. |
| protocolsMetadata | `GroupProtocolsMetadataCallback` | | Callback used to compute opaque protocol metadata before joining or rejoining a classic consumer group. See [Protocol metadata callback](#protocol-metadata-callback). |

### `leaveGroup([force])`

Expand Down
248 changes: 164 additions & 84 deletions src/clients/consumer/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
type SyncGroupRequestAssignment,
type SyncGroupResponse
} from '../../apis/consumer/sync-group-v5.ts'
import { type Callback } from '../../apis/definitions.ts'
import { FetchIsolationLevels, FindCoordinatorKeyTypes } from '../../apis/enumerations.ts'
import { type FindCoordinatorRequest, type FindCoordinatorResponse } from '../../apis/metadata/find-coordinator-v6.ts'
import {
Expand All @@ -57,11 +58,7 @@ import { INT32_SIZE } from '../../protocol/definitions.ts'
import { Reader } from '../../protocol/reader.ts'
import { IS_CONTROL } from '../../protocol/records.ts'
import { Writer } from '../../protocol/writer.ts'
import {
kAutocommit,
kGetFetchNode,
kRefreshOffsetsAndFetch
} from '../../symbols.ts'
import { kAutocommit, kGetFetchNode, kRefreshOffsetsAndFetch } from '../../symbols.ts'
import { emitExperimentalApiWarning } from '../../utils.ts'
import {
Base,
Expand Down Expand Up @@ -115,6 +112,7 @@ import {
type GroupAssignment,
type GroupOptions,
type GroupPartitionsAssigner,
type GroupProtocolsMetadataCallback,
type GroupProtocolSubscription,
type ListCommitsOptions,
type ListOffsetsOptions,
Expand Down Expand Up @@ -697,6 +695,11 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
options.heartbeatInterval ??= (this[kOptions] as GroupOptions).heartbeatInterval!
options.protocols ??= (this[kOptions] as GroupOptions).protocols!

const protocolsMetadata = (this[kOptions] as GroupOptions).protocolsMetadata
if (!options.protocolsMetadata && protocolsMetadata) {
options.protocolsMetadata = protocolsMetadata
}

this.#validateGroupOptions(options)

this.#membershipActive = true
Expand Down Expand Up @@ -1694,69 +1697,36 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa

this.#cancelHeartbeat()

const protocols: JoinGroupRequestProtocol[] = []
for (const protocol of options.protocols) {
protocols.push({
name: protocol.name,
metadata: this.#encodeProtocolSubscriptionMetadata(protocol, this.topics.current)
})
}

this.#performDeduplicateGroupOperaton<JoinGroupResponse>(
'joinGroup',
(connection, groupCallback) => {
this[kGetApi]<JoinGroupRequest, JoinGroupResponse>('JoinGroup', (error, api) => {
if (error) {
groupCallback(error)
return
}

api!(
connection,
this.groupId,
options.sessionTimeout,
options.rebalanceTimeout,
this.memberId ?? '',
this.groupInstanceId,
'consumer',
protocols,
'',
groupCallback
)
})
},
(error, response) => {
if (!this.#membershipActive) {
callback(null)
return
}

if (error) {
if (this.#getRejoinError(error)) {
this.#performJoinGroup(options, callback)
return
}

callback(error)
return
}
this.#createJoinGroupProtocols(options, (error, protocols) => {
if (error) {
callback(error)
return
}

// This is for Azure Event Hubs compatibility, which does not respond with an error on the first join
this.memberId = response!.memberId!
this.generationId = response!.generationId
this.#isLeader = response!.leader === this.memberId
this.#protocol = response!.protocolName!

this.#members = new Map()
for (const member of response!.members) {
this.#members.set(
member.memberId,
this.#decodeProtocolSubscriptionMetadata(member.memberId, member.metadata!)
)
}
this.#performDeduplicateGroupOperaton<JoinGroupResponse>(
'joinGroup',
(connection, groupCallback) => {
this[kGetApi]<JoinGroupRequest, JoinGroupResponse>('JoinGroup', (error, api) => {
if (error) {
groupCallback(error)
return
}

// Send a syncGroup request
this.#syncGroup(options.partitionAssigner, (error, response) => {
api!(
connection,
this.groupId,
options.sessionTimeout,
options.rebalanceTimeout,
this.memberId ?? '',
this.groupInstanceId,
'consumer',
protocols!,
'',
groupCallback
)
})
},
(error, response) => {
if (!this.#membershipActive) {
callback(null)
return
Expand All @@ -1772,26 +1742,58 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
return
}

this.assignments = response!
this.#syncPreferredReadReplicas()
// This is for Azure Event Hubs compatibility, which does not respond with an error on the first join
this.memberId = response!.memberId!
this.generationId = response!.generationId
this.#isLeader = response!.leader === this.memberId
this.#protocol = response!.protocolName!

this.#members = new Map()
for (const member of response!.members) {
this.#members.set(
member.memberId,
this.#decodeProtocolSubscriptionMetadata(member.memberId, member.metadata!)
)
}

this.#cancelHeartbeat()
this.#heartbeatInterval = setTimeout(() => {
this.#heartbeat(options)
}, options.heartbeatInterval)

this.emitWithDebug('consumer', 'group:join', {
groupId: this.groupId,
memberId: this.memberId,
generationId: this.generationId,
isLeader: this.#isLeader,
assignments: this.assignments
})
// Send a syncGroup request
this.#syncGroup(options.partitionAssigner, (error, response) => {
if (!this.#membershipActive) {
callback(null)
return
}

callback(null, this.memberId!)
})
}
)
if (error) {
if (this.#getRejoinError(error)) {
this.#performJoinGroup(options, callback)
return
}

callback(error)
return
}

this.assignments = response!
this.#syncPreferredReadReplicas()

this.#cancelHeartbeat()
this.#heartbeatInterval = setTimeout(() => {
this.#heartbeat(options)
}, options.heartbeatInterval)

this.emitWithDebug('consumer', 'group:join', {
groupId: this.groupId,
memberId: this.memberId,
generationId: this.generationId,
isLeader: this.#isLeader,
assignments: this.assignments
})

callback(null, this.memberId!)
})
}
)
})
}

#performLeaveGroup (force: boolean, callback: CallbackWithPromise<void>): void {
Expand Down Expand Up @@ -2018,6 +2020,84 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
}
}

#createJoinGroupProtocols (options: Required<GroupOptions>, callback: Callback<JoinGroupRequestProtocol[]>): void {
if (typeof options.protocolsMetadata !== 'function') {
callback(
null,
options.protocols.map(protocol => ({
name: protocol.name,
metadata: this.#encodeProtocolSubscriptionMetadata(protocol, this.topics.current)
}))
)
return
}

this[kMetadata]({ topics: this.topics.current, forceUpdate: true }, (error, metadata) => {
if (error) {
callback(this.#handleError(error))
return
}

this.#resolveProtocolsMetadata(options.protocolsMetadata, options.protocols, this.topics.current, metadata!, (
error,
userData
) => {
if (error) {
callback(error)
return
} else if (!Buffer.isBuffer(userData)) {
callback(new UserError('protocolsMetadata must resolve to a Buffer.'))
return
}

callback(
null,
options.protocols.map(protocol => ({
name: protocol.name,
metadata: this.#encodeProtocolSubscriptionMetadata({ ...protocol, metadata: userData }, this.topics.current)
}))
)
})
})
}

#resolveProtocolsMetadata (
protocolsMetadata: GroupProtocolsMetadataCallback,
protocols: GroupProtocolSubscription[],
topics: string[],
metadata: ClusterMetadata,
callback: Callback<Buffer>
): void {
let settled = false

function resolveCallback (error: Error | null, userData?: Buffer) {
if (settled) {
return
}

settled = true
callback(error, userData)
}

let result: Buffer | Promise<Buffer> | undefined
try {
result = protocolsMetadata(protocols, topics, metadata, resolveCallback)

if (typeof result !== 'undefined') {
if (typeof (result as Promise<Buffer>).then === 'function') {
;(result as Promise<Buffer>).then(
userData => process.nextTick(resolveCallback, null, userData),
error => process.nextTick(resolveCallback, error)
)
} else {
resolveCallback(null, result as Buffer)
}
}
} catch (error) {
resolveCallback(error as Error)
}
}

/*
The following two methods follow:
https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
Expand Down
1 change: 1 addition & 0 deletions src/clients/consumer/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export const groupOptionsProperties = {
}
}
},
protocolsMetadata: { function: true },
partitionAssigner: { function: true }
}

Expand Down
Loading
Loading