From fa714c447a620e817efeee969e95a0da4160a68a Mon Sep 17 00:00:00 2001 From: Derek Binnersley Date: Fri, 22 May 2026 00:33:53 -0700 Subject: [PATCH] implementation of KIP-429 Signed-off-by: Derek Binnersley --- docs/consumer.md | 25 +- docs/kips.md | 1 + src/clients/admin/admin.ts | 34 +- src/clients/consumer/consumer-protocol.ts | 174 ++++++++ src/clients/consumer/consumer.ts | 247 +++++++++--- src/clients/consumer/index.ts | 2 + src/clients/consumer/messages-stream.ts | 10 +- src/clients/consumer/partitions-assigners.ts | 372 ++++++++++++++++++ src/clients/consumer/types.ts | 13 + test/clients/admin/admin.test.ts | 3 + .../consumer/consumer-protocol.test.ts | 90 +++++ test/clients/consumer/consumer.test.ts | 283 ++++++++++++- .../messages-stream-rebalance.test.ts | 37 +- .../consumer/partitions-assigners.test.ts | 181 +++++++++ 14 files changed, 1383 insertions(+), 89 deletions(-) create mode 100644 src/clients/consumer/consumer-protocol.ts create mode 100644 test/clients/consumer/consumer-protocol.test.ts create mode 100644 test/clients/consumer/partitions-assigners.test.ts diff --git a/docs/consumer.md b/docs/consumer.md index 5a359028..d1252eaf 100644 --- a/docs/consumer.md +++ b/docs/consumer.md @@ -14,6 +14,7 @@ The complete TypeScript type of the `Consumer` is determined by the `deserialize | `consumer:group:leave` | `ConsumerGroupLeavePayload` | Emitted when leaving a group. | | `consumer:group:rejoin` | (none) | Emitted when re-joining a group after a rebalance. | | `consumer:group:rebalance` | `ConsumerGroupRebalancePayload` | Emitted when group rebalancing occurs. | +| `consumer:group:autocommit:error` | `ConsumerGroupAutocommitErrorPayload` | Emitted when autocommit fails during a cooperative rebalance. The rebalance still proceeds. | | `consumer:heartbeat:start` | `ConsumerHeartbeatPayload?` | Emitted when starting new heartbeats. | | `consumer:heartbeat:cancel` | `ConsumerHeartbeatPayload` | Emitted if a scheduled heartbeat has been canceled. | | `consumer:heartbeat:end` | `ConsumerHeartbeatPayload?` | Emitted during successful heartbeats. | @@ -46,14 +47,32 @@ Options: | heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats.

Not supported for `groupProtocol=consumer`, instead it is set with the broker configuration property `group.consumer.heartbeat.interval`. | | groupProtocol | `'classic' \| 'consumer'` | `'classic'` | Group protocol to use. Use `'classic'` for the original consumer group protocol and `'consumer'` for the new protocol introduced in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol).

The `'consumer'` protocol provides server-side partition assignment and incremental rebalancing behavior. | | groupRemoteAssignor | `string` | `null` | Server-side assignor to use for `groupProtocol=consumer`. Keep it unset to let the server select a suitable assignor for the group. Available assignors: `'uniform'` or `'range'`. | -| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.

Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties.

Not supported for `groupProtocol=consumer`. | -| partitionAssigner | `GroupPartitionsAssigner` | | Client-side partition assignment strategy.

Not supported for `groupProtocol=consumer`, use `groupRemoteAssignor` instead. | +| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.

Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. Use `{ name: 'cooperative-sticky', version: 3 }` to opt in to KIP-429 cooperative rebalancing.

Not supported for `groupProtocol=consumer`. | +| partitionAssigner | `GroupPartitionsAssigner` | | Client-side partition assignment strategy.

Built-in assigners are `roundRobinAssigner` and `cooperativeStickyAssigner`. Not supported for `groupProtocol=consumer`, use `groupRemoteAssignor` instead. | | streamContext | `unknown` | | Default opaque user data for `MessagesStream` instances created by this consumer. It is forwarded to stream-owned `ConnectionPool` and `Connection` instances. Kafka never reads, mutates, or interprets this value. | It also supports all the constructor options of `Base`. The readonly `streamContext` getters expose the same opaque values on the consumer instance. +### Cooperative Sticky Assignment + +Classic consumer groups can opt in to [KIP-429](https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol) cooperative sticky assignment: + +```ts +import { COOPERATIVE_STICKY_ASSIGNOR, Consumer } from '@platformatic/kafka' + +const consumer = new Consumer({ + groupId: 'my-group', + bootstrapBrokers: ['localhost:9092'], + protocols: [{ name: COOPERATIVE_STICKY_ASSIGNOR, version: 3 }] +}) +``` + +The cooperative sticky assignor preserves existing ownership where possible and performs partition transfers over a follow-up rebalance so partitions are revoked before another member starts consuming them. All members of the group should advertise the same assignor during the migration. + +Subscription metadata includes `rackId` when using protocol version 3, but the built-in assignor does not yet use rack information for rack-aware partition placement. + ## Basic Methods ### `isActive` @@ -302,7 +321,7 @@ Options: | sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.

This is only relevant when Kafka creates a new group. | | rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.

This is only relevant when Kafka creates a new group. | | heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats. | -| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.

Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. | +| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.

Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. Use `{ name: 'cooperative-sticky', version: 3 }` to opt in to KIP-429 cooperative rebalancing. | ### `leaveGroup([force])` diff --git a/docs/kips.md b/docs/kips.md index 45d69f9e..2f743a70 100644 --- a/docs/kips.md +++ b/docs/kips.md @@ -26,6 +26,7 @@ Other KIPs up to Kafka 4.1.0 are likely supported, or at least have protocol/API | [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API) | Supported | | [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances) | Supported | | [KIP-368](https://cwiki.apache.org/confluence/display/KAFKA/KIP-368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate) | Supported | +| [KIP-429](https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol) | Supported | | [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430%3A+Return+Authorized+Operations+in+Metadata%2C+Describe+Topics%2C+and+Describe+Groups+Responses) | Supported | | [KIP-455](https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment) | Supported | | [KIP-482](https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields) | Supported | diff --git a/src/clients/admin/admin.ts b/src/clients/admin/admin.ts index 54cd8ca6..d50d4023 100644 --- a/src/clients/admin/admin.ts +++ b/src/clients/admin/admin.ts @@ -97,7 +97,6 @@ import { } from '../../diagnostic.ts' import { MultipleErrors, UserError } from '../../errors.ts' import { type Broker, type Connection } from '../../index.ts' -import { Reader } from '../../protocol/reader.ts' import { Base, kAfterCreate, @@ -113,6 +112,7 @@ import { kValidateOptions } from '../base/base.ts' import { type BaseOptions } from '../base/types.ts' +import { decodeConsumerProtocolAssignment, decodeConsumerProtocolSubscription } from '../consumer/consumer-protocol.ts' import { type GroupAssignment } from '../consumer/types.ts' import { adminListOffsetsOptionsValidator, @@ -1307,32 +1307,26 @@ export class Admin extends Base { } for (const member of raw.members) { - const reader = Reader.from(member.memberMetadata) - let memberMetadata: GroupMember['metadata'] | undefined let memberAssignments: Map | undefined - if (reader.remaining > 0) { + if (member.memberMetadata.length > 0) { + const subscription = decodeConsumerProtocolSubscription(member.memberMetadata) memberMetadata = { - version: reader.readInt16(), - topics: reader.readArray(r => r.readString(false), false, false), - metadata: reader.readBytes(false) + version: subscription.version, + topics: subscription.topics, + metadata: subscription.userData, + ownedPartitions: subscription.ownedPartitions, + generationId: subscription.generationId, + rackId: subscription.rackId } - reader.reset(member.memberAssignment) - reader.skip(2) // Ignore Version information - - memberAssignments = reader.readMap( - r => { - const topic = r.readString(false) - - return [topic, { topic, partitions: reader.readArray(r => r.readInt32(), false, false) }] - }, - false, - false + memberAssignments = new Map( + decodeConsumerProtocolAssignment(member.memberAssignment).assignedPartitions.map(assignment => [ + assignment.topic, + assignment + ]) ) - - // Ignore the user data } group.members.set(member.memberId, { diff --git a/src/clients/consumer/consumer-protocol.ts b/src/clients/consumer/consumer-protocol.ts new file mode 100644 index 00000000..212d6001 --- /dev/null +++ b/src/clients/consumer/consumer-protocol.ts @@ -0,0 +1,174 @@ +import { Reader } from '../../protocol/reader.ts' +import { Writer } from '../../protocol/writer.ts' +import { type GroupAssignment } from './types.ts' + +export const CONSUMER_PROTOCOL_LOWEST_VERSION = 0 +export const CONSUMER_PROTOCOL_HIGHEST_VERSION = 3 + +export interface ConsumerProtocolSubscriptionData { + version: number + topics: string[] + userData: Buffer + ownedPartitions: GroupAssignment[] + generationId: number + rackId: string | null +} + +export interface ConsumerProtocolAssignmentData { + version: number + assignedPartitions: GroupAssignment[] + userData: Buffer +} + +function supportedVersion (version: number): number { + if (version < CONSUMER_PROTOCOL_LOWEST_VERSION) { + return CONSUMER_PROTOCOL_LOWEST_VERSION + } + + if (version > CONSUMER_PROTOCOL_HIGHEST_VERSION) { + return CONSUMER_PROTOCOL_HIGHEST_VERSION + } + + return version +} + +function sortedAssignments (assignments: GroupAssignment[]): GroupAssignment[] { + return assignments + .filter(({ partitions }) => partitions.length > 0) + .map(({ topic, partitions }) => ({ topic, partitions: [...partitions].sort((a, b) => a - b) })) + .sort((a, b) => a.topic.localeCompare(b.topic)) +} + +function appendTopicPartitions (writer: Writer, assignments: GroupAssignment[]): Writer { + return writer.appendArray( + sortedAssignments(assignments), + (w, { topic, partitions }) => { + w.appendString(topic, false).appendArray(partitions, (w, partition) => w.appendInt32(partition), false, false) + }, + false, + false + ) +} + +function readTopicPartitions (reader: Reader): GroupAssignment[] { + return reader.readArray( + r => ({ + topic: r.readString(false), + partitions: r.readArray(r => r.readInt32(), false, false) + }), + false, + false + ) +} + +export function encodeConsumerProtocolSubscription (data: { + version: number + topics: string[] + userData?: Buffer | string + ownedPartitions?: GroupAssignment[] + generationId?: number + rackId?: string | null +}): Buffer { + const version = supportedVersion(data.version) + const userData = typeof data.userData === 'string' ? Buffer.from(data.userData) : data.userData + const writer = Writer.create() + .appendInt16(version) + .appendArray([...data.topics].sort(), (w, topic) => w.appendString(topic, false), false, false) + .appendBytes(userData, false) + + if (version >= 1) { + appendTopicPartitions(writer, data.ownedPartitions ?? []) + } + + if (version >= 2) { + writer.appendInt32(data.generationId ?? -1) + } + + if (version >= 3) { + writer.appendString(data.rackId || null, false) + } + + return writer.buffer +} + +export function decodeConsumerProtocolSubscription (buffer: Buffer): ConsumerProtocolSubscriptionData { + const reader = Reader.from(buffer) + const encodedVersion = reader.readInt16() + const version = supportedVersion(encodedVersion) + const subscription: ConsumerProtocolSubscriptionData = { + version: encodedVersion, + topics: reader.readArray(r => r.readString(false), false, false), + userData: reader.readBytes(false), + ownedPartitions: [], + generationId: -1, + rackId: null + } + + if (version >= 1) { + subscription.ownedPartitions = readTopicPartitions(reader) + } + + if (version >= 2) { + subscription.generationId = reader.readInt32() + } + + if (version >= 3) { + subscription.rackId = reader.readNullableString(false) + } + + return subscription +} + +export function decodeCooperativeStickyGeneration (userData: Buffer): number { + if (userData.length < 4) { + return -1 + } + + try { + return Reader.from(userData).readInt32() + } catch { + return -1 + } +} + +export function encodeConsumerProtocolAssignment (data: { + version: number + assignedPartitions: GroupAssignment[] + userData?: Buffer | string | null +}): Buffer { + const userData = typeof data.userData === 'string' ? Buffer.from(data.userData) : data.userData + const writer = Writer.create().appendInt16(supportedVersion(data.version)) + appendTopicPartitions(writer, data.assignedPartitions) + writer.appendBytes(userData ?? null, false) + return writer.buffer +} + +export function decodeConsumerProtocolAssignment (buffer: Buffer): ConsumerProtocolAssignmentData { + if (buffer.length === 0) { + return { + version: 0, + assignedPartitions: [], + userData: Buffer.alloc(0) + } + } + + const reader = Reader.from(buffer) + const encodedVersion = reader.readInt16() + + if (reader.remaining === 0) { + return { + version: encodedVersion, + assignedPartitions: [], + userData: Buffer.alloc(0) + } + } + + const assignedPartitions = readTopicPartitions(reader) + const userData = reader.remaining > 0 ? reader.readBytes(false) : Buffer.alloc(0) + + return { + version: encodedVersion, + assignedPartitions, + userData + } +} diff --git a/src/clients/consumer/consumer.ts b/src/clients/consumer/consumer.ts index 154db1b1..b53a07e1 100644 --- a/src/clients/consumer/consumer.ts +++ b/src/clients/consumer/consumer.ts @@ -53,8 +53,6 @@ import { import { type GenericError, type ProtocolError, protocolErrors, UserError } from '../../errors.ts' import { type ConnectionPool } from '../../network/connection-pool.ts' import { type Connection } from '../../network/connection.ts' -import { INT32_SIZE } from '../../protocol/definitions.ts' -import { Reader } from '../../protocol/reader.ts' import { IS_CONTROL } from '../../protocol/records.ts' import { Writer } from '../../protocol/writer.ts' import { @@ -85,6 +83,14 @@ import { import { type ClusterMetadata } from '../base/types.ts' import { ensureMetric, type Gauge, type Histogram } from '../metrics.ts' import { MessagesStream } from './messages-stream.ts' +import { + CONSUMER_PROTOCOL_HIGHEST_VERSION, + decodeCooperativeStickyGeneration, + decodeConsumerProtocolAssignment, + decodeConsumerProtocolSubscription, + encodeConsumerProtocolAssignment, + encodeConsumerProtocolSubscription +} from './consumer-protocol.ts' import { commitOptionsValidator, consumeOptionsValidator, @@ -97,7 +103,11 @@ import { listCommitsOptionsValidator, listOffsetsOptionsValidator } from './options.ts' -import { roundRobinAssigner } from './partitions-assigners.ts' +import { + COOPERATIVE_STICKY_ASSIGNOR, + cooperativeStickyAssigner, + roundRobinAssigner +} from './partitions-assigners.ts' import { TopicsMap } from './topics-map.ts' import { type CommitOptions, @@ -105,6 +115,7 @@ import { type ConsumerGroupJoinPayload, type ConsumerGroupLeavePayload, type ConsumerGroupOptions, + type ConsumerGroupAutocommitErrorPayload, type ConsumerGroupRebalancePayload, type ConsumerHeartbeatErrorPayload, type ConsumerHeartbeatPayload, @@ -134,6 +145,7 @@ export interface ConsumerEvents extends BaseEvents { 'consumer:group:leave': (payload: ConsumerGroupLeavePayload) => void 'consumer:group:rejoin': () => void 'consumer:group:rebalance': (payload: ConsumerGroupRebalancePayload) => void + 'consumer:group:autocommit:error': (payload: ConsumerGroupAutocommitErrorPayload) => void 'consumer:heartbeat:start': (payload?: ConsumerHeartbeatPayload) => void 'consumer:heartbeat:cancel': (payload: ConsumerHeartbeatPayload) => void 'consumer:heartbeat:end': (payload?: ConsumerHeartbeatPayload) => void @@ -1448,6 +1460,24 @@ export class Consumer assignment.topic === a.topic) + if (!b) { + result.push({ topic: a.topic, partitions: [...a.partitions] }) + } else { + const diff = a.partitions.filter(partition => !b.partitions.includes(partition)) + if (diff.length > 0) { + result.push({ topic: a.topic, partitions: diff }) + } + } + } + + return result + } + #revokePartitions (newAssignment: TopicPartition[]): void { const toRevoke = this.#diffAssignments(this.#assignments, newAssignment) if (toRevoke.length === 0) { @@ -1772,28 +1802,92 @@ export class Consumer { - this.#heartbeat(options) - }, options.heartbeatInterval) - - this.emitWithDebug('consumer', 'group:join', { - groupId: this.groupId, - memberId: this.memberId, - generationId: this.generationId, - isLeader: this.#isLeader, - assignments: this.assignments - }) - - callback(null, this.memberId!) + this.#completeClassicSyncGroup(options, response!, callback) }) } ) } + #completeClassicSyncGroup ( + options: Required, + assignments: GroupAssignment[], + callback: CallbackWithPromise + ): void { + if (this.#isCooperativeProtocol()) { + const previousAssignments = this.assignments ?? [] + const revoked = this.#diffGroupAssignments(previousAssignments, assignments) + const addedOnRevokeRound = this.#diffGroupAssignments(assignments, previousAssignments) + + if (revoked.length > 0) { + for (const stream of this.#streams) { + stream.pause() + } + + runConcurrentCallbacks( + 'Autocommit before cooperative rebalance failed.', + this.#streams, + (stream, concurrentCallback) => { + stream[kAutocommit](concurrentCallback) + }, + autocommitError => { + // Mirror Kafka's reference client: capture autocommit errors but + // continue the rebalance so the group does not get stuck mid-cycle. + // Per-stream autocommit errors are already surfaced through the + // `autocommit` event on each MessagesStream. + if (autocommitError) { + this.emitWithDebug('consumer', 'group:autocommit:error', { + groupId: this.groupId, + error: autocommitError + }) + } + + this.assignments = assignments + this.#syncPreferredReadReplicas() + + for (const stream of this.#streams) { + stream.resume() + } + + if (addedOnRevokeRound.length > 0) { + for (const stream of this.#streams) { + stream[kRefreshOffsetsAndFetch]() + } + } + + this.emitWithDebug('consumer', 'group:rebalance', { groupId: this.groupId }) + this.#performJoinGroup(options, callback) + } + ) + return + } + } + + const added = this.#diffGroupAssignments(assignments, this.assignments ?? []) + this.assignments = assignments + this.#syncPreferredReadReplicas() + + this.#cancelHeartbeat() + this.#heartbeatInterval = setTimeout(() => { + this.#heartbeat(options) + }, options.heartbeatInterval) + + this.emitWithDebug('consumer', 'group:join', { + groupId: this.groupId, + memberId: this.memberId, + generationId: this.generationId, + isLeader: this.#isLeader, + assignments: this.assignments + }) + + if (this.#isCooperativeProtocol() && added.length > 0) { + for (const stream of this.#streams) { + stream[kRefreshOffsetsAndFetch]() + } + } + + callback(null, this.memberId!) + } + #performLeaveGroup (force: boolean, callback: CallbackWithPromise): void { if (!this.memberId) { callback(null) @@ -2023,21 +2117,42 @@ export class Consumer w.appendString(t, false), false, false) - .appendBytes(typeof metadata.metadata === 'string' ? Buffer.from(metadata.metadata) : metadata.metadata, false) - .buffer + const cooperative = metadata.name === COOPERATIVE_STICKY_ASSIGNOR + const ownedPartitions = cooperative ? this.#ownedPartitionsForSubscription(topics) : [] + let userData = typeof metadata.metadata === 'string' ? Buffer.from(metadata.metadata) : metadata.metadata + + if (cooperative && !userData && metadata.version < 2) { + userData = Writer.create() + .appendInt32(this.memberId && ownedPartitions.length > 0 ? this.generationId : -1) + .buffer + } + + return encodeConsumerProtocolSubscription({ + version: metadata.version, + topics, + userData, + ownedPartitions, + generationId: cooperative && this.memberId && ownedPartitions.length > 0 ? this.generationId : -1, + rackId: cooperative ? this.#clientRack || null : null + }) } #decodeProtocolSubscriptionMetadata (memberId: string, buffer: Buffer): ExtendedGroupProtocolSubscription { - const reader = Reader.from(buffer) + const subscription = decodeConsumerProtocolSubscription(buffer) + const cooperative = this.#protocol === COOPERATIVE_STICKY_ASSIGNOR + const generationId = + cooperative && subscription.version < 2 + ? decodeCooperativeStickyGeneration(subscription.userData) + : subscription.generationId return { memberId, - version: reader.readInt16(), - topics: reader.readArray(r => r.readString(false), false, false), - metadata: reader.readBytes(false) + version: subscription.version, + topics: subscription.topics, + metadata: subscription.userData, + ownedPartitions: subscription.ownedPartitions, + generationId, + rackId: subscription.rackId } } @@ -2046,42 +2161,15 @@ export class Consumer { - w.appendString(topic, false).appendArray(partitions, (w, a) => w.appendInt32(a), false, false) - }, - false, - false - ) - - writer.appendBytes(userData ?? null, false) - - return writer.buffer + return encodeConsumerProtocolAssignment({ + version: this.#isCooperativeProtocol() ? CONSUMER_PROTOCOL_HIGHEST_VERSION : 0, + assignedPartitions: assignments, + userData: this[kOptions].assignmentUserData ?? null + }) } #decodeProtocolAssignment (buffer: Buffer): GroupAssignment[] { - const reader = Reader.from(buffer) - - reader.skip(2) // Ignore Version information - - if (reader.remaining < INT32_SIZE) { - return [] - } - - return reader.readArray( - r => { - return { - topic: r.readString(false), - partitions: r.readArray(r => r.readInt32(), false, false) - } - }, - false, - false - ) + return decodeConsumerProtocolAssignment(buffer).assignedPartitions } #createAssignments ( @@ -2115,7 +2203,7 @@ export class Consumer currentTopics.has(assignment.topic)) + .map(assignment => ({ topic: assignment.topic, partitions: [...assignment.partitions] })) + } + #filterUncommittedMessages (response: FetchResponse) { for (const topic of response.responses) { for (const partition of topic.partitions) { @@ -2174,6 +2290,11 @@ export class Consumer extends Readable return callback[kCallbackPromise]! } - [kAutocommit] (): void { + [kAutocommit] (callback?: Callback): void { + if (this.#autocommitInflight) { + callback && this.once('autocommit', error => callback(error)) + return + } + if (this.#offsetsToCommit.size === 0) { + callback?.(null) return } @@ -926,6 +932,7 @@ export class MessagesStream extends Readable if ((error as GenericError).findBy?.('needsRejoin', true)) { this.destroy(error) } + callback?.(error) return } @@ -934,6 +941,7 @@ export class MessagesStream extends Readable } this.emit('autocommit', null, offsets) + callback?.(null) }) } diff --git a/src/clients/consumer/partitions-assigners.ts b/src/clients/consumer/partitions-assigners.ts index 42b43bf2..0f25ad56 100644 --- a/src/clients/consumer/partitions-assigners.ts +++ b/src/clients/consumer/partitions-assigners.ts @@ -1,6 +1,14 @@ import { type ClusterMetadata } from '../base/types.ts' import { type ExtendedGroupProtocolSubscription, type GroupPartitionsAssignments } from './types.ts' +export const ROUNDROBIN_ASSIGNOR = 'roundrobin' +export const COOPERATIVE_STICKY_ASSIGNOR = 'cooperative-sticky' + +interface TopicPartition { + topic: string + partition: number +} + export function roundRobinAssigner ( _current: string, members: Map, @@ -35,3 +43,367 @@ export function roundRobinAssigner ( return assignments } + +export function cooperativeStickyAssigner ( + _current: string, + members: Map, + topics: Set, + metadata: ClusterMetadata +): GroupPartitionsAssignments[] { + const assignments = createEmptyAssignments(members) + const allPartitions = listAssignablePartitions(members, topics, metadata) + const { owners: previousOwners, contested: contestedPreviousOwners } = previousOwnership(members, allPartitions) + + for (const partition of allPartitions) { + const owner = previousOwners.get(topicPartitionKey(partition)) + if (owner) { + addAssignment(assignments, owner, partition) + } + } + + assignUnassignedPartitions(assignments, members, allPartitions) + balanceAssignments(assignments, members) + adjustCooperativeTransfers(assignments, members, previousOwners, contestedPreviousOwners) + + return Array.from(assignments, ([memberId, memberAssignments]) => ({ + memberId, + assignments: memberAssignments + })) +} + +function createEmptyAssignments ( + members: Map +): Map> { + const assignments = new Map>() + + for (const memberId of members.keys()) { + assignments.set(memberId, new Map()) + } + + return assignments +} + +function listAssignablePartitions ( + members: Map, + topics: Set, + metadata: ClusterMetadata +): TopicPartition[] { + const result: TopicPartition[] = [] + + for (const topic of [...topics].sort()) { + const topicMetadata = metadata.topics.get(topic) + if (!topicMetadata || !hasEligibleMember(members, topic)) { + continue + } + + for (let partition = 0; partition < topicMetadata.partitionsCount; partition++) { + result.push({ topic, partition }) + } + } + + return result +} + +function hasEligibleMember (members: Map, topic: string): boolean { + for (const member of members.values()) { + if (member.topics?.includes(topic)) { + return true + } + } + + return false +} + +function previousOwnership ( + members: Map, + allPartitions: TopicPartition[] +): { owners: Map; contested: Set } { + const validPartitions = new Set(allPartitions.map(topicPartitionKey)) + const maxGeneration = Math.max(-1, ...Array.from(members.values(), member => member.generationId ?? -1)) + const claims = new Map() + + for (const [memberId, member] of members) { + const generationId = member.generationId ?? -1 + if (maxGeneration >= 0 && generationId < maxGeneration) { + continue + } + + for (const owned of member.ownedPartitions ?? []) { + if (!member.topics?.includes(owned.topic)) { + continue + } + + for (const partition of owned.partitions) { + const topicPartition = { topic: owned.topic, partition } + const key = topicPartitionKey(topicPartition) + if (!validPartitions.has(key)) { + continue + } + + let partitionClaims = claims.get(key) + if (!partitionClaims) { + partitionClaims = [] + claims.set(key, partitionClaims) + } + partitionClaims.push({ memberId, generationId }) + } + } + } + + const owners = new Map() + const contested = new Set() + for (const [key, partitionClaims] of claims) { + const highestGeneration = Math.max(...partitionClaims.map(claim => claim.generationId)) + const highestGenerationClaims = partitionClaims.filter(claim => claim.generationId === highestGeneration) + + // If two members claim the same partition in the same generation, neither + // claim can be trusted. This mirrors the Kafka assignor's safety check. + if (highestGenerationClaims.length === 1) { + owners.set(key, highestGenerationClaims[0].memberId) + } else { + contested.add(key) + } + } + + return { owners, contested } +} + +function assignUnassignedPartitions ( + assignments: Map>, + members: Map, + allPartitions: TopicPartition[] +): void { + const assigned = assignedPartitionKeys(assignments) + + for (const partition of allPartitions) { + if (assigned.has(topicPartitionKey(partition))) { + continue + } + + const memberId = leastLoadedEligibleMember(assignments, members, partition) + if (memberId) { + addAssignment(assignments, memberId, partition) + assigned.add(topicPartitionKey(partition)) + } + } +} + +function balanceAssignments ( + assignments: Map>, + members: Map +): void { + const totalPartitions = assignedPartitionKeys(assignments).size + if (totalPartitions === 0 || members.size === 0) { + return + } + + const minQuota = Math.floor(totalPartitions / members.size) + const maxQuota = Math.ceil(totalPartitions / members.size) + + while (true) { + const underloadedMembers = sortedMemberIds(assignments).filter( + memberId => assignmentSize(assignments.get(memberId)!) < minQuota + ) + const overloadedMembers = sortedMemberIds(assignments) + .filter(memberId => assignmentSize(assignments.get(memberId)!) > maxQuota) + .sort((left, right) => assignmentSize(assignments.get(right)!) - assignmentSize(assignments.get(left)!)) + + if (underloadedMembers.length === 0 || overloadedMembers.length === 0) { + return + } + + let moved = false + for (const underloaded of underloadedMembers) { + for (const overloaded of overloadedMembers) { + const partition = removablePartition(assignments.get(overloaded)!, members.get(underloaded)!) + if (!partition) { + continue + } + + removeAssignment(assignments, overloaded, partition) + addAssignment(assignments, underloaded, partition) + moved = true + break + } + + if (moved) { + break + } + } + + if (!moved) { + return + } + } +} + +function adjustCooperativeTransfers ( + assignments: Map>, + members: Map, + previousOwners: Map, + contestedPreviousOwners: Set +): void { + const added = new Map() + const revoked = new Set() + + for (const [memberId, memberAssignments] of assignments) { + const trustedOwned = new Set( + flattenAssignments(members.get(memberId)?.ownedPartitions ?? []) + .map(topicPartitionKey) + .filter(key => previousOwners.get(key) === memberId) + ) + const claimedOwned = new Set( + flattenAssignments(members.get(memberId)?.ownedPartitions ?? []) + .map(topicPartitionKey) + .filter(key => previousOwners.get(key) === memberId || contestedPreviousOwners.has(key)) + ) + const assigned = new Set(flattenAssignments(Array.from(memberAssignments.values())).map(topicPartitionKey)) + + for (const key of assigned) { + if (!trustedOwned.has(key)) { + added.set(key, memberId) + } + } + + for (const key of claimedOwned) { + if (!assigned.has(key)) { + revoked.add(key) + } + } + } + + for (const [key, memberId] of added) { + if (revoked.has(key)) { + removeAssignment(assignments, memberId, parseTopicPartitionKey(key)) + } + } +} + +function leastLoadedEligibleMember ( + assignments: Map>, + members: Map, + partition: TopicPartition +): string | null { + let selected: string | null = null + let selectedSize = Number.POSITIVE_INFINITY + + for (const memberId of sortedMemberIds(assignments)) { + const member = members.get(memberId)! + if (!member.topics?.includes(partition.topic)) { + continue + } + + const size = assignmentSize(assignments.get(memberId)!) + if (size < selectedSize) { + selected = memberId + selectedSize = size + } + } + + return selected +} + +function removablePartition ( + assignments: Map, + targetMember: ExtendedGroupProtocolSubscription +): TopicPartition | null { + const partitions = flattenAssignments(Array.from(assignments.values())).sort(compareTopicPartitions) + + for (const partition of partitions) { + if (targetMember.topics?.includes(partition.topic)) { + return partition + } + } + + return null +} + +function addAssignment ( + assignments: Map>, + memberId: string, + partition: TopicPartition +): void { + const memberAssignments = assignments.get(memberId)! + let topicAssignment = memberAssignments.get(partition.topic) + if (!topicAssignment) { + topicAssignment = { topic: partition.topic, partitions: [] } + memberAssignments.set(partition.topic, topicAssignment) + } + + if (!topicAssignment.partitions.includes(partition.partition)) { + topicAssignment.partitions.push(partition.partition) + topicAssignment.partitions.sort((a, b) => a - b) + } +} + +function removeAssignment ( + assignments: Map>, + memberId: string, + partition: TopicPartition +): void { + const memberAssignments = assignments.get(memberId) + const topicAssignment = memberAssignments?.get(partition.topic) + if (!topicAssignment) { + return + } + + topicAssignment.partitions = topicAssignment.partitions.filter(current => current !== partition.partition) + if (topicAssignment.partitions.length === 0) { + memberAssignments!.delete(partition.topic) + } +} + +function assignedPartitionKeys ( + assignments: Map> +): Set { + const assigned = new Set() + + for (const memberAssignments of assignments.values()) { + for (const partition of flattenAssignments(Array.from(memberAssignments.values()))) { + assigned.add(topicPartitionKey(partition)) + } + } + + return assigned +} + +function flattenAssignments (assignments: { topic: string; partitions: number[] }[]): TopicPartition[] { + const partitions: TopicPartition[] = [] + + for (const assignment of assignments) { + for (const partition of assignment.partitions) { + partitions.push({ topic: assignment.topic, partition }) + } + } + + return partitions +} + +function assignmentSize (assignments: Map): number { + let size = 0 + for (const assignment of assignments.values()) { + size += assignment.partitions.length + } + return size +} + +function sortedMemberIds (assignments: Map>): string[] { + return [...assignments.keys()].sort() +} + +function topicPartitionKey ({ topic, partition }: TopicPartition): string { + return `${topic}:${partition}` +} + +function parseTopicPartitionKey (key: string): TopicPartition { + const index = key.lastIndexOf(':') + return { + topic: key.slice(0, index), + partition: Number(key.slice(index + 1)) + } +} + +function compareTopicPartitions (left: TopicPartition, right: TopicPartition): number { + const topicComparison = left.topic.localeCompare(right.topic) + return topicComparison === 0 ? left.partition - right.partition : topicComparison +} diff --git a/src/clients/consumer/types.ts b/src/clients/consumer/types.ts index fe7b256a..fd2cdc1a 100644 --- a/src/clients/consumer/types.ts +++ b/src/clients/consumer/types.ts @@ -12,6 +12,11 @@ export interface GroupProtocolSubscription { metadata?: Buffer | string } +export interface TopicPartitionAssignment { + topic: string + partition: number +} + export interface GroupAssignment { topic: string partitions: number[] @@ -24,6 +29,9 @@ export interface GroupPartitionsAssignments { export interface ExtendedGroupProtocolSubscription extends Omit { topics?: string[] + ownedPartitions?: GroupAssignment[] + generationId?: number + rackId?: string | null // This is only used in responses memberId: string } @@ -178,6 +186,11 @@ export interface ConsumerGroupRebalancePayload { groupId: string } +export interface ConsumerGroupAutocommitErrorPayload { + groupId: string + error: Error +} + export interface ConsumerHeartbeatPayload { groupId?: string memberId?: string | null diff --git a/test/clients/admin/admin.test.ts b/test/clients/admin/admin.test.ts index e01c6609..932a5125 100644 --- a/test/clients/admin/admin.test.ts +++ b/test/clients/admin/admin.test.ts @@ -1665,7 +1665,10 @@ test('describeGroups should describe consumer groups and support diagnostic chan groupInstanceId: null, id, metadata: { + generationId: -1, metadata: EMPTY_BUFFER, + ownedPartitions: [], + rackId: null, topics: [testTopic], version: 1 } diff --git a/test/clients/consumer/consumer-protocol.test.ts b/test/clients/consumer/consumer-protocol.test.ts new file mode 100644 index 00000000..39f50165 --- /dev/null +++ b/test/clients/consumer/consumer-protocol.test.ts @@ -0,0 +1,90 @@ +import { deepStrictEqual, strictEqual } from 'node:assert' +import { test } from 'node:test' +import { + decodeCooperativeStickyGeneration, + decodeConsumerProtocolAssignment, + decodeConsumerProtocolSubscription, + encodeConsumerProtocolAssignment, + encodeConsumerProtocolSubscription +} from '../../../src/index.ts' + +test('consumer protocol subscription should encode and decode v3 fields', () => { + const encoded = encodeConsumerProtocolSubscription({ + version: 3, + topics: ['topic-b', 'topic-a'], + userData: Buffer.from('metadata'), + ownedPartitions: [ + { topic: 'topic-a', partitions: [2, 0] }, + { topic: 'topic-b', partitions: [1] } + ], + generationId: 42, + rackId: 'rack-a' + }) + + deepStrictEqual(decodeConsumerProtocolSubscription(encoded), { + version: 3, + topics: ['topic-a', 'topic-b'], + userData: Buffer.from('metadata'), + ownedPartitions: [ + { topic: 'topic-a', partitions: [0, 2] }, + { topic: 'topic-b', partitions: [1] } + ], + generationId: 42, + rackId: 'rack-a' + }) +}) + +test('consumer protocol subscription should preserve v0 compatibility', () => { + const encoded = encodeConsumerProtocolSubscription({ + version: 0, + topics: ['topic-a'], + userData: 'metadata', + ownedPartitions: [{ topic: 'topic-a', partitions: [0] }], + generationId: 42, + rackId: 'rack-a' + }) + const decoded = decodeConsumerProtocolSubscription(encoded) + + strictEqual(decoded.version, 0) + deepStrictEqual(decoded.topics, ['topic-a']) + deepStrictEqual(decoded.userData, Buffer.from('metadata')) + deepStrictEqual(decoded.ownedPartitions, []) + strictEqual(decoded.generationId, -1) + strictEqual(decoded.rackId, null) +}) + +test('consumer protocol assignment should encode and decode assigned partitions and user data', () => { + const encoded = encodeConsumerProtocolAssignment({ + version: 3, + assignedPartitions: [ + { topic: 'topic-b', partitions: [1] }, + { topic: 'topic-a', partitions: [2, 0] } + ], + userData: Buffer.from('assignment-metadata') + }) + + deepStrictEqual(decodeConsumerProtocolAssignment(encoded), { + version: 3, + assignedPartitions: [ + { topic: 'topic-a', partitions: [0, 2] }, + { topic: 'topic-b', partitions: [1] } + ], + userData: Buffer.from('assignment-metadata') + }) +}) + +test('consumer protocol assignment should decode empty assignment buffers', () => { + deepStrictEqual(decodeConsumerProtocolAssignment(Buffer.alloc(0)), { + version: 0, + assignedPartitions: [], + userData: Buffer.alloc(0) + }) +}) + +test('cooperative sticky generation should decode from v1 user data', () => { + const userData = Buffer.alloc(4) + userData.writeInt32BE(42) + + strictEqual(decodeCooperativeStickyGeneration(userData), 42) + strictEqual(decodeCooperativeStickyGeneration(Buffer.alloc(0)), -1) +}) diff --git a/test/clients/consumer/consumer.test.ts b/test/clients/consumer/consumer.test.ts index 1a900a16..42160fee 100644 --- a/test/clients/consumer/consumer.test.ts +++ b/test/clients/consumer/consumer.test.ts @@ -20,6 +20,7 @@ import { type ClusterMetadata, CompressionAlgorithms, ConfluentSchemaRegistry, + COOPERATIVE_STICKY_ASSIGNOR, type Connection, Consumer, consumerCommitsChannel, @@ -76,7 +77,8 @@ import { mockedOperationId, mockMetadata, mockMethod, - mockUnavailableAPI + mockUnavailableAPI, + waitFor } from '../../helpers.ts' import { kGetFetchNode } from '../../../src/symbols.ts' @@ -3679,6 +3681,285 @@ test('joinGroup should setup assignment with a round robin policy', async t => { } }) +test('joinGroup should setup assignment with a cooperative sticky policy', async t => { + const topic = await createTopic(t, true, 4) + const groupId = createGroupId() + const protocols = [{ name: COOPERATIVE_STICKY_ASSIGNOR, version: 3 }] + + const consumer1 = createConsumer(t, { groupId, protocols }) + const consumer2 = createConsumer(t, { groupId, protocols }) + + await consumer1.topics.trackAll(topic) + await consumer2.topics.trackAll(topic) + + await consumer1.joinGroup() + deepStrictEqual(consumer1.assignments, [{ topic, partitions: [0, 1, 2, 3] }]) + + const rejoinPromise = once(consumer1, 'consumer:group:join') + await consumer2.joinGroup() + await rejoinPromise + + await waitFor(() => { + const assignments1 = consumer1.assignments?.[0]?.partitions ?? [] + const assignments2 = consumer2.assignments?.[0]?.partitions ?? [] + deepStrictEqual([...assignments1, ...assignments2].sort((a, b) => a - b), [0, 1, 2, 3]) + strictEqual(assignments1.length, 2) + strictEqual(assignments2.length, 2) + }, { timeout: 10_000 }) +}) + +test('cooperative sticky rebalance should keep consuming after a second member joins', async t => { + const topic = await createTopic(t, true, 4) + const groupId = createGroupId() + const protocols = [{ name: COOPERATIVE_STICKY_ASSIGNOR, version: 3 }] + + await produceTestMessages({ + t, + messages: Array.from({ length: 8 }, (_, i) => ({ + topic, + key: `key-${i}`, + value: `value-${i}`, + partition: i % 4 + })) + }) + + const consumer1 = createConsumer(t, { groupId, protocols }) + const consumer2 = createConsumer(t, { groupId, protocols }) + + await consumer1.topics.trackAll(topic) + await consumer2.topics.trackAll(topic) + await consumer1.joinGroup() + + const stream1 = await consumer1.consume({ + topics: [topic], + mode: MessagesStreamModes.EARLIEST, + autocommit: true, + maxWaitTime: 2000 + }) + + const receivedByMember = new Map>([ + ['consumer-1', new Set()], + ['consumer-2', new Set()] + ]) + + stream1.on('data', message => { + receivedByMember.get('consumer-1')!.add(`${message.partition}:${message.offset}`) + }) + + await waitFor(() => strictEqual(receivedByMember.get('consumer-1')!.size >= 4, true), { timeout: 15_000 }) + + const rejoinPromise = once(consumer1, 'consumer:group:join') + await consumer2.joinGroup() + const stream2 = await consumer2.consume({ + topics: [topic], + mode: MessagesStreamModes.EARLIEST, + autocommit: true, + maxWaitTime: 2000 + }) + stream2.on('data', message => { + receivedByMember.get('consumer-2')!.add(`${message.partition}:${message.offset}`) + }) + + await rejoinPromise + + await waitFor(() => { + const assignments1 = consumer1.assignments?.[0]?.partitions ?? [] + const assignments2 = consumer2.assignments?.[0]?.partitions ?? [] + strictEqual(assignments1.length, 2) + strictEqual(assignments2.length, 2) + deepStrictEqual([...assignments1, ...assignments2].sort((a, b) => a - b), [0, 1, 2, 3]) + }, { timeout: 15_000 }) + + await waitFor(() => { + const total = + receivedByMember.get('consumer-1')!.size + receivedByMember.get('consumer-2')!.size + strictEqual(total >= 8, true) + }, { timeout: 20_000 }) + + await stream1.close() + await stream2.close() +}) + +test('cooperative sticky rebalance should autocommit revoked partition offsets before rejoin', async t => { + const topic = await createTopic(t, true, 4) + const groupId = createGroupId() + const protocols = [{ name: COOPERATIVE_STICKY_ASSIGNOR, version: 3 }] + + await produceTestMessages({ + t, + messages: Array.from({ length: 4 }, (_, i) => ({ + topic, + key: `key-${i}`, + value: `value-${i}`, + partition: i + })) + }) + + const consumer1 = createConsumer(t, { groupId, protocols }) + await consumer1.topics.trackAll(topic) + await consumer1.joinGroup() + + const stream1 = await consumer1.consume({ + topics: [topic], + mode: MessagesStreamModes.EARLIEST, + autocommit: true, + maxWaitTime: 2000 + }) + + let consumed = 0 + const consumedAll = new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error('Timed out waiting for messages')), 15_000) + stream1.on('data', () => { + consumed++ + if (consumed >= 4) { + clearTimeout(timer) + resolve() + } + }) + }) + await consumedAll + strictEqual(consumed, 4) + + const consumer2 = createConsumer(t, { groupId, protocols }) + await consumer2.topics.trackAll(topic) + + const rejoinPromise = once(consumer1, 'consumer:group:join') + await consumer2.joinGroup() + await rejoinPromise + + await waitFor(() => { + strictEqual(consumer1.assignments?.[0]?.partitions.length, 2) + strictEqual(consumer2.assignments?.[0]?.partitions.length, 2) + }, { timeout: 15_000 }) + + await stream1.close() + + const commits = await consumer1.listCommittedOffsets({ + topics: [{ topic, partitions: [0, 1, 2, 3] }] + }) + const topicCommits = commits.get(topic)! + + strictEqual(topicCommits[0], 1n) + strictEqual(topicCommits[1], 1n) + strictEqual(topicCommits[2], 1n) + strictEqual(topicCommits[3], 1n) + + await consumer2.leaveGroup() +}) + +test('cooperative sticky rebalance should continue when autocommit fails', async t => { + const topic = await createTopic(t, true, 4) + const groupId = createGroupId() + const protocols = [{ name: COOPERATIVE_STICKY_ASSIGNOR, version: 3 }] + + await produceTestMessages({ + t, + messages: Array.from({ length: 4 }, (_, i) => ({ + topic, + key: `key-${i}`, + value: `value-${i}`, + partition: i + })) + }) + + const consumer1 = createConsumer(t, { groupId, protocols }) + await consumer1.topics.trackAll(topic) + await consumer1.joinGroup() + + const stream1 = await consumer1.consume({ + topics: [topic], + mode: MessagesStreamModes.EARLIEST, + autocommit: true, + maxWaitTime: 2000 + }) + + let consumed = 0 + await new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error('Timed out waiting for messages')), 15_000) + stream1.on('data', () => { + consumed++ + if (consumed >= 4) { + clearTimeout(timer) + resolve() + } + }) + }) + + // Force the very next commit (the one issued during the cooperative revoke) to fail. + const originalCommit = consumer1.commit.bind(consumer1) + let commitFailures = 0 + t.mock.method(consumer1, 'commit', (options: any, callback?: any) => { + if (commitFailures === 0) { + commitFailures++ + const error = new UserError('Simulated autocommit failure during rebalance.') + if (typeof callback === 'function') { + callback(error) + return + } + return Promise.reject(error) + } + return originalCommit(options, callback) + }) + + const autocommitErrorPromise = once(consumer1, 'consumer:group:autocommit:error') + const consumer2 = createConsumer(t, { groupId, protocols }) + await consumer2.topics.trackAll(topic) + + const rejoinPromise = once(consumer1, 'consumer:group:join') + await consumer2.joinGroup() + await rejoinPromise + + // The rebalance must have continued despite the autocommit failure. + await waitFor(() => { + strictEqual(consumer1.assignments?.[0]?.partitions.length, 2) + strictEqual(consumer2.assignments?.[0]?.partitions.length, 2) + }, { timeout: 15_000 }) + + const [{ groupId: payloadGroupId, error: autocommitError }] = + (await autocommitErrorPromise) as [{ groupId: string; error: Error }] + strictEqual(payloadGroupId, groupId) + strictEqual(autocommitError instanceof Error, true) + strictEqual(commitFailures, 1) + + await stream1.close() + await consumer2.leaveGroup() +}) + +test('cooperative sticky rebalance should redistribute partitions when a member leaves', async t => { + const topic = await createTopic(t, true, 6) + const groupId = createGroupId() + const protocols = [{ name: COOPERATIVE_STICKY_ASSIGNOR, version: 3 }] + + const consumer1 = createConsumer(t, { groupId, protocols }) + const consumer2 = createConsumer(t, { groupId, protocols }) + const consumer3 = createConsumer(t, { groupId, protocols }) + + await consumer1.topics.trackAll(topic) + await consumer2.topics.trackAll(topic) + await consumer3.topics.trackAll(topic) + + await consumer1.joinGroup() + await consumer2.joinGroup() + await consumer3.joinGroup() + + await waitFor(() => { + const total = + (consumer1.assignments?.[0]?.partitions.length ?? 0) + + (consumer2.assignments?.[0]?.partitions.length ?? 0) + + (consumer3.assignments?.[0]?.partitions.length ?? 0) + strictEqual(total, 6) + }, { timeout: 15_000 }) + + const rejoinPromise = once(consumer1, 'consumer:group:join') + await consumer3.leaveGroup() + await rejoinPromise + + await waitFor(() => { + strictEqual(consumer1.assignments?.[0]?.partitions.length, 3) + strictEqual(consumer2.assignments?.[0]?.partitions.length, 3) + }, { timeout: 15_000 }) +}) + test('joinGroup should setup assignment with a custom policy', async t => { const topic = await createTopic(t, true, 3) const groupId = createGroupId() diff --git a/test/clients/consumer/messages-stream-rebalance.test.ts b/test/clients/consumer/messages-stream-rebalance.test.ts index 6dd5cb8d..ff66aada 100644 --- a/test/clients/consumer/messages-stream-rebalance.test.ts +++ b/test/clients/consumer/messages-stream-rebalance.test.ts @@ -5,7 +5,7 @@ import type { CallbackWithPromise } from '../../../src/apis/callbacks.ts' import type { FetchRequestTopic, FetchResponse } from '../../../src/apis/consumer/fetch-v17.ts' import { kConnections, kCreateConnectionPool, kGetApi, kOptions, kPrometheus } from '../../../src/clients/base/base.ts' import { type Consumer, MessagesStream, MessagesStreamFallbackModes, MessagesStreamModes } from '../../../src/index.ts' -import { kGetFetchNode } from '../../../src/symbols.ts' +import { kAutocommit, kGetFetchNode } from '../../../src/symbols.ts' import { createConsumer, mockConnectionPoolGet, mockMetadata, mockMethod } from '../../helpers.ts' const topic = 'test-topic' @@ -258,6 +258,41 @@ test('should not fetch while offsets are refreshing after a group rejoin', async stream.destroy() }) +test('autocommit callback should wait for commit completion', async t => { + const consumer = createConsumerMock(t, (_options, _callback) => {}) + const stream = createStream(consumer) + let commitFinished = false + let callbackCalled = false + + t.mock.method(consumer, 'commit', (_options: unknown, callback: CallbackWithPromise) => { + setTimeout(() => { + commitFinished = true + callback(null) + }, 20) + }) + + stream.offsetsToCommit.set(`${topic}:0`, { topic, partition: 0, offset: 10n, leaderEpoch: 0 }) + + const autocommitFinished = new Promise((resolve, reject) => { + stream[kAutocommit](error => { + callbackCalled = true + if (error) { + reject(error) + return + } + + strictEqual(commitFinished, true) + resolve() + }) + }) + + strictEqual(callbackCalled, false) + await autocommitFinished + strictEqual(callbackCalled, true) + + stream.destroy() +}) + test('should route follow-up fetches to preferred read replicas', async t => { const nodes: number[] = [] let resolveSecondFetch!: () => void diff --git a/test/clients/consumer/partitions-assigners.test.ts b/test/clients/consumer/partitions-assigners.test.ts new file mode 100644 index 00000000..52cac4a0 --- /dev/null +++ b/test/clients/consumer/partitions-assigners.test.ts @@ -0,0 +1,181 @@ +import { deepStrictEqual } from 'node:assert' +import { test } from 'node:test' +import { + cooperativeStickyAssigner, + type ClusterMetadata, + type ExtendedGroupProtocolSubscription, + type GroupPartitionsAssignments +} from '../../../src/index.ts' + +const topic = 'topic-a' +const otherTopic = 'topic-b' + +function createMetadata (partitionsCount: number): ClusterMetadata { + return { + id: 'cluster-id', + brokers: new Map(), + controllerId: 1, + topics: new Map([ + [ + topic, + { + id: 'topic-id', + partitions: Array.from({ length: partitionsCount }, () => ({ + leader: 1, + leaderEpoch: 0, + replicas: [1], + isr: [1], + offlineReplicas: [] + })), + partitionsCount, + lastUpdate: Date.now() + } + ] + ]), + lastUpdate: Date.now() + } +} + +function createMember ( + memberId: string, + ownedPartitions: number[] = [], + generationId = 1 +): [string, ExtendedGroupProtocolSubscription] { + return createMemberWithTopics(memberId, [topic], ownedPartitions.length > 0 ? [{ topic, partitions: ownedPartitions }] : [], generationId) +} + +function createMemberWithTopics ( + memberId: string, + topics: string[], + ownedPartitions: { topic: string; partitions: number[] }[] = [], + generationId = 1 +): [string, ExtendedGroupProtocolSubscription] { + return [ + memberId, + { + memberId, + version: 3, + topics, + metadata: Buffer.alloc(0), + ownedPartitions, + generationId, + rackId: null + } + ] +} + +function createMultiTopicMetadata (): ClusterMetadata { + const metadata = createMetadata(3) + metadata.topics.set(otherTopic, { + id: 'other-topic-id', + partitions: Array.from({ length: 3 }, () => ({ + leader: 1, + leaderEpoch: 0, + replicas: [1], + isr: [1], + offlineReplicas: [] + })), + partitionsCount: 3, + lastUpdate: Date.now() + }) + return metadata +} + +function assignmentsObject (assignments: GroupPartitionsAssignments[]): Record { + const result: Record = {} + + for (const assignment of assignments) { + result[assignment.memberId] = assignment.assignments.get(topic)?.partitions ?? [] + } + + return result +} + +test('cooperative sticky assignor should balance initial assignments', () => { + const members = new Map([createMember('consumer-1'), createMember('consumer-2')]) + const assignments = cooperativeStickyAssigner('consumer-1', members, new Set([topic]), createMetadata(4)) + + deepStrictEqual(assignmentsObject(assignments), { + 'consumer-1': [0, 2], + 'consumer-2': [1, 3] + }) +}) + +test('cooperative sticky assignor should not transfer ownership in the same rebalance', () => { + const members = new Map([createMember('consumer-1', [0, 1, 2, 3]), createMember('consumer-2')]) + const assignments = cooperativeStickyAssigner('consumer-1', members, new Set([topic]), createMetadata(4)) + + deepStrictEqual(assignmentsObject(assignments), { + 'consumer-1': [2, 3], + 'consumer-2': [] + }) +}) + +test('cooperative sticky assignor should complete transfer after revoked partitions are no longer owned', () => { + const members = new Map([createMember('consumer-1', [2, 3], 2), createMember('consumer-2', [], 2)]) + const assignments = cooperativeStickyAssigner('consumer-1', members, new Set([topic]), createMetadata(4)) + + deepStrictEqual(assignmentsObject(assignments), { + 'consumer-1': [2, 3], + 'consumer-2': [0, 1] + }) +}) + +test('cooperative sticky assignor should invalidate duplicate same-generation ownership claims', () => { + const members = new Map([ + createMember('consumer-1', [0, 1]), + createMember('consumer-2', [0, 2]), + createMember('consumer-3') + ]) + const assignments = cooperativeStickyAssigner('consumer-1', members, new Set([topic]), createMetadata(3)) + + deepStrictEqual(assignmentsObject(assignments), { + 'consumer-1': [1], + 'consumer-2': [2], + 'consumer-3': [] + }) +}) + +test('cooperative sticky assignor should revoke duplicate ownership before assigning to a claimant', () => { + const members = new Map([ + createMember('consumer-1', [0]), + createMember('consumer-2', [0, 1]), + createMember('consumer-3') + ]) + const assignments = cooperativeStickyAssigner('consumer-1', members, new Set([topic]), createMetadata(2)) + + deepStrictEqual(assignmentsObject(assignments), { + 'consumer-1': [], + 'consumer-2': [1], + 'consumer-3': [] + }) +}) + +test('cooperative sticky assignor should ignore stale generation ownership claims', () => { + const members = new Map([createMember('consumer-1', [0, 1], 1), createMember('consumer-2', [], 2)]) + const assignments = cooperativeStickyAssigner('consumer-1', members, new Set([topic]), createMetadata(2)) + + deepStrictEqual(assignmentsObject(assignments), { + 'consumer-1': [0], + 'consumer-2': [1] + }) +}) + +test('cooperative sticky assignor should respect mixed topic subscriptions', () => { + const members = new Map([ + createMemberWithTopics('z-a-owner', [topic], [{ topic, partitions: [0, 1, 2] }]), + createMemberWithTopics('y-b-owner', [otherTopic], [{ topic: otherTopic, partitions: [1, 2] }]), + createMemberWithTopics('a-under', [otherTopic], [{ topic: otherTopic, partitions: [0] }]) + ]) + const assignments = cooperativeStickyAssigner( + 'z-a-owner', + members, + new Set([topic, otherTopic]), + createMultiTopicMetadata() + ) + const byMember = new Map(assignments.map(assignment => [assignment.memberId, assignment.assignments])) + + deepStrictEqual(byMember.get('z-a-owner')?.get(topic)?.partitions, [0, 1, 2]) + deepStrictEqual(byMember.get('y-b-owner')?.get(otherTopic)?.partitions, [1, 2]) + deepStrictEqual(byMember.get('a-under')?.get(otherTopic)?.partitions, [0]) +})