From 27ccbce351030159eb64cacf1d70e1c60369cab9 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Tue, 30 Dec 2025 16:21:56 +0800 Subject: [PATCH 1/7] [fix][broker]Fixed an issue where the entire subscription would be blocked when a chunk message with an ID of zero did not exist. --- .../service/SharedConsumerAssignor.java | 67 +++++++++++-------- ...PersistentDispatcherMultipleConsumers.java | 2 +- ...entDispatcherMultipleConsumersClassic.java | 2 +- .../service/SharedConsumerAssignorTest.java | 2 +- 4 files changed, 41 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java index 2161e418dff00..637cc63111289 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.Collections; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; @@ -27,11 +28,16 @@ import java.util.function.Supplier; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription; +import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.MessageMetadata; /** * The assigner to assign entries to the proper {@link Consumer} in the shared subscription. */ + +@Slf4j @RequiredArgsConstructor public class SharedConsumerAssignor { @@ -50,6 +56,8 @@ public class SharedConsumerAssignor { // Process the unassigned messages, e.g. adding them to the replay queue private final java.util.function.Consumer unassignedMessageProcessor; + private final Subscription subscription; + public Map> assign(final List entryAndMetadataList, final int numConsumers) { assert numConsumers >= 0; @@ -80,15 +88,41 @@ public Map> assign(final List if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) { consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata); + availablePermits--; } else { - final Consumer consumerForUuid = getConsumerForUuid(metadata, consumer, availablePermits); + final String uuid = metadata.getUuid(); + Consumer consumerForUuid = uuidToConsumer.get(uuid); if (consumerForUuid == null) { - unassignedMessageProcessor.accept(entryAndMetadata); - continue; + if (metadata.getChunkId() != 0) { + if (subscription != null) { + log.warn("[{}][{}] Skip the message because of it not the first chunk." + + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}", + subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(), + metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg()); + // Directly ack the message + if (!(subscription instanceof PulsarCompactorSubscription)) { + subscription.acknowledgeMessage(Collections.singletonList( + entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap()); + entryAndMetadata.release(); + } + } + } + consumerForUuid = consumer; + uuidToConsumer.put(uuid, consumerForUuid); + } + + final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits); + if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) { + // The last chunk is received, we should remove the uuid + uuidToConsumer.remove(uuid); } + consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new ArrayList<>()).add(entryAndMetadata); + consumerToPermits.put(consumerForUuid, permits - 1); + if (consumerForUuid == consumer) { + availablePermits--; + } } - availablePermits--; } for (; index < entryAndMetadataList.size(); index++) { @@ -111,29 +145,4 @@ private Consumer getConsumer(final int numConsumers) { } return null; } - - private Consumer getConsumerForUuid(final MessageMetadata metadata, - final Consumer defaultConsumer, - final int currentAvailablePermits) { - final String uuid = metadata.getUuid(); - Consumer consumer = uuidToConsumer.get(uuid); - if (consumer == null) { - if (metadata.getChunkId() != 0) { - // Not the first chunk, skip it - return null; - } - consumer = defaultConsumer; - uuidToConsumer.put(uuid, consumer); - } - final int permits = consumerToPermits.computeIfAbsent(consumer, Consumer::getAvailablePermits); - if (permits <= 0) { - return null; - } - if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) { - // The last chunk is received, we should remove the cache - uuidToConsumer.remove(uuid); - } - consumerToPermits.put(consumer, currentAvailablePermits - 1); - return consumer; - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 097ab9cd0febf..4f33e3e379bf8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -163,7 +163,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.initializeDispatchRateLimiterIfNeeded(); - this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay); + this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay, subscription); ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration(); this.readFailureBackoff = new Backoff( serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 0f496e461b85c..0746b7215b167 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -157,7 +157,7 @@ public PersistentDispatcherMultipleConsumersClassic(PersistentTopic topic, Manag : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.initializeDispatchRateLimiterIfNeeded(); - this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay); + this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay, subscription); this.readFailureBackoff = new Backoff( topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java index 1b253df0f3772..92ff38ca03f1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java @@ -58,7 +58,7 @@ public void prepareData() { roundRobinConsumerSelector.clear(); entryAndMetadataList.clear(); replayQueue.clear(); - assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add); + assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, null); final AtomicLong entryId = new AtomicLong(0L); final MockProducer producerA = new MockProducer("A", entryId, entryAndMetadataList); final MockProducer producerB = new MockProducer("B", entryId, entryAndMetadataList); From 9d2d82f0e50f355295d039c2634ce5843e96a677 Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Tue, 20 Jan 2026 17:33:04 +0800 Subject: [PATCH 2/7] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../apache/pulsar/broker/service/SharedConsumerAssignor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java index 676da4b7b7247..c6ecd34080a54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.Collections; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; @@ -28,7 +29,6 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.MessageMetadata; From df84d98163f7ac3485ec7dcffcf8246626a1f070 Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Tue, 20 Jan 2026 17:37:26 +0800 Subject: [PATCH 3/7] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../pulsar/broker/service/SharedConsumerAssignor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java index c6ecd34080a54..48f9817997fda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java @@ -98,11 +98,11 @@ public Map> assign(final List if (consumerForUuid == null) { if (metadata.getChunkId() != 0) { if (subscription != null) { - log.warn("[{}][{}] Skip the message because of it not the first chunk." + log.warn("[{}][{}] Skip the message because it is not the first chunk." + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}", subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(), metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg()); - // Directly ack the message + // Directly ack the message. if (!(subscription instanceof PulsarCompactorSubscription)) { subscription.acknowledgeMessage(Collections.singletonList( entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap()); @@ -116,7 +116,7 @@ public Map> assign(final List final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits); if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) { - // The last chunk is received, we should remove the uuid + // The last chunk is received, we should remove the uuid from the cache. uuidToConsumer.remove(uuid); } From 1cc432e282c2ad5a80b6f601052c89b3a9558de6 Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Tue, 20 Jan 2026 17:55:19 +0800 Subject: [PATCH 4/7] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../org/apache/pulsar/broker/service/SharedConsumerAssignor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java index 48f9817997fda..20c71c4f39c82 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java @@ -109,6 +109,7 @@ public Map> assign(final List entryAndMetadata.release(); } } + continue; } consumerForUuid = consumer; uuidToConsumer.put(uuid, consumerForUuid); From 5a17585f4f28fa1cc374d64b9510dc74274d3863 Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Tue, 27 Jan 2026 18:01:16 +0800 Subject: [PATCH 5/7] [fix][broker] Fix chunked message loss when no consumers are available --- .../service/SharedConsumerAssignor.java | 4 ++ .../service/SharedConsumerAssignorTest.java | 55 +++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java index 20c71c4f39c82..459c2ab169cfb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java @@ -116,6 +116,10 @@ public Map> assign(final List } final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits); + if (permits <= 0) { + unassignedMessageProcessor.accept(entryAndMetadata); + continue; + } if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) { // The last chunk is received, we should remove the uuid from the cache. uuidToConsumer.remove(uuid); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java index d9e675fa7e543..11d71b4d686bc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java @@ -18,6 +18,12 @@ */ package org.apache.pulsar.broker.service; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertSame; @@ -37,6 +43,7 @@ import lombok.RequiredArgsConstructor; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; import org.testng.annotations.AfterMethod; @@ -294,4 +301,52 @@ public void testChunkMessagesNotBeLostNoConsumer() { assertTrue(assignor.getUuidToConsumer().isEmpty()); } + /** + * Simulate the occurrence of chunk messages. When a message with chunk ID 0 is abnormally lost, subsequent chunk + * messages for that batch should be skipped instead of blocking the entire subscription. + */ + @Test + public void testSkipOrphanChunk() { + cleanupQueue.clear(); + Subscription subscription = mock(Subscription.class); + when(subscription.getTopicName()).thenReturn("test-topic"); + when(subscription.getName()).thenReturn("test-sub"); + + assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, subscription); + + final Consumer consumer = new Consumer("C1", 10); + roundRobinConsumerSelector.addConsumers(consumer); + + List entries = new ArrayList<>(); + AtomicLong entryId = new AtomicLong(0); + MockProducer producer = new MockProducer("P", entryId, entries); + + // 0:0@P-0 + producer.sendMessage(); + + // Simulate the sending of chunk messages with missing chunkId '0' + producer.sendChunk(1, 3); + producer.sendChunk(2, 3); + + // 0:3@P-2 + producer.sendMessage(); + + // Add to cleanupQueue but skip the orphan chunk as it will be released by assignor + cleanupQueue.add(entries.get(0)); + cleanupQueue.add(entries.get(3)); + + Map> result = assignor.assign(entries, 1); + + List assigned = result.get(consumer); + assertEquals(assigned.size(), 2); + assertEquals(assigned.get(0).toString(), "0:0@P-0"); + assertEquals(assigned.get(1).toString(), "0:3@P-2"); + + verify(subscription, times(2)).acknowledgeMessage(any(), eq(AckType.Individual), any()); + + assertTrue(replayQueue.isEmpty()); + assertTrue(assignor.getUuidToConsumer().isEmpty()); + } + + } From ee76a338beef1c5a450cde0773165ae4d706931a Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Tue, 27 Jan 2026 21:05:20 +0800 Subject: [PATCH 6/7] [fix][broker]Fixed an issue where the entire subscription would be blocked when a chunk message with an ID of zero did not exist. --- .../service/SharedConsumerAssignor.java | 2 +- .../service/SharedConsumerAssignorTest.java | 20 +++++++++---------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java index 459c2ab169cfb..d3c2c83ec3856 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java @@ -106,9 +106,9 @@ public Map> assign(final List if (!(subscription instanceof PulsarCompactorSubscription)) { subscription.acknowledgeMessage(Collections.singletonList( entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap()); - entryAndMetadata.release(); } } + entryAndMetadata.release(); continue; } consumerForUuid = consumer; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java index 11d71b4d686bc..311fe454dcd0f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java @@ -307,7 +307,7 @@ public void testChunkMessagesNotBeLostNoConsumer() { */ @Test public void testSkipOrphanChunk() { - cleanupQueue.clear(); + releaseEntries(); Subscription subscription = mock(Subscription.class); when(subscription.getTopicName()).thenReturn("test-topic"); when(subscription.getName()).thenReturn("test-sub"); @@ -321,31 +321,29 @@ public void testSkipOrphanChunk() { AtomicLong entryId = new AtomicLong(0); MockProducer producer = new MockProducer("P", entryId, entries); - // 0:0@P-0 + // Send conventional message: "0:0@P-0" producer.sendMessage(); // Simulate the sending of chunk messages with missing chunkId '0' producer.sendChunk(1, 3); producer.sendChunk(2, 3); - // 0:3@P-2 + // Send conventional message: "0:3@P-2" producer.sendMessage(); - // Add to cleanupQueue but skip the orphan chunk as it will be released by assignor - cleanupQueue.add(entries.get(0)); - cleanupQueue.add(entries.get(3)); - Map> result = assignor.assign(entries, 1); - List assigned = result.get(consumer); - assertEquals(assigned.size(), 2); - assertEquals(assigned.get(0).toString(), "0:0@P-0"); - assertEquals(assigned.get(1).toString(), "0:3@P-2"); + List entryAndMetadataList = result.get(consumer); + assertEquals(entryAndMetadataList.size(), 2); + assertEquals(entryAndMetadataList.get(0).toString(), "0:0@P-0"); + assertEquals(entryAndMetadataList.get(1).toString(), "0:3@P-2"); verify(subscription, times(2)).acknowledgeMessage(any(), eq(AckType.Individual), any()); assertTrue(replayQueue.isEmpty()); assertTrue(assignor.getUuidToConsumer().isEmpty()); + + cleanupQueue.addAll(entryAndMetadataList); } From 6e2ab6e8ab5e850619f120ef534c0ea10638bd8c Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Wed, 4 Feb 2026 21:13:25 +0800 Subject: [PATCH 7/7] [fix][broker]Fixed an issue where the entire subscription would be blocked when a chunk message with an ID of zero did not exist. --- .../service/SharedConsumerAssignor.java | 103 +++++++++++------- 1 file changed, 63 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java index d3c2c83ec3856..184e7aaefe41d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java @@ -58,7 +58,7 @@ public class SharedConsumerAssignor { private final Subscription subscription; public Map> assign(final List entryAndMetadataList, - final int numConsumers) { + final int numConsumers) { assert numConsumers >= 0; consumerToPermits.clear(); final Map> consumerToEntries = new IdentityHashMap<>(); @@ -89,47 +89,12 @@ public Map> assign(final List availablePermits = consumer.getAvailablePermits(); } - if (metadata == null || !metadata.hasUuid() || !metadata.hasChunkId() || !metadata.hasNumChunksFromMsg()) { - consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entryAndMetadata); + if (!isChunkedMessage(metadata)) { + addEntry(consumerToEntries, consumer, entryAndMetadata); availablePermits--; } else { - final String uuid = metadata.getUuid(); - Consumer consumerForUuid = uuidToConsumer.get(uuid); - if (consumerForUuid == null) { - if (metadata.getChunkId() != 0) { - if (subscription != null) { - log.warn("[{}][{}] Skip the message because it is not the first chunk." - + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}", - subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(), - metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg()); - // Directly ack the message. - if (!(subscription instanceof PulsarCompactorSubscription)) { - subscription.acknowledgeMessage(Collections.singletonList( - entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap()); - } - } - entryAndMetadata.release(); - continue; - } - consumerForUuid = consumer; - uuidToConsumer.put(uuid, consumerForUuid); - } - - final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits); - if (permits <= 0) { - unassignedMessageProcessor.accept(entryAndMetadata); - continue; - } - if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) { - // The last chunk is received, we should remove the uuid from the cache. - uuidToConsumer.remove(uuid); - } - - consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new ArrayList<>()).add(entryAndMetadata); - consumerToPermits.put(consumerForUuid, permits - 1); - if (consumerForUuid == consumer) { - availablePermits--; - } + availablePermits = assignChunk(entryAndMetadata, metadata, consumer, consumerToEntries, + availablePermits); } } @@ -140,6 +105,64 @@ public Map> assign(final List return consumerToEntries; } + private int assignChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata, Consumer consumer, + Map> consumerToEntries, int availablePermits) { + final String uuid = metadata.getUuid(); + Consumer consumerForUuid = uuidToConsumer.get(uuid); + if (consumerForUuid == null) { + if (skipChunk(entryAndMetadata, metadata)) { + return availablePermits; + } + consumerForUuid = consumer; + uuidToConsumer.put(uuid, consumerForUuid); + } + + final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits); + if (permits <= 0) { + unassignedMessageProcessor.accept(entryAndMetadata); + return availablePermits; + } + if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) { + // The last chunk is received, we should remove the uuid from the cache. + uuidToConsumer.remove(uuid); + } + + addEntry(consumerToEntries, consumerForUuid, entryAndMetadata); + consumerToPermits.put(consumerForUuid, permits - 1); + if (consumerForUuid == consumer) { + return availablePermits - 1; + } + return availablePermits; + } + + private boolean isChunkedMessage(MessageMetadata metadata) { + return metadata != null && metadata.hasUuid() && metadata.hasChunkId() && metadata.hasNumChunksFromMsg(); + } + + private void addEntry(Map> consumerToEntries, Consumer consumer, + EntryAndMetadata entry) { + consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entry); + } + + private boolean skipChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata) { + if (metadata.getChunkId() != 0) { + if (subscription != null) { + log.warn("[{}][{}] Skip the message because it is not the first chunk." + + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}", + subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(), + metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg()); + // Directly ack the message. + if (!(subscription instanceof PulsarCompactorSubscription)) { + subscription.acknowledgeMessage(Collections.singletonList( + entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap()); + } + } + entryAndMetadata.release(); + return true; + } + return false; + } + private Consumer getConsumer(final int numConsumers) { for (int i = 0; i < numConsumers; i++) { final Consumer consumer = defaultSelector.get();