Summary
When partitions are scheduled for streaming in TakePartitionForStreamingOperation, each partition's offset is resolved individually via PartitionFactory.getPartition() → PartitionOffsetProvider.getOffset(). This results in N sequential calls to the Kafka Connect offset storage for N partitions. During rebalancing scenarios with many partitions, this significantly slows down partition scheduling.
Additionally, the existing batch method PartitionOffsetProvider.getOffsets() calls offsetStorageReader.offsets() directly without any timeout protection. The single-partition path has a 5-second timeout via Future.get(), but the batch path can block indefinitely if the offset storage is slow or unresponsive.
Impact
- Partition scheduling latency scales linearly with partition count during rebalances
- A stuck offset storage reader can hang the task indefinitely on the batch path
Proposed Fix
- Add PartitionFactory.getPartitions() to resolve all partition offsets in a single batch call
- Wire TakePartitionForStreamingOperation to use the batch path instead of per-partition lookups
- Add 30-second timeout protection on the batch offsetStorageReader.offsets() call
- Remove dead getOffsetMap() method from PartitionOffsetProvider
- Return empty map instead of null from PartitionOffset.getOffset() to prevent downstream NPEs
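The timeout-protected batch lookup could look roughly like the sketch below. It is not the actual change: the class name BatchOffsetLookup, the OffsetMap alias, and the Function standing in for the batch offsetStorageReader.offsets() call are all hypothetical, and throwing on timeout is an assumption, since the description does not say how a timeout is surfaced. The sketch only illustrates bounding the batch call with Future.get() and returning an empty map rather than null.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/** Hypothetical helper: one batch offset lookup for all partitions, bounded by a timeout. */
public class BatchOffsetLookup {

    /** Shape of a batch result: partition key -> stored offset (hypothetical alias). */
    public interface OffsetMap extends Map<Map<String, String>, Map<String, Object>> {}

    // Stand-in for the batch offsetStorageReader.offsets() call (assumption, not the real type).
    private final Function<Collection<Map<String, String>>,
            Map<Map<String, String>, Map<String, Object>>> batchReader;
    private final Duration timeout;

    // Daemon worker thread so a stuck lookup cannot keep the JVM alive.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "batch-offset-lookup");
        t.setDaemon(true);
        return t;
    });

    public BatchOffsetLookup(
            Function<Collection<Map<String, String>>,
                    Map<Map<String, String>, Map<String, Object>>> batchReader,
            Duration timeout) {
        this.batchReader = batchReader;
        this.timeout = timeout; // the proposal uses 30 seconds for the batch path
    }

    /** Resolves offsets for all partitions in a single call instead of N sequential ones. */
    public Map<Map<String, String>, Map<String, Object>> getOffsets(
            Collection<Map<String, String>> partitions) {
        Callable<Map<Map<String, String>, Map<String, Object>>> task =
                () -> batchReader.apply(partitions);
        Future<Map<Map<String, String>, Map<String, Object>>> future = executor.submit(task);
        try {
            Map<Map<String, String>, Map<String, Object>> result =
                    future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            // Never hand null to callers; an empty map avoids downstream NPEs.
            return result == null ? Collections.emptyMap() : result;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck lookup
            // Assumption: a timeout is reported as an exception.
            throw new IllegalStateException("Batch offset lookup timed out after " + timeout, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted during batch offset lookup", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Batch offset lookup failed", e.getCause());
        }
    }

    public void close() {
        executor.shutdownNow();
    }
}
```

With this shape, the caller resolves offsets for every partition in one call and sees a bounded wait if the offset storage is unresponsive, rather than blocking indefinitely.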