Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,11 @@ public class KafkaConnectSink implements Sink<GenericObject> {
// This is a workaround for https://github.com/apache/pulsar/issues/19922
private boolean collapsePartitionedTopics = false;

private final Cache<String, String> sanitizedTopicCache =
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();

// Can't really safely expire these entries. If we do, we could end up with
// a sanitized topic name that is used in e.g. resume() after a long pause but can't be
// re-resolved into a form usable for Pulsar.
private final Cache<String, String> desanitizedTopicCache =
CacheBuilder.newBuilder().build();

private int maxBatchBitsForOffset = 12;
private boolean useIndexAsOffset = true;

private TopicPartitionResolver topicPartitionResolver;

@Override
public void write(Record<GenericObject> sourceRecord) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -198,19 +190,15 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
x.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic());
});
task = (SinkTask) taskClass.getConstructor().newInstance();

topicPartitionResolver = new TopicPartitionResolver(
topicName,
sanitizeTopicName,
collapsePartitionedTopics);

taskContext =
new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open, kafkaName -> {
if (sanitizeTopicName) {
String pulsarTopicName = desanitizedTopicCache.getIfPresent(kafkaName);
if (log.isDebugEnabled()) {
log.debug("desanitizedTopicCache got: kafkaName: {}, pulsarTopicName: {}",
kafkaName, pulsarTopicName);
}
return pulsarTopicName != null ? pulsarTopicName : kafkaName;
} else {
return kafkaName;
}
});
new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open,
topicPartitionResolver::desanitizeTopicName);
task.initialize(taskContext);
task.start(configs.get(0));

Expand Down Expand Up @@ -264,6 +252,10 @@ public void flush() {
ackUntil(lastNotFlushed, committedOffsets, Record::ack);
log.info("Flush succeeded");
} catch (Throwable t) {
if (committedOffsets == null) {
log.error("preCommit failed — retrying to preserve ordering", t);
return;
}
log.error("error flushing pending records", t);
ackUntil(lastNotFlushed, committedOffsets, Record::fail);
} finally {
Expand Down Expand Up @@ -300,13 +292,13 @@ protected void ackUntil(Record<GenericObject> lastNotFlushed,
partitionOffset.put(tp.partition(), e.getValue().offset());
}

int ackRequestedCount = 0;
for (Record<GenericObject> r : pendingFlushQueue) {
final String topic = sanitizeNameIfNeeded(r.getTopicName().orElse(topicName), sanitizeTopicName);
final int partition = r.getPartitionIndex().orElse(0);
ResolvedTopicPartition resolved = topicPartitionResolver.resolve(r);

Long lastCommittedOffset = null;
if (topicOffsets.containsKey(topic)) {
lastCommittedOffset = topicOffsets.get(topic).get(partition);
if (topicOffsets.containsKey(resolved.getTopic())) {
lastCommittedOffset = topicOffsets.get(resolved.getTopic()).get(resolved.getPartition());
}

if (lastCommittedOffset == null) {
Expand All @@ -326,15 +318,20 @@ protected void ackUntil(Record<GenericObject> lastNotFlushed,
}

cb.accept(r);
ackRequestedCount++;
pendingFlushQueue.remove(r);
currentBatchSize.addAndGet(-1 * r.getMessage().get().size());
if (r == lastNotFlushed) {
break;
}
}
if (log.isDebugEnabled()) {
log.debug("ackRequestedCount: {}, committedOffsets: {}", ackRequestedCount, committedOffsets);
}
}

private long getMessageOffset(Record<GenericObject> sourceRecord) {
@VisibleForTesting
long getMessageOffset(Record<GenericObject> sourceRecord) {

if (sourceRecord.getMessage().isPresent()) {
// Use index added by org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present.
Expand Down Expand Up @@ -383,6 +380,11 @@ private long getMessageOffset(Record<GenericObject> sourceRecord) {
.orElse(-1L);
}

// Exposes the resolver so unit tests can exercise topic-name sanitization
// and partition resolution directly, without going through write()/open().
@VisibleForTesting
TopicPartitionResolver getTopicPartitionResolver() {
return topicPartitionResolver;
}

@Getter
@AllArgsConstructor
static class BatchMessageSequenceRef {
Expand All @@ -391,6 +393,94 @@ static class BatchMessageSequenceRef {
int batchIdx;
}

@Getter
@AllArgsConstructor
static class ResolvedTopicPartition {
private final String topic;
private final int partition;
}

/**
 * Maps a Pulsar record to the Kafka-side (topic, partition) pair, optionally
 * collapsing partitions of a partitioned topic into the base topic and
 * sanitizing topic names into a Kafka-legal form. Also remembers the reverse
 * (desanitized) mapping so Kafka-side names can be resolved back to Pulsar
 * topics, e.g. for resume().
 */
static class TopicPartitionResolver {
    private final String topicName;
    private final boolean sanitizeTopicName;
    private final boolean collapsePartitionedTopics;

    // Sanitized names are cheap to recompute, so this cache is bounded and
    // entries may expire.
    private final Cache<String, String> sanitizedTopicCache =
            CacheBuilder.newBuilder().maximumSize(1000)
                    .expireAfterAccess(30, TimeUnit.MINUTES).build();

    // Can't really safely expire these entries. If we do, we could end up with
    // a sanitized topic name that is used in e.g. resume() after a long pause but can't be
    // re-resolved into a form usable for Pulsar.
    private final Cache<String, String> desanitizedTopicCache =
            CacheBuilder.newBuilder().build();

    private TopicPartitionResolver(String topicName,
                                   boolean sanitizeTopicName,
                                   boolean collapsePartitionedTopics) {
        this.topicName = topicName;
        this.sanitizeTopicName = sanitizeTopicName;
        this.collapsePartitionedTopics = collapsePartitionedTopics;
    }

    /**
     * Determines the Kafka topic/partition for the given record.
     * With collapsing active and a record coming from a partition of a
     * partitioned topic, the base (partitioned) topic name is used and the
     * partition index is parsed from the Pulsar topic name; otherwise the
     * record's own topic (falling back to the configured topic) and its
     * partition index (default 0) are used.
     */
    private ResolvedTopicPartition resolve(Record<GenericObject> sourceRecord) {
        if (shouldCollapsePartitionedTopic(sourceRecord)) {
            TopicName parsed = TopicName.get(sourceRecord.getTopicName().get());
            return new ResolvedTopicPartition(
                    sanitizeNameIfNeeded(parsed.getPartitionedTopicName()),
                    parsed.getPartitionIndex());
        }
        final String pulsarTopic = sourceRecord.getTopicName().orElse(topicName);
        final int partitionIdx = sourceRecord.getPartitionIndex().orElse(0);
        return new ResolvedTopicPartition(sanitizeNameIfNeeded(pulsarTopic), partitionIdx);
    }

    /**
     * Reverses sanitization: returns the original Pulsar topic name recorded
     * for a Kafka-side name, falling back to the given name when no mapping
     * exists or sanitization is disabled.
     */
    private String desanitizeTopicName(String kafkaName) {
        if (!sanitizeTopicName) {
            return kafkaName;
        }
        String pulsarTopicName = desanitizedTopicCache.getIfPresent(kafkaName);
        if (log.isDebugEnabled()) {
            log.debug("desanitizedTopicCache got: kafkaName: {}, pulsarTopicName: {}",
                    kafkaName, pulsarTopicName);
        }
        return pulsarTopicName == null ? kafkaName : pulsarTopicName;
    }

    // Replace all non-letter, non-digit characters with underscore.
    // Append underscore in front of name if it does not begin with alphabet or underscore.
    String sanitizeNameIfNeeded(String name) {
        if (!sanitizeTopicName) {
            return name;
        }

        try {
            return sanitizedTopicCache.get(name, () -> {
                String sanitized = name.replaceAll("[^a-zA-Z0-9_]", "_");
                if (sanitized.matches("^[^a-zA-Z_].*")) {
                    sanitized = "_" + sanitized;
                }
                // do this once, sanitize() can be called on already sanitized name
                // so avoid replacing with (sanitizedName -> sanitizedName).
                desanitizedTopicCache.get(sanitized, () -> name);
                return sanitized;
            });
        } catch (ExecutionException e) {
            log.error("Failed to get sanitized topic name for {}", name, e);
            throw new IllegalStateException("Failed to get sanitized topic name for " + name, e);
        }
    }

    // True only when collapsing is enabled, the record carries a topic name,
    // and that topic is a partition of a partitioned topic.
    private boolean shouldCollapsePartitionedTopic(Record<GenericObject> r) {
        if (!collapsePartitionedTopics || !r.getTopicName().isPresent()) {
            return false;
        }
        return TopicName.get(r.getTopicName().get()).isPartitioned();
    }
}

private static Method getMethodOfMessageId(MessageId messageId, String name) throws NoSuchMethodException {
Class<?> clazz = messageId.getClass();
NoSuchMethodException firstException = null;
Expand Down Expand Up @@ -437,19 +527,11 @@ static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId me

@SuppressWarnings("rawtypes")
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
final int partition;
final String topic;

if (collapsePartitionedTopics
&& sourceRecord.getTopicName().isPresent()
&& TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) {
TopicName tn = TopicName.get(sourceRecord.getTopicName().get());
partition = tn.getPartitionIndex();
topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName);
} else {
partition = sourceRecord.getPartitionIndex().orElse(0);
topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
}

ResolvedTopicPartition resolved = topicPartitionResolver.resolve(sourceRecord);
final int partition = resolved.getPartition();
final String topic = resolved.getTopic();

final Object key;
final Object value;
final Schema keySchema;
Expand Down Expand Up @@ -522,31 +604,6 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {

@VisibleForTesting
protected long currentOffset(String topic, int partition) {
return taskContext.currentOffset(sanitizeNameIfNeeded(topic, sanitizeTopicName), partition);
return taskContext.currentOffset(topicPartitionResolver.sanitizeNameIfNeeded(topic), partition);
}

// Replace all non-letter, non-digit characters with underscore.
// Append underscore in front of name if it does not begin with alphabet or underscore.
// When sanitize is false the name is returned unchanged. On a cache miss the
// sanitized form is computed once and the reverse (sanitized -> original)
// mapping is stored in desanitizedTopicCache so the Pulsar name can be
// recovered later; a loader failure is surfaced as IllegalStateException.
protected String sanitizeNameIfNeeded(String name, boolean sanitize) {
if (!sanitize) {
return name;
}

try {
return sanitizedTopicCache.get(name, () -> {
String sanitizedName = name.replaceAll("[^a-zA-Z0-9_]", "_");
if (sanitizedName.matches("^[^a-zA-Z_].*")) {
sanitizedName = "_" + sanitizedName;
}
// do this once, sanitize() can be called on already sanitized name
// so avoid replacing with (sanitizedName -> sanitizedName).
desanitizedTopicCache.get(sanitizedName, () -> name);
return sanitizedName;
});
} catch (ExecutionException e) {
log.error("Failed to get sanitized topic name for {}", name, e);
throw new IllegalStateException("Failed to get sanitized topic name for " + name, e);
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ public void seekPauseResumeWithSanitizeTest() throws Exception {

assertEquals(status.get(), 1);

final TopicPartition tp = new TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0);
final TopicPartition tp = new TopicPartition(sink.getTopicPartitionResolver()
.sanitizeNameIfNeeded(pulsarTopicName), 0);
assertNotEquals(FunctionCommon.getSequenceId(msgId), 0);
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), FunctionCommon.getSequenceId(msgId));

Expand Down Expand Up @@ -1574,7 +1575,7 @@ public void testGetMessageSequenceRefForBatchMessage() throws Exception {
assertNull(ref);

ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
new TopicMessageIdImpl("topic-0", new MessageIdImpl(ledgerId, entryId, 0))
new TopicMessageIdImpl("topic-0", new MessageIdImpl(ledgerId, entryId, 0))
);
assertNull(ref);

Expand Down Expand Up @@ -1648,7 +1649,7 @@ private void testCollapsePartitionedTopic(boolean isEnabled,
props.put("collapsePartitionedTopics", Boolean.toString(isEnabled));

KafkaConnectSink sink = new KafkaConnectSink();
sink.open(props, context);
sink.open(props, context);

AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema =
AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
Expand Down Expand Up @@ -1682,6 +1683,67 @@ private void testCollapsePartitionedTopic(boolean isEnabled,
sink.close();
}

// With collapsePartitionedTopics enabled, a record from partition topic
// ".../fake-topic-partition-0" must be matched against the collapsed base
// topic name ("persistent://a/b/fake-topic") at partition 0.
@Test
public void testAckUntilWithCollapsePartitionedTopics() throws Exception {
testAckUntil(true,
"persistent://a/b/fake-topic-partition-0",
"persistent://a/b/fake-topic",
0);
}

// With collapsing disabled, the partition topic name is used verbatim as the
// Kafka topic.
@Test
public void testAckUntilWithoutCollapsePartitionedTopics() throws Exception {
// Note: Without collapsePartitionedTopics expectedPartition in the committedOffsets will always be 0
testAckUntil(false,
"persistent://a/b/fake-topic-partition-1",
"persistent://a/b/fake-topic-partition-1",
0);
}

/**
 * Drives KafkaConnectSink.ackUntil() directly with a single pending record and
 * asserts the record is acked (its ackFunction runs exactly once) and removed
 * from pendingFlushQueue when committedOffsets contains the
 * (expectedKafkaTopic, expectedPartition) key the resolver produces for it.
 */
private void testAckUntil(boolean collapseEnabled,
String pulsarTopic,
String expectedKafkaTopic,
int expectedPartition) throws Exception {
// Setup sink with given collapseEnabled value
props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
props.put("collapsePartitionedTopics", Boolean.toString(collapseEnabled));
KafkaConnectSink sink = new KafkaConnectSink();
sink.open(props, context);

// Create pulsar record with given pulsarTopic and expectedPartition
Message msg = mock(MessageImpl.class);
when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 1, expectedPartition));
when(msg.getValue()).thenReturn(null);

// Counts invocations of the record's ackFunction.
AtomicInteger ackCount = new AtomicInteger(0);

Record<GenericObject> record = PulsarRecord.<GenericObject>builder()
.topicName(pulsarTopic)
.message(msg)
.ackFunction(ackCount::incrementAndGet)
.failFunction(() -> {})
.build();

// Add the pulsar record to pendingFlushQueue
sink.pendingFlushQueue.add(record);

// Build committedOffsets with the given expectedKafkaTopic and expectedPartition
Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
committedOffsets.put(
new TopicPartition(expectedKafkaTopic, expectedPartition),
new OffsetAndMetadata(sink.getMessageOffset(record))
);

// Trigger ackUntil manually
sink.ackUntil(record, committedOffsets, Record::ack);

// Assert that the ackFunction runnable of the record is called and pendingFlushQueue is empty
Assert.assertEquals(ackCount.get(), 1);
Assert.assertTrue(sink.pendingFlushQueue.isEmpty());

sink.close();
}

@SneakyThrows
private java.util.Date getDateFromString(String dateInString) {
SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");
Expand Down