Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions docs/consumer.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The complete TypeScript type of the `Consumer` is determined by the `deserialize
| `consumer:group:leave` | `ConsumerGroupLeavePayload` | Emitted when leaving a group. |
| `consumer:group:rejoin` | (none) | Emitted when re-joining a group after a rebalance. |
| `consumer:group:rebalance` | `ConsumerGroupRebalancePayload` | Emitted when group rebalancing occurs. |
| `consumer:group:autocommit:error` | `ConsumerGroupAutocommitErrorPayload` | Emitted when autocommit fails during a cooperative rebalance. The rebalance still proceeds. |
| `consumer:heartbeat:start` | `ConsumerHeartbeatPayload?` | Emitted when starting new heartbeats. |
| `consumer:heartbeat:cancel` | `ConsumerHeartbeatPayload` | Emitted if a scheduled heartbeat has been canceled. |
| `consumer:heartbeat:end` | `ConsumerHeartbeatPayload?` | Emitted during successful heartbeats. |
Expand Down Expand Up @@ -46,14 +47,32 @@ Options:
| heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats.<br/><br/> Not supported for `groupProtocol=consumer`, instead it is set with the broker configuration property `group.consumer.heartbeat.interval`. |
| groupProtocol | `'classic' \| 'consumer'` | `'classic'` | Group protocol to use. Use `'classic'` for the original consumer group protocol and `'consumer'` for the new protocol introduced in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol).<br/><br/> The `'consumer'` protocol provides server-side partition assignment and incremental rebalancing behavior. |
| groupRemoteAssignor | `string` | `null` | Server-side assignor to use for `groupProtocol=consumer`. Keep it unset to let the server select a suitable assignor for the group. Available assignors: `'uniform'` or `'range'`. |
| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.<br/><br/> Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. <br/><br/> Not supported for `groupProtocol=consumer`. |
| partitionAssigner | `GroupPartitionsAssigner` | | Client-side partition assignment strategy.<br/><br/> Not supported for `groupProtocol=consumer`, use `groupRemoteAssignor` instead. |
| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.<br/><br/> Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. Use `{ name: 'cooperative-sticky', version: 3 }` to opt in to KIP-429 cooperative rebalancing. <br/><br/> Not supported for `groupProtocol=consumer`. |
| partitionAssigner | `GroupPartitionsAssigner` | | Client-side partition assignment strategy.<br/><br/> Built-in assigners are `roundRobinAssigner` and `cooperativeStickyAssigner`. Not supported for `groupProtocol=consumer`, use `groupRemoteAssignor` instead. |
| streamContext | `unknown` | | Default opaque user data for `MessagesStream` instances created by this consumer. It is forwarded to stream-owned `ConnectionPool` and `Connection` instances. Kafka never reads, mutates, or interprets this value. |

It also supports all the constructor options of `Base`.

The readonly `streamContext` getters expose the same opaque values on the consumer instance.

### Cooperative Sticky Assignment

Classic consumer groups can opt in to [KIP-429](https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol) cooperative sticky assignment:

```ts
import { COOPERATIVE_STICKY_ASSIGNOR, Consumer } from '@platformatic/kafka'

const consumer = new Consumer({
groupId: 'my-group',
bootstrapBrokers: ['localhost:9092'],
protocols: [{ name: COOPERATIVE_STICKY_ASSIGNOR, version: 3 }]
})
```

The cooperative sticky assignor preserves existing ownership where possible and performs partition transfers over a follow-up rebalance so partitions are revoked before another member starts consuming them. All members of the group should advertise the same assignor during the migration.

Subscription metadata includes `rackId` when using protocol version 3, but the built-in assignor does not yet use rack information for rack-aware partition placement.

## Basic Methods

### `isActive`
Expand Down Expand Up @@ -302,7 +321,7 @@ Options:
| sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.<br/><br/> This is only relevant when Kafka creates a new group. |
| rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.<br/><br/> This is only relevant when Kafka creates a new group. |
| heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats. |
| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.<br/><br/> Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. |
| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.<br/><br/> Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. Use `{ name: 'cooperative-sticky', version: 3 }` to opt in to KIP-429 cooperative rebalancing. |

### `leaveGroup([force])`

Expand Down
1 change: 1 addition & 0 deletions docs/kips.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Other KIPs up to Kafka 4.1.0 are likely supported, or at least have protocol/API
| [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API) | Supported |
| [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances) | Supported |
| [KIP-368](https://cwiki.apache.org/confluence/display/KAFKA/KIP-368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate) | Supported |
| [KIP-429](https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol) | Supported |
| [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430%3A+Return+Authorized+Operations+in+Metadata%2C+Describe+Topics%2C+and+Describe+Groups+Responses) | Supported |
| [KIP-455](https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment) | Supported |
| [KIP-482](https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields) | Supported |
Expand Down
34 changes: 14 additions & 20 deletions src/clients/admin/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ import {
} from '../../diagnostic.ts'
import { MultipleErrors, UserError } from '../../errors.ts'
import { type Broker, type Connection } from '../../index.ts'
import { Reader } from '../../protocol/reader.ts'
import {
Base,
kAfterCreate,
Expand All @@ -113,6 +112,7 @@ import {
kValidateOptions
} from '../base/base.ts'
import { type BaseOptions } from '../base/types.ts'
import { decodeConsumerProtocolAssignment, decodeConsumerProtocolSubscription } from '../consumer/consumer-protocol.ts'
import { type GroupAssignment } from '../consumer/types.ts'
import {
adminListOffsetsOptionsValidator,
Expand Down Expand Up @@ -1307,32 +1307,26 @@ export class Admin extends Base<AdminOptions> {
}

for (const member of raw.members) {
const reader = Reader.from(member.memberMetadata)

let memberMetadata: GroupMember['metadata'] | undefined
let memberAssignments: Map<string, GroupAssignment> | undefined

if (reader.remaining > 0) {
if (member.memberMetadata.length > 0) {
const subscription = decodeConsumerProtocolSubscription(member.memberMetadata)
memberMetadata = {
version: reader.readInt16(),
topics: reader.readArray(r => r.readString(false), false, false),
metadata: reader.readBytes(false)
version: subscription.version,
topics: subscription.topics,
metadata: subscription.userData,
ownedPartitions: subscription.ownedPartitions,
generationId: subscription.generationId,
rackId: subscription.rackId
}

reader.reset(member.memberAssignment)
reader.skip(2) // Ignore Version information

memberAssignments = reader.readMap(
r => {
const topic = r.readString(false)

return [topic, { topic, partitions: reader.readArray(r => r.readInt32(), false, false) }]
},
false,
false
memberAssignments = new Map(
decodeConsumerProtocolAssignment(member.memberAssignment).assignedPartitions.map(assignment => [
assignment.topic,
assignment
])
)

// Ignore the user data
}

group.members.set(member.memberId, {
Expand Down
174 changes: 174 additions & 0 deletions src/clients/consumer/consumer-protocol.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import { Reader } from '../../protocol/reader.ts'
import { Writer } from '../../protocol/writer.ts'
import { type GroupAssignment } from './types.ts'

export const CONSUMER_PROTOCOL_LOWEST_VERSION = 0
export const CONSUMER_PROTOCOL_HIGHEST_VERSION = 3

export interface ConsumerProtocolSubscriptionData {
version: number
topics: string[]
userData: Buffer
ownedPartitions: GroupAssignment[]
generationId: number
rackId: string | null
}

export interface ConsumerProtocolAssignmentData {
version: number
assignedPartitions: GroupAssignment[]
userData: Buffer
}

function supportedVersion (version: number): number {
if (version < CONSUMER_PROTOCOL_LOWEST_VERSION) {
return CONSUMER_PROTOCOL_LOWEST_VERSION
}

if (version > CONSUMER_PROTOCOL_HIGHEST_VERSION) {
return CONSUMER_PROTOCOL_HIGHEST_VERSION
}

return version
}

function sortedAssignments (assignments: GroupAssignment[]): GroupAssignment[] {
return assignments
.filter(({ partitions }) => partitions.length > 0)
.map(({ topic, partitions }) => ({ topic, partitions: [...partitions].sort((a, b) => a - b) }))
.sort((a, b) => a.topic.localeCompare(b.topic))
}

function appendTopicPartitions (writer: Writer, assignments: GroupAssignment[]): Writer {
return writer.appendArray(
sortedAssignments(assignments),
(w, { topic, partitions }) => {
w.appendString(topic, false).appendArray(partitions, (w, partition) => w.appendInt32(partition), false, false)
},
false,
false
)
}

function readTopicPartitions (reader: Reader): GroupAssignment[] {
return reader.readArray(
r => ({
topic: r.readString(false),
partitions: r.readArray(r => r.readInt32(), false, false)
}),
false,
false
)
}

export function encodeConsumerProtocolSubscription (data: {
version: number
topics: string[]
userData?: Buffer | string
ownedPartitions?: GroupAssignment[]
generationId?: number
rackId?: string | null
}): Buffer {
const version = supportedVersion(data.version)
const userData = typeof data.userData === 'string' ? Buffer.from(data.userData) : data.userData
const writer = Writer.create()
.appendInt16(version)
.appendArray([...data.topics].sort(), (w, topic) => w.appendString(topic, false), false, false)
.appendBytes(userData, false)

if (version >= 1) {
appendTopicPartitions(writer, data.ownedPartitions ?? [])
}

if (version >= 2) {
writer.appendInt32(data.generationId ?? -1)
}

if (version >= 3) {
writer.appendString(data.rackId || null, false)
}

return writer.buffer
}

export function decodeConsumerProtocolSubscription (buffer: Buffer): ConsumerProtocolSubscriptionData {
const reader = Reader.from(buffer)
const encodedVersion = reader.readInt16()
const version = supportedVersion(encodedVersion)
const subscription: ConsumerProtocolSubscriptionData = {
version: encodedVersion,
topics: reader.readArray(r => r.readString(false), false, false),
userData: reader.readBytes(false),
ownedPartitions: [],
generationId: -1,
rackId: null
}

if (version >= 1) {
subscription.ownedPartitions = readTopicPartitions(reader)
}

if (version >= 2) {
subscription.generationId = reader.readInt32()
}

if (version >= 3) {
subscription.rackId = reader.readNullableString(false)
}

return subscription
}

export function decodeCooperativeStickyGeneration (userData: Buffer): number {
if (userData.length < 4) {
return -1
}

try {
return Reader.from(userData).readInt32()
} catch {
return -1
}
}

export function encodeConsumerProtocolAssignment (data: {
version: number
assignedPartitions: GroupAssignment[]
userData?: Buffer | string | null
}): Buffer {
const userData = typeof data.userData === 'string' ? Buffer.from(data.userData) : data.userData
const writer = Writer.create().appendInt16(supportedVersion(data.version))
appendTopicPartitions(writer, data.assignedPartitions)
writer.appendBytes(userData ?? null, false)
return writer.buffer
}

export function decodeConsumerProtocolAssignment (buffer: Buffer): ConsumerProtocolAssignmentData {
if (buffer.length === 0) {
return {
version: 0,
assignedPartitions: [],
userData: Buffer.alloc(0)
}
}

const reader = Reader.from(buffer)
const encodedVersion = reader.readInt16()

if (reader.remaining === 0) {
return {
version: encodedVersion,
assignedPartitions: [],
userData: Buffer.alloc(0)
}
}

const assignedPartitions = readTopicPartitions(reader)
const userData = reader.remaining > 0 ? reader.readBytes(false) : Buffer.alloc(0)

return {
version: encodedVersion,
assignedPartitions,
userData
}
}
Loading
Loading