[fix][io] Fix acknowledgments not being sent when collapsePartitionedTopics=true in kafka-connect-adapter#11
Conversation
|
@sandeep-mst Please rebase / merge latest master. I didn't have permission to do that for your PR branch. ❯ gh pr update-branch 11
GraphQL: user doesn't have permission to update head repository (updatePullRequestBranch) |
9884c9c to
b4c15e3
Compare
|
@lhotari done. |
There was a problem hiding this comment.
Pull request overview
Fixes missing acknowledgments in the Kafka Connect sink when collapsePartitionedTopics=true by ensuring ackUntil() derives the Kafka topic/partition the same way as toSinkRecord(), avoiding topic-name mismatches for partitioned topics.
Changes:
- Update
ackUntil()to compute topic + partition using the same collapse logic astoSinkRecord(). - Factor collapse detection into a shared helper and improve debug logging around acknowledgments.
- Add unit tests covering
ackUntil()behavior withcollapsePartitionedTopicsenabled/disabled.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java | Align ackUntil() topic/partition derivation with toSinkRecord() via a shared helper; add debug logging. |
| kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java | Add focused tests validating ackUntil() ack behavior for collapsed vs non-collapsed partitioned topics. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
@sandeep-mst Please check the review comments |
…as well and added integration tests for the same
b4c15e3 to
696d014
Compare
|
@lhotari All the review comments have been resolved. Please review when possible. Thanks and Regards |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Fixes 25450
Motivation
Fix acknowledgments not being sent when the
collapsePartitionedTopics=trueon kafka-connect-adapter. The topic name stored in sinkrecord and currentOffsets is different from the topic name stored in the pulsar record.The topic name used in SinkRecord / currentOffsets is collapsed (base topic name) and the topic name in the original Pulsar Record remains non-collapsed (includes -partition-X)
As a result, during
ackUntil()nullbecause of topic name mismatch.Modifications
Updated
ackUntil()to derive topic and partition from the source Record using the same logic astoSinkRecord()(i.e., respectingcollapsePartitionedTopics)Added a debug log and
ackRequestedCountfor better logging clarity.Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository:
cognitree#1