Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import kafka.server.KafkaBroker;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
import org.apache.kafka.clients.producer.Producer;
Expand Down Expand Up @@ -2033,4 +2034,63 @@ public void testDescribeShareGroupOffsetsForEmptySharePartition() {
throw new RuntimeException(e);
}
}

/**
 * Verifies that a partition added to a topic the share group already knows about is initialized
 * at offset 0, so every record produced to it is delivered even when the group's
 * share.auto.offset.reset is "latest".
 */
@ClusterTest
public void testAddedPartitionsOfKnownTopicStartAtOffsetZero() throws InterruptedException, ExecutionException {
    String groupId = "expand-group";
    String topic = "expand-topic";
    createTopic(topic, 1, 1);

    alterShareAutoOffsetReset(groupId, "latest");

    try (Producer<byte[], byte[]> producer = createProducer();
         Admin admin = createAdminClient();
         ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId)) {

        // Produce 10 records to partition 0 before anyone subscribes.
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>(topic, 0, null, ("p0-key-" + i).getBytes(), ("p0-value-" + i).getBytes()));
        }
        producer.flush();

        // The consumer joins and polls: with "latest" it starts partition 0 at the log end, so it receives
        // none of the pre-existing records. This also makes the group aware of (initializes) the topic.
        shareConsumer.subscribe(Set.of(topic));
        int received = 0;
        long deadline = System.currentTimeMillis() + 10000L;
        while (System.currentTimeMillis() < deadline) {
            received += shareConsumer.poll(Duration.ofMillis(1000)).count();
        }
        assertEquals(0, received, "latest reset should skip records produced before the topic was known");

        // Expand the topic and produce 10 records to the brand-new partition 1, each with a distinct,
        // known value so we can assert all of them are delivered (none skipped by the latest reset).
        admin.createPartitions(Map.of(topic, NewPartitions.increaseTo(2))).all().get();
        cluster.waitTopicCreation(topic, 2);

        List<String> expectedValues = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            String value = "p1-value-" + i;
            expectedValues.add(value);
            producer.send(new ProducerRecord<>(topic, 1, null, ("p1-key-" + i).getBytes(), value.getBytes()));
        }
        producer.flush();

        // Partition 1 belongs to an already-known topic, so it must be initialized at offset 0 and deliver
        // every record produced above — even though it was created under a "latest" reset. Share consumers
        // deliver at-least-once, so records may be redelivered; track distinct values in a set so
        // duplicates don't cause spurious failures.
        Set<String> receivedValues = new HashSet<>();
        waitForCondition(() -> {
            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                if (record.partition() == 1) {
                    receivedValues.add(new String(record.value()));
                }
            }
            // Wait for the specific payloads, not merely for "enough" distinct values.
            return receivedValues.containsAll(expectedValues);
        }, 30000L, 500L, () -> "did not receive all records from the newly added partition");

        // Compare the actual payloads rather than only the counts, so that an unexpected or missing
        // value on partition 1 fails the test instead of being masked by a matching size.
        assertEquals(new HashSet<>(expectedValues), receivedValues,
            "records delivered from the newly added partition should be exactly the ones produced to it");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3133,18 +3133,42 @@ private Optional<InitializeShareGroupStateParameters> maybeCreateInitializeShare
}

addInitializingTopicsRecords(groupId, records, topicPartitionChangeMap);
return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap));

// Topics already known to the group (already initialized or already initializing) whose
// newly added partitions are now being initialized must start at offset 0 to avoid losing
// records produced before initialization completes. Brand-new topics (seen for the first
// time, present in neither map) use -1 so that the group's share.auto.offset.reset strategy
// applies. This is read before the records above are replayed, so a brand-new topic is
// correctly absent here.
Set<Uuid> alreadyKnownTopics = null;
if (shareGroupStatePartitionMetadata.containsKey(groupId)) {
ShareGroupStatePartitionMetadataInfo info = shareGroupStatePartitionMetadata.get(groupId);
alreadyKnownTopics = new HashSet<>();
alreadyKnownTopics.addAll(info.initializedTopics().keySet());
alreadyKnownTopics.addAll(info.initializingTopics().keySet());
}

return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap, alreadyKnownTopics != null ? alreadyKnownTopics : Set.of()));
}

/**
 * Builds the {@link InitializeShareGroupStateParameters} for the given group, producing one
 * {@code TopicData} per entry of {@code topicPartitions} and one partition state entry (created
 * via {@code PartitionFactory.newPartitionStateData}) per partition id of that entry.
 *
 * @param groupId            id placed on the resulting {@code GroupTopicPartitionData}
 * @param groupEpoch         epoch passed to {@code PartitionFactory.newPartitionStateData} for every partition
 * @param topicPartitions    topic id mapped to the {@code InitMapValue} whose {@code partitions()} are initialized
 * @param alreadyKnownTopics topic ids whose partitions are given start offset 0; every other topic gets
 *                           {@code PartitionFactory.UNINITIALIZED_START_OFFSET}. Must be non-null because
 *                           {@code contains} is invoked on it for every topic.
 * @return the built initialize-state parameters
 */
private InitializeShareGroupStateParameters buildInitializeShareGroupStateRequest(String groupId, int groupEpoch, Map<Uuid, InitMapValue> topicPartitions) {
private InitializeShareGroupStateParameters buildInitializeShareGroupStateRequest(
String groupId,
int groupEpoch,
Map<Uuid, InitMapValue> topicPartitions,
Set<Uuid> alreadyKnownTopics
) {
return new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData(
new GroupTopicPartitionData<>(groupId, topicPartitions.entrySet().stream()
.map(entry -> new TopicData<>(
entry.getKey(),
entry.getValue().partitions().stream()
.map(partitionId -> PartitionFactory.newPartitionStateData(partitionId, groupEpoch, -1))
.toList())
).toList()
.map(entry -> {
// New partitions of an already-known topic start at 0; brand-new topics use -1
// so that the group's share.auto.offset.reset strategy applies.
// The start offset is chosen once per topic and reused for all of its partitions.
long startOffset = alreadyKnownTopics.contains(entry.getKey()) ? 0L : PartitionFactory.UNINITIALIZED_START_OFFSET;
return new TopicData<>(
entry.getKey(),
entry.getValue().partitions().stream()
.map(partitionId -> PartitionFactory.newPartitionStateData(partitionId, groupEpoch, startOffset))
.toList());
}).toList()
)).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27184,12 +27184,15 @@ public void testShareGroupHeartbeatInitializeOnPartitionUpdate() {
assertTrue(actual.isPresent());
assertRecordEquals(expected, actual.get());

// t1 is already initialized, so its newly added partitions (2, 3) must start at offset 0
// to avoid losing records produced before initialization completes.
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(
t1Uuid,
Set.of(2, 3)
),
Map.of(t1Uuid, 0L),
groupId,
3,
true
Expand All @@ -27199,6 +27202,114 @@ public void testShareGroupHeartbeatInitializeOnPartitionUpdate() {
verify(context.metrics, times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
}

@Test
public void testShareGroupHeartbeatInitializeMixedNewAndExpandedTopics() {
    // Scenario: one heartbeat simultaneously subscribes to a never-seen topic and observes extra
    // partitions on a topic that is already initialized. The resulting single initialize request
    // must carry the uninitialized (-1) start offset for the new topic (so share.auto.offset.reset
    // applies) and a fixed start offset of 0 for the added partitions of the known topic (so no
    // records are lost).
    MockPartitionAssignor rangeAssignor = new MockPartitionAssignor("range");
    rangeAssignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
    GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
        .withShareGroupAssignor(rangeAssignor)
        .build();

    String groupId = "share-group";
    String t1Name = "t1";
    String t2Name = "t2";
    String t3Name = "t3";
    Uuid t1Uuid = Uuid.randomUuid();
    Uuid t2Uuid = Uuid.randomUuid();
    Uuid t3Uuid = Uuid.randomUuid();

    // Phase 1: the member joins while only t1 and t2 exist, each with two partitions.
    CoordinatorMetadataImage initialImage = new MetadataImageBuilder()
        .addTopic(t1Uuid, t1Name, 2)
        .addTopic(t2Uuid, t2Name, 2)
        .buildCoordinatorMetadataImage();
    context.groupMetadataManager.onMetadataUpdate(mock(CoordinatorMetadataDelta.class), initialImage);

    String memberId = Uuid.randomUuid().toString();
    ShareGroupHeartbeatRequestData joinRequest = new ShareGroupHeartbeatRequestData()
        .setGroupId(groupId)
        .setMemberId(memberId)
        .setMemberEpoch(0)
        .setSubscribedTopicNames(List.of(t1Name, t2Name));
    Optional<InitializeShareGroupStateParameters> joinInitRequest =
        context.shareGroupHeartbeat(joinRequest).response().getValue();

    // Neither topic has been seen before, so both are initialized with the uninitialized (-1) offset.
    Set<Integer> firstTwoPartitions = Set.of(0, 1);
    verifyShareGroupHeartbeatInitializeRequest(
        joinInitRequest,
        Map.of(t1Uuid, firstTwoPartitions, t2Uuid, firstTwoPartitions),
        groupId,
        2,
        true
    );

    // Phase 2: record that initialization of t1 and t2 has completed.
    ShareGroupStatePartitionMetadataKey metadataKey = new ShareGroupStatePartitionMetadataKey()
        .setGroupId(groupId);
    ShareGroupStatePartitionMetadataValue metadataValue = new ShareGroupStatePartitionMetadataValue()
        .setInitializingTopics(List.of())
        .setInitializedTopics(List.of(
            new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
                .setTopicId(t1Uuid)
                .setTopicName(t1Name)
                .setPartitions(List.of(0, 1)),
            new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
                .setTopicId(t2Uuid)
                .setTopicName(t2Name)
                .setPartitions(List.of(0, 1))
        ))
        .setDeletingTopics(List.of());
    context.groupMetadataManager.replay(metadataKey, metadataValue);

    // Phase 3: t1 grows from 2 to 4 partitions and a brand-new topic t3 (3 partitions) appears.
    CoordinatorMetadataImage expandedImage = new MetadataImageBuilder()
        .addTopic(t1Uuid, t1Name, 4)
        .addTopic(t2Uuid, t2Name, 2)
        .addTopic(t3Uuid, t3Name, 3)
        .buildCoordinatorMetadataImage();
    context.groupMetadataManager.onMetadataUpdate(mock(CoordinatorMetadataDelta.class), expandedImage);

    rangeAssignor.prepareGroupAssignment(new GroupAssignment(
        Map.of(
            memberId,
            new MemberAssignmentImpl(
                Map.of(
                    t1Uuid, Set.of(0, 1),
                    t2Uuid, Set.of(0, 1)
                )
            )
        )
    ));

    ShareGroupHeartbeatRequestData expandRequest = new ShareGroupHeartbeatRequestData()
        .setGroupId(groupId)
        .setMemberId(memberId)
        .setMemberEpoch(2)
        .setSubscribedTopicNames(List.of(t1Name, t2Name, t3Name));
    Optional<InitializeShareGroupStateParameters> expandInitRequest =
        context.shareGroupHeartbeat(expandRequest).response().getValue();

    // One request covers both cases: partitions {2,3} of the already-initialized t1 start at 0,
    // whereas partitions {0,1,2} of the brand-new t3 keep the uninitialized (-1) start offset.
    Map<Uuid, Set<Integer>> expectedPartitions = Map.of(
        t1Uuid, Set.of(2, 3),
        t3Uuid, Set.of(0, 1, 2)
    );
    Map<Uuid, Long> expectedStartOffsets = Map.of(t1Uuid, 0L);
    verifyShareGroupHeartbeatInitializeRequest(
        expandInitRequest,
        expectedPartitions,
        expectedStartOffsets,
        groupId,
        3,
        true
    );
}

@Test
public void testShareGroupHeartbeatDoesNotBumpGroupEpochDuringAssignmentDelay() {
Uuid t1Uuid = Uuid.randomUuid();
Expand Down Expand Up @@ -27378,9 +27489,12 @@ public void testShareGroupHeartbeatPersisterRequestWithInitializing() {
))
);

// t1 was already in the initializing state, so retrying its initialization uses a fixed
// start offset of 0 rather than re-deferring to the share.auto.offset.reset strategy.
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(t1Uuid, Set.of(0, 1)),
Map.of(t1Uuid, 0L),
groupId,
3,
true
Expand Down Expand Up @@ -29056,17 +29170,30 @@ private void verifyShareGroupHeartbeatInitializeRequest(
String groupId,
int stateEpoch,
boolean shouldExist
) {
// Brand-new topics are expected to use the uninitialized (-1) start offset.
verifyShareGroupHeartbeatInitializeRequest(initRequest, expectedTopicPartitionsMap, Map.of(), groupId, stateEpoch, shouldExist);
}

private void verifyShareGroupHeartbeatInitializeRequest(
Optional<InitializeShareGroupStateParameters> initRequest,
Map<Uuid, Set<Integer>> expectedTopicPartitionsMap,
Map<Uuid, Long> expectedStartOffsetByTopic,
String groupId,
int stateEpoch,
boolean shouldExist
) {
if (shouldExist) {
assertTrue(initRequest.isPresent());
InitializeShareGroupStateParameters request = initRequest.get();
assertEquals(groupId, request.groupTopicPartitionData().groupId());
Map<Uuid, Set<Integer>> actualTopicPartitionsMap = new HashMap<>();
for (TopicData<PartitionStateData> topicData : request.groupTopicPartitionData().topicsData()) {
long expectedStartOffset = expectedStartOffsetByTopic.getOrDefault(topicData.topicId(), -1L);
actualTopicPartitionsMap.computeIfAbsent(topicData.topicId(), k -> new HashSet<>())
.addAll(topicData.partitions().stream().map(partitionData -> {
assertEquals(stateEpoch, partitionData.stateEpoch());
assertEquals(-1, partitionData.startOffset());
assertEquals(expectedStartOffset, partitionData.startOffset());
return partitionData.partition();
}).toList());
}
Expand Down
Loading