diff --git a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java index 69f1959537..14a3d677c8 100644 --- a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java +++ b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java @@ -98,19 +98,11 @@ public class KafkaConnectSink implements Sink { // Thi is a workaround for https://github.com/apache/pulsar/issues/19922 private boolean collapsePartitionedTopics = false; - private final Cache sanitizedTopicCache = - CacheBuilder.newBuilder().maximumSize(1000) - .expireAfterAccess(30, TimeUnit.MINUTES).build(); - - // Can't really safely expire these entries. If we do, we could end up with - // a sanitized topic name that used in e.g. resume() after a long pause but can't be - // // re-resolved into a form usable for Pulsar. - private final Cache desanitizedTopicCache = - CacheBuilder.newBuilder().build(); - private int maxBatchBitsForOffset = 12; private boolean useIndexAsOffset = true; + private TopicPartitionResolver topicPartitionResolver; + @Override public void write(Record sourceRecord) { if (log.isDebugEnabled()) { @@ -198,19 +190,15 @@ public void open(Map config, SinkContext ctx) throws Exception { x.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic()); }); task = (SinkTask) taskClass.getConstructor().newInstance(); + + topicPartitionResolver = new TopicPartitionResolver( + topicName, + sanitizeTopicName, + collapsePartitionedTopics); + taskContext = - new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open, kafkaName -> { - if (sanitizeTopicName) { - String pulsarTopicName = desanitizedTopicCache.getIfPresent(kafkaName); - if (log.isDebugEnabled()) { - log.debug("desanitizedTopicCache got: kafkaName: {}, pulsarTopicName: {}", - kafkaName, pulsarTopicName); - } - return pulsarTopicName != null ? pulsarTopicName : kafkaName; - } else { - return kafkaName; - } - }); + new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open, + topicPartitionResolver::desanitizeTopicName); task.initialize(taskContext); task.start(configs.get(0)); @@ -264,6 +252,10 @@ public void flush() { ackUntil(lastNotFlushed, committedOffsets, Record::ack); log.info("Flush succeeded"); } catch (Throwable t) { + if (committedOffsets == null) { + log.error("preCommit failed — retrying to preserve ordering", t); + return; + } log.error("error flushing pending records", t); ackUntil(lastNotFlushed, committedOffsets, Record::fail); } finally { @@ -300,13 +292,13 @@ protected void ackUntil(Record lastNotFlushed, partitionOffset.put(tp.partition(), e.getValue().offset()); } + int ackRequestedCount = 0; for (Record r : pendingFlushQueue) { - final String topic = sanitizeNameIfNeeded(r.getTopicName().orElse(topicName), sanitizeTopicName); - final int partition = r.getPartitionIndex().orElse(0); + ResolvedTopicPartition resolved = topicPartitionResolver.resolve(r); Long lastCommittedOffset = null; - if (topicOffsets.containsKey(topic)) { - lastCommittedOffset = topicOffsets.get(topic).get(partition); + if (topicOffsets.containsKey(resolved.getTopic())) { + lastCommittedOffset = topicOffsets.get(resolved.getTopic()).get(resolved.getPartition()); } if (lastCommittedOffset == null) { @@ -326,15 +318,20 @@ protected void ackUntil(Record lastNotFlushed, } cb.accept(r); + ackRequestedCount++; pendingFlushQueue.remove(r); currentBatchSize.addAndGet(-1 * r.getMessage().get().size()); if (r == lastNotFlushed) { break; } } + if (log.isDebugEnabled()) { + log.debug("ackRequestedCount: {}, committedOffsets: {}", ackRequestedCount, committedOffsets); + } } - private long getMessageOffset(Record sourceRecord) { + @VisibleForTesting + long getMessageOffset(Record sourceRecord) { if (sourceRecord.getMessage().isPresent()) { // Use index added by org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present. @@ -383,6 +380,11 @@ private long getMessageOffset(Record sourceRecord) { .orElse(-1L); } + @VisibleForTesting + TopicPartitionResolver getTopicPartitionResolver() { + return topicPartitionResolver; + } + @Getter @AllArgsConstructor static class BatchMessageSequenceRef { @@ -391,6 +393,94 @@ static class BatchMessageSequenceRef { int batchIdx; } + @Getter + @AllArgsConstructor + static class ResolvedTopicPartition { + private final String topic; + private final int partition; + } + + static class TopicPartitionResolver { + private final String topicName; + private final boolean sanitizeTopicName; + private final boolean collapsePartitionedTopics; + private final Cache sanitizedTopicCache = + CacheBuilder.newBuilder().maximumSize(1000) + .expireAfterAccess(30, TimeUnit.MINUTES).build(); + + // Can't really safely expire these entries. If we do, we could end up with + // a sanitized topic name that is used in e.g. resume() after a long pause but can't be + // re-resolved into a form usable for Pulsar. + private final Cache desanitizedTopicCache = + CacheBuilder.newBuilder().build(); + + private TopicPartitionResolver(String topicName, + boolean sanitizeTopicName, + boolean collapsePartitionedTopics) { + this.topicName = topicName; + this.sanitizeTopicName = sanitizeTopicName; + this.collapsePartitionedTopics = collapsePartitionedTopics; + } + + private ResolvedTopicPartition resolve(Record sourceRecord) { + final int partition; + final String topic; + + if (shouldCollapsePartitionedTopic(sourceRecord)) { + TopicName tn = TopicName.get(sourceRecord.getTopicName().get()); + partition = tn.getPartitionIndex(); + topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName()); + } else { + partition = sourceRecord.getPartitionIndex().orElse(0); + topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName)); + } + return new ResolvedTopicPartition(topic, partition); + } + + private String desanitizeTopicName(String kafkaName) { + if (sanitizeTopicName) { + String pulsarTopicName = desanitizedTopicCache.getIfPresent(kafkaName); + if (log.isDebugEnabled()) { + log.debug("desanitizedTopicCache got: kafkaName: {}, pulsarTopicName: {}", + kafkaName, pulsarTopicName); + } + return pulsarTopicName != null ? pulsarTopicName : kafkaName; + } else { + return kafkaName; + } + } + + // Replace all non-letter, non-digit characters with underscore. + // Append underscore in front of name if it does not begin with alphabet or underscore. + String sanitizeNameIfNeeded(String name) { + if (!sanitizeTopicName) { + return name; + } + + try { + return sanitizedTopicCache.get(name, () -> { + String sanitizedName = name.replaceAll("[^a-zA-Z0-9_]", "_"); + if (sanitizedName.matches("^[^a-zA-Z_].*")) { + sanitizedName = "_" + sanitizedName; + } + // do this once, sanitize() can be called on already sanitized name + // so avoid replacing with (sanitizedName -> sanitizedName). + desanitizedTopicCache.get(sanitizedName, () -> name); + return sanitizedName; + }); + } catch (ExecutionException e) { + log.error("Failed to get sanitized topic name for {}", name, e); + throw new IllegalStateException("Failed to get sanitized topic name for " + name, e); + } + } + + private boolean shouldCollapsePartitionedTopic(Record r) { + return collapsePartitionedTopics + && r.getTopicName().isPresent() + && TopicName.get(r.getTopicName().get()).isPartitioned(); + } + } + private static Method getMethodOfMessageId(MessageId messageId, String name) throws NoSuchMethodException { Class clazz = messageId.getClass(); NoSuchMethodException firstException = null; @@ -437,19 +527,11 @@ static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId me @SuppressWarnings("rawtypes") protected SinkRecord toSinkRecord(Record sourceRecord) { - final int partition; - final String topic; - - if (collapsePartitionedTopics - && sourceRecord.getTopicName().isPresent() - && TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) { - TopicName tn = TopicName.get(sourceRecord.getTopicName().get()); - partition = tn.getPartitionIndex(); - topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName); - } else { - partition = sourceRecord.getPartitionIndex().orElse(0); - topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName); - } + + ResolvedTopicPartition resolved = topicPartitionResolver.resolve(sourceRecord); + final int partition = resolved.getPartition(); + final String topic = resolved.getTopic(); + final Object key; final Object value; final Schema keySchema; @@ -522,31 +604,6 @@ protected SinkRecord toSinkRecord(Record sourceRecord) { @VisibleForTesting protected long currentOffset(String topic, int partition) { - return taskContext.currentOffset(sanitizeNameIfNeeded(topic, sanitizeTopicName), partition); + return taskContext.currentOffset(topicPartitionResolver.sanitizeNameIfNeeded(topic), partition); } - - // Replace all non-letter, non-digit characters with underscore. - // Append underscore in front of name if it does not begin with alphabet or underscore. - protected String sanitizeNameIfNeeded(String name, boolean sanitize) { - if (!sanitize) { - return name; - } - - try { - return sanitizedTopicCache.get(name, () -> { - String sanitizedName = name.replaceAll("[^a-zA-Z0-9_]", "_"); - if (sanitizedName.matches("^[^a-zA-Z_].*")) { - sanitizedName = "_" + sanitizedName; - } - // do this once, sanitize() can be called on already sanitized name - // so avoid replacing with (sanitizedName -> sanitizedName). - desanitizedTopicCache.get(sanitizedName, () -> name); - return sanitizedName; - }); - } catch (ExecutionException e) { - log.error("Failed to get sanitized topic name for {}", name, e); - throw new IllegalStateException("Failed to get sanitized topic name for " + name, e); - } - } - -} +} \ No newline at end of file diff --git a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 8bcad683bb..74af038b31 100644 --- a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -344,7 +344,8 @@ public void seekPauseResumeWithSanitizeTest() throws Exception { assertEquals(status.get(), 1); - final TopicPartition tp = new TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0); + final TopicPartition tp = new TopicPartition(sink.getTopicPartitionResolver() + .sanitizeNameIfNeeded(pulsarTopicName), 0); assertNotEquals(FunctionCommon.getSequenceId(msgId), 0); assertEquals(sink.currentOffset(tp.topic(), tp.partition()), FunctionCommon.getSequenceId(msgId)); @@ -1574,7 +1575,7 @@ public void testGetMessageSequenceRefForBatchMessage() throws Exception { assertNull(ref); ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage( - new TopicMessageIdImpl("topic-0", new MessageIdImpl(ledgerId, entryId, 0)) + new TopicMessageIdImpl("topic-0", new MessageIdImpl(ledgerId, entryId, 0)) ); assertNull(ref); @@ -1648,7 +1649,7 @@ private void testCollapsePartitionedTopic(boolean isEnabled, props.put("collapsePartitionedTopics", Boolean.toString(isEnabled)); KafkaConnectSink sink = new KafkaConnectSink(); - sink.open(props, context); + sink.open(props, context); AvroSchema pulsarAvroSchema = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class); @@ -1682,6 +1683,67 @@ private void testCollapsePartitionedTopic(boolean isEnabled, sink.close(); } + @Test + public void testAckUntilWithCollapsePartitionedTopics() throws Exception { + testAckUntil(true, + "persistent://a/b/fake-topic-partition-0", + "persistent://a/b/fake-topic", + 0); + } + + @Test + public void testAckUntilWithoutCollapsePartitionedTopics() throws Exception { + // Note: Without collapsePartitionedTopics expectedPartition in the committedOffsets will always be 0 + testAckUntil(false, + "persistent://a/b/fake-topic-partition-1", + "persistent://a/b/fake-topic-partition-1", + 0); + } + + private void testAckUntil(boolean collapseEnabled, + String pulsarTopic, + String expectedKafkaTopic, + int expectedPartition) throws Exception { + // Setup sink with given collapseEnabled value + props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName()); + props.put("collapsePartitionedTopics", Boolean.toString(collapseEnabled)); + KafkaConnectSink sink = new KafkaConnectSink(); + sink.open(props, context); + + // Create pulsar record with given pulsarTopic and expectedPartition + Message msg = mock(MessageImpl.class); + when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 1, expectedPartition)); + when(msg.getValue()).thenReturn(null); + + AtomicInteger ackCount = new AtomicInteger(0); + + Record record = PulsarRecord.builder() + .topicName(pulsarTopic) + .message(msg) + .ackFunction(ackCount::incrementAndGet) + .failFunction(() -> {}) + .build(); + + // Add the pulsar record to pendingFlushQueue + sink.pendingFlushQueue.add(record); + + // Build committedOffsets with the given expectedKafkaTopic and expectedPartition + Map committedOffsets = new HashMap<>(); + committedOffsets.put( + new TopicPartition(expectedKafkaTopic, expectedPartition), + new OffsetAndMetadata(sink.getMessageOffset(record)) + ); + + // Trigger ackUntil manually + sink.ackUntil(record, committedOffsets, Record::ack); + + // Assert that the ackFunction runnable of the record is called and pendingFlushQueue is empty + Assert.assertEquals(ackCount.get(), 1); + Assert.assertTrue(sink.pendingFlushQueue.isEmpty()); + + sink.close(); + } + @SneakyThrows private java.util.Date getDateFromString(String dateInString) { SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");