From 7c9ec4205f3197ed5ec6b19a1c57b5b7cf936795 Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia Date: Wed, 1 Jul 2026 11:46:35 +0530 Subject: [PATCH 1/4] Start Offset must be zero for expanded partitions --- .../group/GroupMetadataManager.java | 39 ++++-- .../group/GroupMetadataManagerTest.java | 129 +++++++++++++++++- 2 files changed, 159 insertions(+), 9 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index ad669498d5bd8..afe59a48ec7d0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -3133,18 +3133,41 @@ private Optional maybeCreateInitializeShare } addInitializingTopicsRecords(groupId, records, topicPartitionChangeMap); - return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap)); + + // Topics already known to the group (already initialized or already initializing) whose + // newly added partitions are now being initialized must start at offset 0 to avoid losing + // records produced before initialization completes. Brand-new topics (seen for the first + // time, present in neither map) use -1 so that the group's share.auto.offset.reset strategy + // applies. This is read before the records above are replayed, so a brand-new topic is + // correctly absent here. + ShareGroupStatePartitionMetadataInfo info = shareGroupStatePartitionMetadata.get(groupId); + Set alreadyKnownTopics = new HashSet<>(); + if (info != null) { + alreadyKnownTopics.addAll(info.initializedTopics().keySet()); + alreadyKnownTopics.addAll(info.initializingTopics().keySet()); + } + + return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap, alreadyKnownTopics)); } - private InitializeShareGroupStateParameters buildInitializeShareGroupStateRequest(String groupId, int groupEpoch, Map topicPartitions) { + private InitializeShareGroupStateParameters buildInitializeShareGroupStateRequest( + String groupId, + int groupEpoch, + Map topicPartitions, + Set alreadyKnownTopics + ) { return new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData( new GroupTopicPartitionData<>(groupId, topicPartitions.entrySet().stream() - .map(entry -> new TopicData<>( - entry.getKey(), - entry.getValue().partitions().stream() - .map(partitionId -> PartitionFactory.newPartitionStateData(partitionId, groupEpoch, -1)) - .toList()) - ).toList() + .map(entry -> { + // New partitions of an already-known topic start at 0; brand-new topics use -1 + // so that the group's share.auto.offset.reset strategy applies. + long startOffset = alreadyKnownTopics.contains(entry.getKey()) ? 0L : PartitionFactory.UNINITIALIZED_START_OFFSET; + return new TopicData<>( + entry.getKey(), + entry.getValue().partitions().stream() + .map(partitionId -> PartitionFactory.newPartitionStateData(partitionId, groupEpoch, startOffset)) + .toList()); + }).toList() )).build(); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index d4218bc5adf76..dbc2f4003d83d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -27184,12 +27184,15 @@ public void testShareGroupHeartbeatInitializeOnPartitionUpdate() { assertTrue(actual.isPresent()); assertRecordEquals(expected, actual.get()); + // t1 is already initialized, so its newly added partitions (2, 3) must start at offset 0 + // to avoid losing records produced before initialization completes. verifyShareGroupHeartbeatInitializeRequest( result.response().getValue(), Map.of( t1Uuid, Set.of(2, 3) ), + Map.of(t1Uuid, 0L), groupId, 3, true @@ -27199,6 +27202,114 @@ public void testShareGroupHeartbeatInitializeOnPartitionUpdate() { verify(context.metrics, times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME); } + @Test + public void testShareGroupHeartbeatInitializeMixedNewAndExpandedTopics() { + // A single heartbeat that both adds a brand-new topic and expands an already-initialized + // topic must produce one initialize request where the new topic uses the uninitialized (-1) + // start offset (so share.auto.offset.reset applies) while the expanded topic's new + // partitions use a fixed start offset of 0 (to avoid losing records). + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build(); + + Uuid t1Uuid = Uuid.randomUuid(); + String t1Name = "t1"; + Uuid t2Uuid = Uuid.randomUuid(); + String t2Name = "t2"; + Uuid t3Uuid = Uuid.randomUuid(); + String t3Name = "t3"; + CoordinatorMetadataImage image = new MetadataImageBuilder() + .addTopic(t1Uuid, t1Name, 2) + .addTopic(t2Uuid, t2Name, 2) + .buildCoordinatorMetadataImage(); + + String groupId = "share-group"; + + context.groupMetadataManager.onMetadataUpdate(mock(CoordinatorMetadataDelta.class), image); + + Uuid memberId = Uuid.randomUuid(); + CoordinatorResult>, CoordinatorRecord> result = context.shareGroupHeartbeat( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId.toString()) + .setMemberEpoch(0) + .setSubscribedTopicNames(List.of(t1Name, t2Name))); + + // Both topics are brand-new, so both initialize at the uninitialized (-1) start offset. + verifyShareGroupHeartbeatInitializeRequest( + result.response().getValue(), + Map.of( + t1Uuid, Set.of(0, 1), + t2Uuid, Set.of(0, 1) + ), + groupId, + 2, + true + ); + + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of()) + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(t1Uuid) + .setTopicName(t1Name) + .setPartitions(List.of(0, 1)), + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(t2Uuid) + .setTopicName(t2Name) + .setPartitions(List.of(0, 1)) + )) + .setDeletingTopics(List.of()) + ); + + // t1 expands from 2 to 4 partitions and a brand-new topic t3 (3 partitions) is added. + image = new MetadataImageBuilder() + .addTopic(t1Uuid, t1Name, 4) + .addTopic(t2Uuid, t2Name, 2) + .addTopic(t3Uuid, t3Name, 3) + .buildCoordinatorMetadataImage(); + + context.groupMetadataManager.onMetadataUpdate(mock(CoordinatorMetadataDelta.class), image); + + assignor.prepareGroupAssignment(new GroupAssignment( + Map.of( + memberId.toString(), + new MemberAssignmentImpl( + Map.of( + t1Uuid, Set.of(0, 1), + t2Uuid, Set.of(0, 1) + ) + ) + ) + )); + + result = context.shareGroupHeartbeat( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId.toString()) + .setMemberEpoch(2) + .setSubscribedTopicNames(List.of(t1Name, t2Name, t3Name))); + + // Single request: t1's new partitions {2,3} (already-initialized topic) start at 0, while + // brand-new topic t3's partitions {0,1,2} use the uninitialized (-1) start offset. + verifyShareGroupHeartbeatInitializeRequest( + result.response().getValue(), + Map.of( + t1Uuid, Set.of(2, 3), + t3Uuid, Set.of(0, 1, 2) + ), + Map.of(t1Uuid, 0L), + groupId, + 3, + true + ); + } + @Test public void testShareGroupHeartbeatDoesNotBumpGroupEpochDuringAssignmentDelay() { Uuid t1Uuid = Uuid.randomUuid(); @@ -27378,9 +27489,12 @@ public void testShareGroupHeartbeatPersisterRequestWithInitializing() { )) ); + // t1 was already in the initializing state, so retrying its initialization uses a fixed + // start offset of 0 rather than re-deferring to the share.auto.offset.reset strategy. verifyShareGroupHeartbeatInitializeRequest( result.response().getValue(), Map.of(t1Uuid, Set.of(0, 1)), + Map.of(t1Uuid, 0L), groupId, 3, true @@ -29056,6 +29170,18 @@ private void verifyShareGroupHeartbeatInitializeRequest( String groupId, int stateEpoch, boolean shouldExist + ) { + // Brand-new topics are expected to use the uninitialized (-1) start offset. + verifyShareGroupHeartbeatInitializeRequest(initRequest, expectedTopicPartitionsMap, Map.of(), groupId, stateEpoch, shouldExist); + } + + private void verifyShareGroupHeartbeatInitializeRequest( + Optional initRequest, + Map> expectedTopicPartitionsMap, + Map expectedStartOffsetByTopic, + String groupId, + int stateEpoch, + boolean shouldExist ) { if (shouldExist) { assertTrue(initRequest.isPresent()); @@ -29063,10 +29189,11 @@ private void verifyShareGroupHeartbeatInitializeRequest( assertEquals(groupId, request.groupTopicPartitionData().groupId()); Map> actualTopicPartitionsMap = new HashMap<>(); for (TopicData topicData : request.groupTopicPartitionData().topicsData()) { + long expectedStartOffset = expectedStartOffsetByTopic.getOrDefault(topicData.topicId(), -1L); actualTopicPartitionsMap.computeIfAbsent(topicData.topicId(), k -> new HashSet<>()) .addAll(topicData.partitions().stream().map(partitionData -> { assertEquals(stateEpoch, partitionData.stateEpoch()); - assertEquals(-1, partitionData.startOffset()); + assertEquals(expectedStartOffset, partitionData.startOffset()); return partitionData.partition(); }).toList()); } From 06de01834626454082bddc6ce6f1abc2611f0314 Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia Date: Wed, 1 Jul 2026 12:50:07 +0530 Subject: [PATCH 2/4] Add integ test --- .../clients/consumer/ShareConsumerTest.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index a9f45bf73b1de..59399e06cd0e2 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -19,6 +19,7 @@ import kafka.server.KafkaBroker; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.admin.SharePartitionOffsetInfo; import org.apache.kafka.clients.producer.Producer; @@ -2033,4 +2034,61 @@ public void testDescribeShareGroupOffsetsForEmptySharePartition() { throw new RuntimeException(e); } } + + @ClusterTest + public void testAddedPartitionsOfKnownTopicStartAtOffsetZero() throws InterruptedException, ExecutionException { + String groupId = "expand-group"; + String topic = "expand-topic"; + createTopic(topic, 1, 1); + + alterShareAutoOffsetReset(groupId, "latest"); + + try (Producer producer = createProducer(); + Admin admin = createAdminClient(); + ShareConsumer shareConsumer = createShareConsumer(groupId)) { + + // Produce 10 records to partition 0 before anyone subscribes. + for (int i = 0; i < 10; i++) { + producer.send(new ProducerRecord<>(topic, 0, null, ("p0-key-" + i).getBytes(), ("p0-value-" + i).getBytes())); + } + producer.flush(); + + // The consumer joins and polls: with "latest" it starts partition 0 at the log end, so it receives + // none of the pre-existing records. This also makes the group aware of (initializes) the topic. + shareConsumer.subscribe(Set.of(topic)); + int received = 0; + long deadline = System.currentTimeMillis() + 10000L; + while (System.currentTimeMillis() < deadline) { + received += shareConsumer.poll(Duration.ofMillis(1000)).count(); + } + assertEquals(0, received, "latest reset should skip records produced before the topic was known"); + + // Expand the topic and produce 10 records to the brand-new partition 1, each with a distinct, + // known value so we can assert all of them are delivered (none skipped by the latest reset). + admin.createPartitions(Map.of(topic, NewPartitions.increaseTo(2))).all().get(); + cluster.waitTopicCreation(topic, 2); + + List expectedValues = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + String value = "p1-value-" + i; + expectedValues.add(value); + producer.send(new ProducerRecord<>(topic, 1, null, ("p1-key-" + i).getBytes(), value.getBytes())); + } + producer.flush(); + + // Partition 1 belongs to an already-known topic, so it must be initialized at offset 0 and deliver + // every record produced above, in order — even though it was created under a "latest" reset. + List receivedValues = new ArrayList<>(); + waitForCondition(() -> { + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(1000)); + for (ConsumerRecord record : records) { + if (record.partition() == 1) { + receivedValues.add(new String(record.value())); + } + } + return receivedValues.size() >= expectedValues.size(); + }, 30000L, 500L, () -> "did not receive all records from the newly added partition"); + assertEquals(expectedValues, receivedValues); + } + } } From 13c982e3711b09f4a983da855de453a2e8c11b78 Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia Date: Wed, 1 Jul 2026 13:11:08 +0530 Subject: [PATCH 3/4] Fix flaky --- .../apache/kafka/clients/consumer/ShareConsumerTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 59399e06cd0e2..d4865ab6cf837 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -2077,8 +2077,10 @@ public void testAddedPartitionsOfKnownTopicStartAtOffsetZero() throws Interrupte producer.flush(); // Partition 1 belongs to an already-known topic, so it must be initialized at offset 0 and deliver - // every record produced above, in order — even though it was created under a "latest" reset. - List receivedValues = new ArrayList<>(); + // every record produced above — even though it was created under a "latest" reset. Share consumers + // deliver at-least-once, so records may be redelivered; track distinct values in a set so + // duplicates don't cause spurious failures, then assert the count of distinct records. + Set receivedValues = new HashSet<>(); waitForCondition(() -> { ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { @@ -2088,7 +2090,7 @@ public void testAddedPartitionsOfKnownTopicStartAtOffsetZero() throws Interrupte } return receivedValues.size() >= expectedValues.size(); }, 30000L, 500L, () -> "did not receive all records from the newly added partition"); - assertEquals(expectedValues, receivedValues); + assertEquals(expectedValues.size(), receivedValues.size()); } } } From 86d28ff0eee82373d9de82d80765640224661863 Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia Date: Wed, 1 Jul 2026 19:10:33 +0530 Subject: [PATCH 4/4] Address comments --- .../kafka/coordinator/group/GroupMetadataManager.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index afe59a48ec7d0..45cd92bf3d4c4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -3140,14 +3140,15 @@ private Optional maybeCreateInitializeShare // time, present in neither map) use -1 so that the group's share.auto.offset.reset strategy // applies. This is read before the records above are replayed, so a brand-new topic is // correctly absent here. - ShareGroupStatePartitionMetadataInfo info = shareGroupStatePartitionMetadata.get(groupId); - Set alreadyKnownTopics = new HashSet<>(); - if (info != null) { + Set alreadyKnownTopics = null; + if (shareGroupStatePartitionMetadata.containsKey(groupId)) { + ShareGroupStatePartitionMetadataInfo info = shareGroupStatePartitionMetadata.get(groupId); + alreadyKnownTopics = new HashSet<>(); alreadyKnownTopics.addAll(info.initializedTopics().keySet()); alreadyKnownTopics.addAll(info.initializingTopics().keySet()); } - return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap, alreadyKnownTopics)); + return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap, alreadyKnownTopics != null ? alreadyKnownTopics : Set.of())); } private InitializeShareGroupStateParameters buildInitializeShareGroupStateRequest(