From 61d4da77a4d6c75d6aadc0d65274bdd2d866822f Mon Sep 17 00:00:00 2001 From: sandeep-mst Date: Wed, 1 Apr 2026 12:40:27 +0530 Subject: [PATCH] added check for collapsePartitionedTopic for sending acknowledgments as well and added integration tests for the same --- .../io/kafka/connect/KafkaConnectSink.java | 30 +++++++-- .../kafka/connect/KafkaConnectSinkTest.java | 61 +++++++++++++++++++ 2 files changed, 85 insertions(+), 6 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java index 69f1959537b87..c8085eb7c90a4 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java @@ -300,9 +300,18 @@ protected void ackUntil(Record lastNotFlushed, partitionOffset.put(tp.partition(), e.getValue().offset()); } + int ackRequestedCount = 0; for (Record r : pendingFlushQueue) { - final String topic = sanitizeNameIfNeeded(r.getTopicName().orElse(topicName), sanitizeTopicName); - final int partition = r.getPartitionIndex().orElse(0); + final String topic; + final int partition; + if (shouldCollapsePartitionedTopic(r)) { + TopicName tn = TopicName.get(r.getTopicName().get()); + partition = tn.getPartitionIndex(); + topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName); + } else { + partition = r.getPartitionIndex().orElse(0); + topic = sanitizeNameIfNeeded(r.getTopicName().orElse(topicName), sanitizeTopicName); + } Long lastCommittedOffset = null; if (topicOffsets.containsKey(topic)) { @@ -326,15 +335,26 @@ protected void ackUntil(Record lastNotFlushed, } cb.accept(r); + ackRequestedCount++; pendingFlushQueue.remove(r); currentBatchSize.addAndGet(-1 * r.getMessage().get().size()); if (r == lastNotFlushed) { break; } } + 
if (log.isDebugEnabled()) { + log.debug("ackRequestedCount: {}, committedOffsets: {}", ackRequestedCount, committedOffsets); + } } - private long getMessageOffset(Record sourceRecord) { + private boolean shouldCollapsePartitionedTopic(Record r) { + return collapsePartitionedTopics + && r.getTopicName().isPresent() + && TopicName.get(r.getTopicName().get()).isPartitioned(); + } + + @VisibleForTesting + long getMessageOffset(Record sourceRecord) { if (sourceRecord.getMessage().isPresent()) { // Use index added by org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present. @@ -440,9 +460,7 @@ protected SinkRecord toSinkRecord(Record sourceRecord) { final int partition; final String topic; - if (collapsePartitionedTopics - && sourceRecord.getTopicName().isPresent() - && TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) { + if (shouldCollapsePartitionedTopic(sourceRecord)) { TopicName tn = TopicName.get(sourceRecord.getTopicName().get()); partition = tn.getPartitionIndex(); topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName); diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 8bcad683bb691..b1a286c5e2fbe 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -1682,6 +1682,67 @@ private void testCollapsePartitionedTopic(boolean isEnabled, sink.close(); } + @Test + public void testAckUntilWithCollapsePartitionedTopics() throws Exception { + testAckUntil(true, + "persistent://a/b/fake-topic-partition-0", + "persistent://a/b/fake-topic", + 0); + } + + @Test + public void testAckUntilWithoutCollapsePartitionedTopics() throws Exception { + // Note: Without 
collapsePartitionedTopics, expectedPartition in the committedOffsets will always be 0 + testAckUntil(false, + "persistent://a/b/fake-topic-partition-1", + "persistent://a/b/fake-topic-partition-1", + 0); + } + + private void testAckUntil(boolean collapseEnabled, + String pulsarTopic, + String expectedKafkaTopic, + int expectedPartition) throws Exception { + // Setup sink with given collapseEnabled value + props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName()); + props.put("collapsePartitionedTopics", Boolean.toString(collapseEnabled)); + KafkaConnectSink sink = new KafkaConnectSink(); + sink.open(props, context); + + // Create pulsar record with given pulsarTopic and expectedPartition + Message msg = mock(MessageImpl.class); + when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 1, expectedPartition)); + when(msg.getValue()).thenReturn(null); + + AtomicInteger ackCount = new AtomicInteger(0); + + Record record = PulsarRecord.builder() + .topicName(pulsarTopic) + .message(msg) + .ackFunction(ackCount::incrementAndGet) + .failFunction(() -> {}) + .build(); + + // Add the pulsar record to pendingFlushQueue + sink.pendingFlushQueue.add(record); + + // Build committedOffsets with the given expectedKafkaTopic and expectedPartition + Map committedOffsets = new HashMap<>(); + committedOffsets.put( + new TopicPartition(expectedKafkaTopic, expectedPartition), + new OffsetAndMetadata(sink.getMessageOffset(record)) + ); + + // Trigger ackUntil manually + sink.ackUntil(record, committedOffsets, Record::ack); + + // Assert that the ackFunction runnable of the record is called and pendingFlushQueue is empty + Assert.assertEquals(ackCount.get(), 1); + Assert.assertTrue(sink.pendingFlushQueue.isEmpty()); + + sink.close(); + } + @SneakyThrows private java.util.Date getDateFromString(String dateInString) { SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");