KAFKA-20742: Start offset for newly initialized share partitions of known topics should be 0#22714
Conversation
smjn
left a comment
There was a problem hiding this comment.
Thanks for the PR. Lets add an integ test to verify the change since it is in fundamental path. The scenario could be something like:
- create a topic with 1 partition and produce 10 records
- init share group with LATEST (do it explicitly)
- share consumer subscribes and polls - should not receive anything
- alter partitions (add one more)
- produce to new partition 10 records
- poll and assert on the new partitions 10 records
| // time, present in neither map) use -1 so that the group's share.auto.offset.reset strategy | ||
| // applies. This is read before the records above are replayed, so a brand-new topic is | ||
| // correctly absent here. | ||
| ShareGroupStatePartitionMetadataInfo info = shareGroupStatePartitionMetadata.get(groupId); |
There was a problem hiding this comment.
This looks good since if and when a request with new partitions on existing topic comes, it implies that previous requests have already produced the group coord records which have been updated in the info map.
| } | ||
| return receivedValues.size() >= expectedValues.size(); | ||
| }, 30000L, 500L, () -> "did not receive all records from the newly added partition"); | ||
| assertEquals(expectedValues, receivedValues); |
There was a problem hiding this comment.
can this cause flakiness if there are duplicate records?
There was a problem hiding this comment.
Thanks for the review! Yes, I noticed that given the at-least-once semantics of Share Group, we may have flakes here. I have updated the same to assert on the size of Set.
smjn
left a comment
There was a problem hiding this comment.
Thanks for the change, LGTM
apoorvmittal10
left a comment
There was a problem hiding this comment.
LGTM, minor nit. Let me know if you still prefer the current code.
| ShareGroupStatePartitionMetadataInfo info = shareGroupStatePartitionMetadata.get(groupId); | ||
| Set<Uuid> alreadyKnownTopics = new HashSet<>(); | ||
| if (info != null) { | ||
| alreadyKnownTopics.addAll(info.initializedTopics().keySet()); | ||
| alreadyKnownTopics.addAll(info.initializingTopics().keySet()); | ||
| } |
There was a problem hiding this comment.
nit: Just to avoid unneccessary HashSet.
Set<Uuid> alreadyKnownTopics = null;
if (shareGroupStatePartitionMetadata.containsKey(groupId)) {
ShareGroupStatePartitionMetadataInfo info = shareGroupStatePartitionMetadata.get(groupId);
alreadyKnownTopics = new HashSet<>();
alreadyKnownTopics.addAll(info.initializedTopics().keySet());
alreadyKnownTopics.addAll(info.initializingTopics().keySet());
}
There was a problem hiding this comment.
Thanks for the review. I have updated the PR accordingly.
| alreadyKnownTopics.addAll(info.initializingTopics().keySet()); | ||
| } | ||
|
|
||
| return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap, alreadyKnownTopics)); |
There was a problem hiding this comment.
nit: and here if above change looks good
| return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap, alreadyKnownTopics)); | |
| return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap, alreadyKnownTopics != null ? alreadyKnownTopics : Set.of())); |
Summary
When a share group heartbeat detects subscribed-but-uninitialized
topic-partitions, the group coordinator builds an
InitializeShareGroupStateParametersrequest. Previously it sentstartOffset = -1(PartitionFactory.UNINITIALIZED_START_OFFSET) forevery partition.
-1is a sentinel meaning "not yet decided":SharePartition.startOffsetDuringInitialization()resolves the realoffset from the group's
share.auto.offset.resetstrategy (defaultLATEST). This is correct for a brand-new topic subscription, butincorrect when new partitions are added to a topic the group already
knows about. Resolving those new partitions via
LATESTcan skiprecords produced before initialization completes, causing data loss.
This change makes newly initialized partitions of an already-known
topic (present in the group's
initializedTopicsorinitializingTopics) start at offset0, guaranteeing no records aremissed. Partitions of a topic seen for the first time keep
-1so thatshare.auto.offset.resetstill applies to fresh subscriptions.Change
In
GroupMetadataManager:maybeCreateInitializeShareGroupStateRequestcomputes the set ofalready-known topics (
initializedTopics ∪ initializingTopics) from thecurrent persisted metadata. This is read before the heartbeat's records
are replayed, so a brand-new topic is correctly absent.
buildInitializeShareGroupStateRequestselects the start offset pertopic:
0for an already-known topic,-1for a first-sighting topic.Testing
testShareGroupHeartbeatInitializeMixedNewAndExpandedTopics: asingle heartbeat that both adds a brand-new topic and expands an
already-initialized topic produces one request where the new topic uses
-1and the expanded topic's new partitions use0— verifying thedecision is per-topic, not per-request.
Reviewers: Sushant Mahajan smahajan@confluent.io, Apoorv Mittal
apoorvmittal10@gmail.com