Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -28,6 +29,8 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;

/**
Expand Down Expand Up @@ -55,7 +58,7 @@ public class SharedConsumerAssignor {
private final Subscription subscription;

public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata> entryAndMetadataList,
final int numConsumers) {
final int numConsumers) {
assert numConsumers >= 0;
consumerToPermits.clear();
final Map<Consumer, List<EntryAndMetadata>> consumerToEntries = new IdentityHashMap<>();
Expand Down Expand Up @@ -86,17 +89,13 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>
availablePermits = consumer.getAvailablePermits();
}

if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) {
consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata);
if (!isChunkedMessage(metadata)) {
addEntry(consumerToEntries, consumer, entryAndMetadata);
availablePermits--;
} else {
final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits);
if (consumerForUuid == null) {
unassignedMessageProcessor.accept(entryAndMetadata);
continue;
}
consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new ArrayList<>()).add(entryAndMetadata);
availablePermits = assignChunk(entryAndMetadata, metadata, consumer, consumerToEntries,
availablePermits);
}
availablePermits--;
}

for (; index < entryAndMetadataList.size(); index++) {
Expand All @@ -106,6 +105,64 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>
return consumerToEntries;
}

private int assignChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata, Consumer consumer,
Map<Consumer, List<EntryAndMetadata>> consumerToEntries, int availablePermits) {
final String uuid = metadata.getUuid();
Consumer consumerForUuid = uuidToConsumer.get(uuid);
if (consumerForUuid == null) {
if (skipChunk(entryAndMetadata, metadata)) {
return availablePermits;
}
consumerForUuid = consumer;
uuidToConsumer.put(uuid, consumerForUuid);
}
Comment on lines +112 to +118
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this solution would skip only the first of possibly several orphaned chunk entries.
For example, if the entry with chunkId 0 were lost and it were followed by entries with chunkId 1, chunkId 2 and chunkId 3, the entries with chunkId 2 and chunkId 3 would still be delivered to the client, causing a similar issue.


final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits);
if (permits <= 0) {
unassignedMessageProcessor.accept(entryAndMetadata);
return availablePermits;
}
if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
// The last chunk is received, we should remove the uuid from the cache.
uuidToConsumer.remove(uuid);
}

addEntry(consumerToEntries, consumerForUuid, entryAndMetadata);
consumerToPermits.put(consumerForUuid, permits - 1);
if (consumerForUuid == consumer) {
return availablePermits - 1;
}
return availablePermits;
}

/**
 * Tells whether the given metadata carries every field that identifies a chunk of a chunked message.
 *
 * @param metadata the message metadata to inspect, may be {@code null}
 * @return {@code true} only when {@code metadata} is non-null and has a uuid, a chunk id and a chunk count
 */
private boolean isChunkedMessage(MessageMetadata metadata) {
    // Without metadata or a uuid the entry cannot belong to a chunked message.
    if (metadata == null || !metadata.hasUuid()) {
        return false;
    }
    return metadata.hasChunkId() && metadata.hasNumChunksFromMsg();
}

/**
 * Appends an entry to the list kept for a consumer, creating that list on first use.
 *
 * @param consumerToEntries the per-consumer assignment map to update
 * @param consumer the consumer the entry is assigned to
 * @param entry the entry to append
 */
private void addEntry(Map<Consumer, List<EntryAndMetadata>> consumerToEntries, Consumer consumer,
                      EntryAndMetadata entry) {
    final List<EntryAndMetadata> assignedEntries =
            consumerToEntries.computeIfAbsent(consumer, ignored -> new ArrayList<>());
    assignedEntries.add(entry);
}

private boolean skipChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`skipChunk` sounds like a command; it would be better to rename it so that it reads like a query.

Suggested change
private boolean skipChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata) {
private boolean shouldSkipChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata) {

if (metadata.getChunkId() != 0) {
if (subscription != null) {
log.warn("[{}][{}] Skip the message because it is not the first chunk."
+ " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
// Directly ack the message.
if (!(subscription instanceof PulsarCompactorSubscription)) {
subscription.acknowledgeMessage(Collections.singletonList(
entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
}
}
entryAndMetadata.release();
return true;
Comment on lines +149 to +161
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there's a potential for data loss, the messages should be acknowledged only if autoSkipNonRecoverableData is set in broker.conf. When autoSkipNonRecoverableData isn't set, the message should be logged at ERROR level and must not be acknowledged. I think it's fine to still skip the message in that case so that processing of the subscription continues, but the unacked messages will leave a backlog behind.

}
return false;
}

private Consumer getConsumer(final int numConsumers) {
for (int i = 0; i < numConsumers; i++) {
final Consumer consumer = defaultSelector.get();
Expand All @@ -119,29 +176,4 @@ private Consumer getConsumer(final int numConsumers) {
}
return null;
}

/**
 * Resolves the consumer that should receive a chunk, keeping all chunks of one uuid on the same consumer.
 *
 * <p>If no consumer is cached for the uuid, only the first chunk (chunk id 0) may bind the uuid to
 * {@code defaultConsumer}; any other chunk is rejected. The binding is dropped once the last chunk
 * (chunk id {@code numChunksFromMsg - 1}) has been accepted.
 *
 * @param metadata metadata of the chunk; its uuid, chunk id and chunk count are read
 * @param defaultConsumer the consumer bound to the uuid when this is the first chunk of an unseen uuid
 * @param currentAvailablePermits the permit count the caller currently tracks for {@code defaultConsumer}
 * @return the consumer to deliver the chunk to, or {@code null} when the chunk is not the first one of an
 *         unseen uuid, or when the resolved consumer has no cached permits left
 */
private Consumer getConsumerForUuid(final MessageMetadata metadata,
                                    final Consumer defaultConsumer,
                                    final int currentAvailablePermits) {
    final String uuid = metadata.getUuid();
    Consumer consumer = uuidToConsumer.get(uuid);
    if (consumer == null) {
        if (metadata.getChunkId() != 0) {
            // Not the first chunk, skip it
            return null;
        }
        consumer = defaultConsumer;
        uuidToConsumer.put(uuid, consumer);
    }
    final int permits = consumerToPermits.computeIfAbsent(consumer, Consumer::getAvailablePermits);
    if (permits <= 0) {
        return null;
    }
    if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
        // The last chunk is received, we should remove the cache
        uuidToConsumer.remove(uuid);
    }
    // currentAvailablePermits belongs to defaultConsumer. When the uuid is bound to a different consumer,
    // decrement that consumer's own cached permits instead of overwriting them with an unrelated count.
    final int permitsBeforeAssign = consumer == defaultConsumer ? currentAvailablePermits : permits;
    consumerToPermits.put(consumer, permitsBeforeAssign - 1);
    return consumer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
*/
package org.apache.pulsar.broker.service;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
Expand All @@ -37,6 +43,7 @@
import lombok.RequiredArgsConstructor;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -294,4 +301,50 @@ public void testChunkMessagesNotBeLostNoConsumer() {
assertTrue(assignor.getUuidToConsumer().isEmpty());
}

/**
 * Simulates a chunked message whose first chunk (chunk ID 0) was abnormally lost. The remaining chunks of
 * that message must be skipped and acknowledged rather than delivered or queued for replay, so the regular
 * messages around them are still dispatched.
 */
@Test
public void testSkipOrphanChunk() {
    releaseEntries();

    // The assignor logs with the topic and subscription names, so the mock has to provide them.
    final Subscription mockedSubscription = mock(Subscription.class);
    when(mockedSubscription.getTopicName()).thenReturn("test-topic");
    when(mockedSubscription.getName()).thenReturn("test-sub");
    assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, mockedSubscription);

    final Consumer onlyConsumer = new Consumer("C1", 10);
    roundRobinConsumerSelector.addConsumers(onlyConsumer);

    final List<EntryAndMetadata> published = new ArrayList<>();
    final MockProducer producer = new MockProducer("P", new AtomicLong(0), published);

    // Regular message: "0:0@P-0"
    producer.sendMessage();
    // Chunks 1 and 2 of a 3-chunk message; chunk 0 is never sent
    producer.sendChunk(1, 3);
    producer.sendChunk(2, 3);
    // Regular message: "0:3@P-2"
    producer.sendMessage();

    final Map<Consumer, List<EntryAndMetadata>> assignment = assignor.assign(published, 1);
    final List<EntryAndMetadata> delivered = assignment.get(onlyConsumer);

    // Only the two regular messages reach the consumer.
    assertEquals(delivered.size(), 2);
    assertEquals(delivered.get(0).toString(), "0:0@P-0");
    assertEquals(delivered.get(1).toString(), "0:3@P-2");

    // Each of the two orphan chunks is acknowledged individually.
    verify(mockedSubscription, times(2)).acknowledgeMessage(any(), eq(AckType.Individual), any());

    // Nothing is left for replay and no uuid binding remains cached.
    assertTrue(replayQueue.isEmpty());
    assertTrue(assignor.getUuidToConsumer().isEmpty());

    cleanupQueue.addAll(delivered);
}


}
Loading