From 185c3213cf52f7aa520096192ecc401d3665e0f1 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 4 Mar 2026 16:10:41 +0800 Subject: [PATCH 1/9] Support Bookkeeper Batch Read API --- .../mledger/ManagedLedgerConfig.java | 30 +++++++ .../impl/cache/RangeEntryCacheImpl.java | 4 +- .../mledger/impl/cache/ReadEntryUtils.java | 85 +++++++++++++++++++ .../pulsar/broker/ServiceConfiguration.java | 6 ++ .../pulsar/broker/service/BrokerService.java | 3 + .../client/PulsarMockReadHandle.java | 7 ++ 6 files changed, 134 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 8fbf1cfda0d2b..2be2d76fe9cfa 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -91,6 +91,36 @@ public class ManagedLedgerConfig { @Getter @Setter private boolean cacheEvictionByExpectedReadCount = true; + + /** + * Enable batch read API when reading entries from bookkeeper. + * Batch read allows reading multiple entries in a single RPC call, reducing network overhead. + * Note: Batch read is only effective when ensembleSize equals writeQuorumSize (non-striped ledgers). + */ + @Setter + private boolean batchReadEnabled = false; + + /** + * Max size in bytes for batch read requests. If set to 0 or negative, + * uses the netty max frame size (default 5MB). + * Batch read may return fewer entries if total size exceeds this limit. + */ + @Getter + @Setter + private long batchReadMaxSizeBytes = 0; + + /** + * Returns whether batch read is enabled for this managed ledger. + * Batch read is only enabled when both conditions are met: + * 1. batchReadEnabled is set to true + * 2. ensembleSize equals writeQuorumSize (non-striped ledger) + * + * @return true if batch read should be used + */ + public boolean isBatchReadEnabled() { + return ensembleSize == writeQuorumSize && batchReadEnabled; + } + @Getter private long continueCachingAddedEntriesAfterLastActiveCursorLeavesMillis; private int minimumBacklogCursorsForCaching = 0; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 4dc60b6434ff9..1d3bd3ea90d70 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -526,7 +526,9 @@ CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, l private CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, boolean allowRetry) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; - CompletableFuture> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry) + ManagedLedgerConfig mlConfig = ml.getConfig(); + CompletableFuture> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, + mlConfig.isBatchReadEnabled(), mlConfig.getBatchReadMaxSizeBytes()) .thenApply( ledgerEntries -> { requireNonNull(ml.getName()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index 5cf5f053f0ce7..74b4ddfdab3c5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -18,16 +18,28 @@ */ package org.apache.bookkeeper.mledger.impl.cache; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class ReadEntryUtils { + private static final Logger log = LoggerFactory.getLogger(ReadEntryUtils.class); static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, long lastEntry) { + return readAsync(ml, handle, firstEntry, lastEntry, false, 0); + } + + static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, + long lastEntry, boolean batchReadEnabled, long batchReadMaxSize) { if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) { // The read handle comes from another managed ledger, in this case, we can only compare the entry range with // the LAC of that read handle. Specifically, it happens when this method is called by a @@ -49,6 +61,79 @@ static CompletableFuture readAsync(ManagedLedger ml, ReadHandle h return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is " + lastConfirmedEntry + " when reading entry " + lastEntry)); } + + int numberOfEntries = (int) (lastEntry - firstEntry + 1); + + // Use batch read for multiple entries when enabled + if (batchReadEnabled && numberOfEntries > 1) { + if (log.isDebugEnabled()) { + log.debug("Using batch read for ledger {} entries {}-{}, maxCount={}, maxSize={}", + handle.getId(), firstEntry, lastEntry, numberOfEntries, batchReadMaxSize); + } + return batchReadWithAutoRefill(handle, firstEntry, lastEntry, numberOfEntries, batchReadMaxSize); + } + return handle.readUnconfirmedAsync(firstEntry, lastEntry); } + + private static CompletableFuture batchReadWithAutoRefill( + ReadHandle handle, long firstEntry, long lastEntry, + int maxCount, long maxSize) { + + return handle.batchReadAsync(firstEntry, maxCount, maxSize) + .exceptionallyCompose(ex -> { + // Fallback to readUnconfirmedAsync if batch read fails + log.warn("Batch read failed for ledger {} entries {}-{}, falling back to regular read: {}", + handle.getId(), firstEntry, lastEntry, ex.getMessage()); + return handle.readUnconfirmedAsync(firstEntry, lastEntry); + }) + .thenCompose(entries -> { + // Collect entries and find the last received entry id in a single pass + List receivedList = new ArrayList<>(); + long lastReceivedEntryId = -1; + for (LedgerEntry e : entries) { + receivedList.add(e); + lastReceivedEntryId = e.getEntryId(); + } + int receivedCount = receivedList.size(); + + // All entries received, return as-is + if (receivedCount >= maxCount) { + return CompletableFuture.completedFuture(entries); + } + + // Partial result: need to read remaining entries + if (receivedCount == 0) { + // Edge case: no entries returned, use regular read + entries.close(); + log.warn("Batch read returned 0 entries for ledger {} entries {}-{}, falling back to regular read", + handle.getId(), firstEntry, lastEntry); + return handle.readUnconfirmedAsync(firstEntry, lastEntry); + } + + // Close the original entries since we've collected them into receivedList + entries.close(); + + long nextEntryId = lastReceivedEntryId + 1; + int remainingCount = (int) (lastEntry - nextEntryId + 1); + + if (log.isDebugEnabled()) { + log.debug("Batch read partial result for ledger {}: received {}/{}, reading remaining {}-{}", + handle.getId(), receivedCount, maxCount, nextEntryId, lastEntry); + } + + // Recursively read remaining entries + return batchReadWithAutoRefill(handle, nextEntryId, lastEntry, remainingCount, maxSize) + .thenApply(remainingEntries -> { + // Combine received and remaining entries + List combined = new ArrayList<>(receivedCount + remainingCount); + combined.addAll(receivedList); + for (LedgerEntry e : remainingEntries) { + combined.add(e); + } + remainingEntries.close(); + return LedgerEntriesImpl.create(combined); + }); + }); + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 7e3520dda426b..899b9990d9ced 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2332,6 +2332,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed = true; + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "Enable batch read API when reading entries from bookkeeper. " + + "Batch read allows reading multiple entries in a single RPC call, " + + "reducing network overhead for sequential reads.") + private boolean managedLedgerBatchReadEnabled = true; + @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'" + " and thus should be set as inactive.\n" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e4b2f33502378..7817871a54f81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2177,6 +2177,9 @@ public CompletableFuture getManagedLedgerConfig(@NonNull To serviceConfig.isCacheEvictionByMarkDeletedPosition()); managedLedgerConfig.setCacheEvictionByExpectedReadCount(false); } + managedLedgerConfig.setBatchReadEnabled( + serviceConfig.isBookkeeperUseV2WireProtocol() && serviceConfig.isManagedLedgerBatchReadEnabled()); + managedLedgerConfig.setBatchReadMaxSizeBytes(serviceConfig.getMaxMessageSize()); managedLedgerConfig.setMinimumBacklogCursorsForCaching( serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching()); managedLedgerConfig.setMinimumBacklogEntriesForCaching( diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index 62d84dbfc40a0..bd749c9c600c7 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -82,6 +82,13 @@ public CompletableFuture readUnconfirmedAsync(long firstEntry, lo return readAsync(firstEntry, lastEntry); } + @Override + public CompletableFuture batchReadAsync(long firstEntry, int maxCount, long maxSize) { + // Mock implementation: just read the entries without size limit enforcement + long lastEntry = Math.min(firstEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(firstEntry, lastEntry); + } + @Override public CompletableFuture readLastAddConfirmedAsync() { return CompletableFuture.completedFuture(getLastAddConfirmed()); From bfdbea1e3685d32caee171d19d4f724aca1684af Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 4 Mar 2026 16:15:43 +0800 Subject: [PATCH 2/9] Support Bookkeeper Batch Read API --- .../apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index 74b4ddfdab3c5..a4d25b16c6f3d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -106,8 +106,8 @@ private static CompletableFuture batchReadWithAutoRefill( if (receivedCount == 0) { // Edge case: no entries returned, use regular read entries.close(); - log.warn("Batch read returned 0 entries for ledger {} entries {}-{}, falling back to regular read", - handle.getId(), firstEntry, lastEntry); + log.warn("Batch read returned 0 entries for ledger {} entries {}-{}, falling back to " + + "regular read", handle.getId(), firstEntry, lastEntry); return handle.readUnconfirmedAsync(firstEntry, lastEntry); } From 2f4be0d25b598539d00bf54df0f1a837122cd627 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 4 Mar 2026 17:56:51 +0800 Subject: [PATCH 3/9] Fix tests --- .../mledger/impl/cache/RangeEntryCacheImplTest.java | 5 +++++ .../apache/bookkeeper/client/PulsarMockBookKeeper.java | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java index 42ae262e8ddb8..7bee8d14017f7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl; @@ -193,6 +194,10 @@ public void testReadFromStorageRetriesWhenHandleClosed() { ManagedLedgerFactoryMBeanImpl mlFactoryMBean = mock(ManagedLedgerFactoryMBeanImpl.class); when(mockEntryCacheManager.getMlFactoryMBean()).thenReturn(mlFactoryMBean); ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class); + ManagedLedgerConfig conf = mock(ManagedLedgerConfig.class); + when(conf.isBatchReadEnabled()).thenReturn(false); + when(conf.getBatchReadMaxSizeBytes()).thenReturn((long) 0); + when(mockManagedLedger.getConfig()).thenReturn(conf); ManagedLedgerMBeanImpl mockManagedLedgerMBean = mock(ManagedLedgerMBeanImpl.class); when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean); when(mockManagedLedger.getName()).thenReturn("testManagedLedger"); diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index c1f53d0adff1e..f39022ffbd3c7 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -79,6 +79,7 @@ public class PulsarMockBookKeeper extends BookKeeper { final ScheduledExecutorService scheduler; private volatile long defaultAddEntryDelayMillis = 1L; private volatile long defaultReadEntriesDelayMillis = 1L; + private final EnsemblePlacementPolicy defaultPlacementPolicy = new DefaultEnsemblePlacementPolicy(); @Override public ClientConfiguration getConf() { @@ -112,6 +113,13 @@ public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception { scheduler = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("mock-bk-scheduler")); } + public EnsemblePlacementPolicy getPlacementPolicy() { + if (placementPolicy == null) { + return defaultPlacementPolicy; + } + return placementPolicy; + } + @Override public OrderedExecutor getMainWorkerPool() { return orderedExecutor; From a91789117829fe083a65037220c80f4ab271f27b Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 24 Mar 2026 11:52:28 +0800 Subject: [PATCH 4/9] Fix code --- .../mledger/ManagedLedgerConfig.java | 2 +- .../impl/cache/CombinedLedgerEntriesImpl.java | 83 +++++++++++++ .../impl/cache/EntryCacheDisabled.java | 16 ++- .../mledger/impl/cache/ReadEntryUtils.java | 111 +++++++++--------- .../client/PulsarMockLedgerHandle.java | 6 + 5 files changed, 158 insertions(+), 60 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CombinedLedgerEntriesImpl.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 2be2d76fe9cfa..0b43dbeebd6de 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -107,7 +107,7 @@ public class ManagedLedgerConfig { */ @Getter @Setter - private long batchReadMaxSizeBytes = 0; + private int batchReadMaxSizeBytes = 0; /** * Returns whether batch read is enabled for this managed ledger. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CombinedLedgerEntriesImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CombinedLedgerEntriesImpl.java new file mode 100644 index 0000000000000..129c9e7814ba0 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CombinedLedgerEntriesImpl.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import io.netty.util.Recycler; +import java.util.Iterator; +import java.util.List; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; + +public class CombinedLedgerEntriesImpl implements LedgerEntries { + private List entries; + private List ledgerEntries; + private final Recycler.Handle recyclerHandle; + + private CombinedLedgerEntriesImpl(Recycler.Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + private static final Recycler RECYCLER = new Recycler<>() { + @Override + protected CombinedLedgerEntriesImpl newObject(Recycler.Handle handle) { + return new CombinedLedgerEntriesImpl(handle); + } + }; + + public static LedgerEntries create(List entries, List ledgerEntries) { + checkArgument(!entries.isEmpty(), "entries for create should not be empty."); + checkArgument(!ledgerEntries.isEmpty(), "ledgerEntries for create should not be empty."); + CombinedLedgerEntriesImpl instance = RECYCLER.get(); + instance.entries = entries; + instance.ledgerEntries = ledgerEntries; + return instance; + } + + private void recycle() { + ledgerEntries.forEach(LedgerEntries::close); + entries = null; + ledgerEntries = null; + recyclerHandle.recycle(this); + } + + @Override + public LedgerEntry getEntry(long entryId) { + checkNotNull(entries, "entries has been recycled"); + long firstId = entries.get(0).getEntryId(); + long lastId = entries.get(entries.size() - 1).getEntryId(); + if (entryId < firstId || entryId > lastId) { + throw new IndexOutOfBoundsException("required index: " + entryId + + " is out of bounds: [ " + firstId + ", " + lastId + " ]."); + } + return entries.get((int) (entryId - firstId)); + } + + @Override + public Iterator iterator() { + checkNotNull(entries, "entries has been recycled"); + return entries.iterator(); + } + + @Override + public void close() { + recycle(); + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java index b5a45415a4fe1..99b3c7c14b62b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -27,6 +27,7 @@ import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; @@ -70,8 +71,11 @@ public void clear() { @Override public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSupplier expectedReadCount, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { - ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync( - ledgerEntries -> { + ManagedLedgerConfig config = ml.getConfig(); + boolean isBatchReadEnabled = config.isBatchReadEnabled(); + int batchReadMaxBytes = config.getBatchReadMaxSizeBytes(); + ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, isBatchReadEnabled, batchReadMaxBytes) + .thenAcceptAsync(ledgerEntries -> { List entries = new ArrayList<>(); long totalSize = 0; try { @@ -99,8 +103,12 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, IntSu @Override public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { - ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync( - (ledgerEntries, exception) -> { + ManagedLedgerConfig config = ml.getConfig(); + boolean isBatchReadEnabled = config.isBatchReadEnabled(); + int batchReadMaxBytes = config.getBatchReadMaxSizeBytes(); + ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), + isBatchReadEnabled, batchReadMaxBytes) + .whenCompleteAsync((ledgerEntries, exception) -> { if (exception != null) { ml.invalidateLedgerHandle(lh); callback.readEntryFailed(createManagedLedgerException(exception), ctx); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index a4d25b16c6f3d..b3e1c17c9b54a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -19,12 +19,15 @@ package org.apache.bookkeeper.mledger.impl.cache; import java.util.ArrayList; +import java.util.Enumeration; import java.util.List; import java.util.concurrent.CompletableFuture; + +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.slf4j.Logger; @@ -39,7 +42,7 @@ static CompletableFuture readAsync(ManagedLedger ml, ReadHandle h } static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, - long lastEntry, boolean batchReadEnabled, long batchReadMaxSize) { + long lastEntry, boolean batchReadEnabled, int batchReadMaxSize) { if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) { // The read handle comes from another managed ledger, in this case, we can only compare the entry range with // the LAC of that read handle. Specifically, it happens when this method is called by a @@ -64,76 +67,74 @@ static CompletableFuture readAsync(ManagedLedger ml, ReadHandle h int numberOfEntries = (int) (lastEntry - firstEntry + 1); - // Use batch read for multiple entries when enabled - if (batchReadEnabled && numberOfEntries > 1) { + // Use batch read for multiple entries when enabled. + if (batchReadEnabled && numberOfEntries > 1 && batchReadMaxSize > 1 && handle instanceof LedgerHandle lh) { if (log.isDebugEnabled()) { log.debug("Using batch read for ledger {} entries {}-{}, maxCount={}, maxSize={}", handle.getId(), firstEntry, lastEntry, numberOfEntries, batchReadMaxSize); } - return batchReadWithAutoRefill(handle, firstEntry, lastEntry, numberOfEntries, batchReadMaxSize); + return batchReadWithAutoRefill(lh, firstEntry, numberOfEntries, batchReadMaxSize); } return handle.readUnconfirmedAsync(firstEntry, lastEntry); } - private static CompletableFuture batchReadWithAutoRefill( - ReadHandle handle, long firstEntry, long lastEntry, - int maxCount, long maxSize) { + private static CompletableFuture batchReadWithAutoRefill(LedgerHandle lh, long firstEntry, + int maxCount, int maxSize) { + List receivedEntries = new ArrayList<>(maxCount); + List ledgerEntries = new ArrayList<>(4); - return handle.batchReadAsync(firstEntry, maxCount, maxSize) - .exceptionallyCompose(ex -> { - // Fallback to readUnconfirmedAsync if batch read fails - log.warn("Batch read failed for ledger {} entries {}-{}, falling back to regular read: {}", - handle.getId(), firstEntry, lastEntry, ex.getMessage()); - return handle.readUnconfirmedAsync(firstEntry, lastEntry); - }) - .thenCompose(entries -> { - // Collect entries and find the last received entry id in a single pass - List receivedList = new ArrayList<>(); - long lastReceivedEntryId = -1; - for (LedgerEntry e : entries) { - receivedList.add(e); - lastReceivedEntryId = e.getEntryId(); + CompletableFuture future = new CompletableFuture<>(); + batchRead(lh, firstEntry, maxCount, maxCount, maxSize, receivedEntries, ledgerEntries) + .whenComplete((v, t) -> { + if (t != null) { + ledgerEntries.forEach(LedgerEntries::close); + future.completeExceptionally(t); + } else if (receivedEntries.isEmpty()) { + ledgerEntries.forEach(LedgerEntries::close); + future.completeExceptionally(new ManagedLedgerException( + "Batch read returned no entries for ledger " + lh.getId() + + " starting from entry " + firstEntry)); + } else { + future.complete(CombinedLedgerEntriesImpl.create(receivedEntries, ledgerEntries)); } - int receivedCount = receivedList.size(); + }); + return future; + } - // All entries received, return as-is - if (receivedCount >= maxCount) { - return CompletableFuture.completedFuture(entries); - } - // Partial result: need to read remaining entries - if (receivedCount == 0) { - // Edge case: no entries returned, use regular read - entries.close(); - log.warn("Batch read returned 0 entries for ledger {} entries {}-{}, falling back to " - + "regular read", handle.getId(), firstEntry, lastEntry); - return handle.readUnconfirmedAsync(firstEntry, lastEntry); - } + private static CompletableFuture batchRead(LedgerHandle lh, long firstEntry, int maxCount, + int entriesToRead, int maxSize, List receivedEntries, + List ledgerEntries) { + lh.asyncBatchReadUnconfirmedEntries(firstEntry, entriesToRead, maxSize, (rc, lh1, seq, ctx) -> { - // Close the original entries since we've collected them into receivedList - entries.close(); + }, null); - long nextEntryId = lastReceivedEntryId + 1; - int remainingCount = (int) (lastEntry - nextEntryId + 1); - if (log.isDebugEnabled()) { - log.debug("Batch read partial result for ledger {}: received {}/{}, reading remaining {}-{}", - handle.getId(), receivedCount, maxCount, nextEntryId, lastEntry); + return lh.batchReadAsync(firstEntry, entriesToRead, maxSize) + .thenCompose(entries -> { + long lastReceivedEntry = -1; + int prevReceivedCount = receivedEntries.size(); + for (LedgerEntry entry : entries) { + receivedEntries.add(entry); + lastReceivedEntry = entry.getEntryId(); } - - // Recursively read remaining entries - return batchReadWithAutoRefill(handle, nextEntryId, lastEntry, remainingCount, maxSize) - .thenApply(remainingEntries -> { - // Combine received and remaining entries - List combined = new ArrayList<>(receivedCount + remainingCount); - combined.addAll(receivedList); - for (LedgerEntry e : remainingEntries) { - combined.add(e); - } - remainingEntries.close(); - return LedgerEntriesImpl.create(combined); - }); + // Add LedgerEntries, it needs recycle. + ledgerEntries.add(entries); + int currentReceivedCount = receivedEntries.size(); + // Return if we have enough entries or no more entries available + if (currentReceivedCount >= maxCount) { + return CompletableFuture.completedFuture(null); + } + // If no entries returned, we've reached the end of available entries + if (prevReceivedCount == currentReceivedCount) { + return CompletableFuture.completedFuture(null); + } + // If it still has more entries to read. + long nextReadEntry = lastReceivedEntry + 1; + int nextRoundEntriesToRead = maxCount - currentReceivedCount; + return batchRead(lh, nextReadEntry, maxCount, nextRoundEntriesToRead, maxSize, + receivedEntries, ledgerEntries); }); } } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index 74dab3ebbfeb3..32f94ea54e800 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -274,6 +274,12 @@ public CompletableFuture readLastAddConfirmedAndEntryAsyn return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + + } + + private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd) { List ensemble = new ArrayList<>(PulsarMockBookKeeper.getMockEnsemble()); return LedgerMetadataBuilder.create() From e01bfffb4aa99c24cb2a0b05a03f5c3c085da070 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 4 Apr 2026 03:44:16 +0800 Subject: [PATCH 5/9] fix code --- ...l.java => CompositeLedgerEntriesImpl.java} | 14 +- .../mledger/impl/cache/ReadEntryUtils.java | 12 +- .../cache/CompositeLedgerEntriesImplTest.java | 203 +++++++++++++++ .../impl/cache/RangeEntryCacheImplTest.java | 2 +- .../impl/cache/ReadEntryUtilsTest.java | 237 ++++++++++++++++++ .../pulsar/broker/ServiceConfiguration.java | 2 +- .../client/PulsarMockLedgerHandle.java | 2 +- 7 files changed, 452 insertions(+), 20 deletions(-) rename managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/{CombinedLedgerEntriesImpl.java => CompositeLedgerEntriesImpl.java} (82%) create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImplTest.java create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CombinedLedgerEntriesImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java similarity index 82% rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CombinedLedgerEntriesImpl.java rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java index 129c9e7814ba0..a5b397f96b938 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CombinedLedgerEntriesImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java @@ -26,26 +26,26 @@ import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; -public class CombinedLedgerEntriesImpl implements LedgerEntries { +public class CompositeLedgerEntriesImpl implements LedgerEntries { private List entries; private List ledgerEntries; - private final Recycler.Handle recyclerHandle; + private final Recycler.Handle recyclerHandle; - private CombinedLedgerEntriesImpl(Recycler.Handle recyclerHandle) { + private CompositeLedgerEntriesImpl(Recycler.Handle recyclerHandle) { this.recyclerHandle = recyclerHandle; } - private static final Recycler RECYCLER = new Recycler<>() { + private static final Recycler RECYCLER = new Recycler<>() { @Override - protected CombinedLedgerEntriesImpl newObject(Recycler.Handle handle) { - return new CombinedLedgerEntriesImpl(handle); + protected CompositeLedgerEntriesImpl newObject(Recycler.Handle handle) { + return new CompositeLedgerEntriesImpl(handle); } }; public static LedgerEntries create(List entries, List ledgerEntries) { checkArgument(!entries.isEmpty(), "entries for create should not be empty."); checkArgument(!ledgerEntries.isEmpty(), "ledgerEntries for create should not be empty."); - CombinedLedgerEntriesImpl instance = RECYCLER.get(); + CompositeLedgerEntriesImpl instance = RECYCLER.get(); instance.entries = entries; instance.ledgerEntries = ledgerEntries; return instance; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index b3e1c17c9b54a..5124f4edf7039 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -19,11 +19,8 @@ package org.apache.bookkeeper.mledger.impl.cache; import java.util.ArrayList; -import java.util.Enumeration; import java.util.List; import java.util.concurrent.CompletableFuture; - -import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; @@ -68,7 +65,7 @@ static CompletableFuture readAsync(ManagedLedger ml, ReadHandle h int numberOfEntries = (int) (lastEntry - firstEntry + 1); // Use batch read for multiple entries when enabled. - if (batchReadEnabled && numberOfEntries > 1 && batchReadMaxSize > 1 && handle instanceof LedgerHandle lh) { + if (batchReadEnabled && numberOfEntries > 1 && batchReadMaxSize > 0 && handle instanceof LedgerHandle lh) { if (log.isDebugEnabled()) { log.debug("Using batch read for ledger {} entries {}-{}, maxCount={}, maxSize={}", handle.getId(), firstEntry, lastEntry, numberOfEntries, batchReadMaxSize); @@ -96,7 +93,7 @@ private static CompletableFuture batchReadWithAutoRefill(LedgerHa "Batch read returned no entries for ledger " + lh.getId() + " starting from entry " + firstEntry)); } else { - future.complete(CombinedLedgerEntriesImpl.create(receivedEntries, ledgerEntries)); + future.complete(CompositeLedgerEntriesImpl.create(receivedEntries, ledgerEntries)); } }); return future; @@ -106,11 +103,6 @@ private static CompletableFuture batchReadWithAutoRefill(LedgerHa private static CompletableFuture batchRead(LedgerHandle lh, long firstEntry, int maxCount, int entriesToRead, int maxSize, List receivedEntries, List ledgerEntries) { - lh.asyncBatchReadUnconfirmedEntries(firstEntry, entriesToRead, maxSize, (rc, lh1, seq, ctx) -> { - - }, null); - - return lh.batchReadAsync(firstEntry, entriesToRead, maxSize) .thenCompose(entries -> { long lastReceivedEntry = -1; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImplTest.java new file mode 100644 index 0000000000000..8e74a0612ac59 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImplTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.testng.annotations.Test; + +public class CompositeLedgerEntriesImplTest { + + private LedgerEntryImpl createEntry(long ledgerId, long entryId, byte[] data) { + return LedgerEntryImpl.create(ledgerId, entryId, data.length, Unpooled.wrappedBuffer(data)); + } + + @Test + public void testCreateAndIterate() { + LedgerEntryImpl e0 = createEntry(1L, 0L, new byte[]{0}); + LedgerEntryImpl e1 = createEntry(1L, 1L, new byte[]{1}); + LedgerEntryImpl e2 = createEntry(1L, 2L, new byte[]{2}); + + List entries = new ArrayList<>(); + entries.add(e0); + entries.add(e1); + entries.add(e2); + + List containers = new ArrayList<>(); + // Wrap in a simple LedgerEntries mock-like container using a real impl + // For test, we use a list-based approach + containers.add(new MockLedgerEntries(entries)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + entries, containers); + + // Verify iteration + Iterator it = combined.iterator(); + assertThat(it.hasNext()).isTrue(); + assertThat(it.next().getEntryId()).isEqualTo(0L); + assertThat(it.next().getEntryId()).isEqualTo(1L); + assertThat(it.next().getEntryId()).isEqualTo(2L); + assertThat(it.hasNext()).isFalse(); + + combined.close(); + } + + @Test + public void testGetEntry() { + LedgerEntryImpl e0 = createEntry(1L, 5L, new byte[]{0}); + LedgerEntryImpl e1 = createEntry(1L, 6L, new byte[]{1}); + LedgerEntryImpl e2 = createEntry(1L, 7L, new byte[]{2}); + + List entries = new ArrayList<>(); + entries.add(e0); + entries.add(e1); + entries.add(e2); + + List containers = new ArrayList<>(); + containers.add(new MockLedgerEntries(entries)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + entries, containers); + + assertThat(combined.getEntry(5L).getEntryId()).isEqualTo(5L); + assertThat(combined.getEntry(6L).getEntryId()).isEqualTo(6L); + assertThat(combined.getEntry(7L).getEntryId()).isEqualTo(7L); + + combined.close(); + } + + @Test + public void testGetEntryOutOfRange() { + LedgerEntryImpl e0 = createEntry(1L, 0L, new byte[]{0}); + LedgerEntryImpl e1 = createEntry(1L, 1L, new byte[]{1}); + + List entries = new ArrayList<>(); + entries.add(e0); + entries.add(e1); + + List containers = new ArrayList<>(); + containers.add(new MockLedgerEntries(entries)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + entries, containers); + + // Out of lower bound + assertThatThrownBy(() -> combined.getEntry(-1L)) + .isInstanceOf(IndexOutOfBoundsException.class); + // Out of upper bound + assertThatThrownBy(() -> combined.getEntry(2L)) + .isInstanceOf(IndexOutOfBoundsException.class); + + combined.close(); + } + + @Test + public void testCloseAndRecycle() { + LedgerEntryImpl e0 = createEntry(1L, 0L, new byte[]{0}); + + List entries = new ArrayList<>(); + entries.add(e0); + + List containers = new ArrayList<>(); + containers.add(new MockLedgerEntries(entries)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + entries, containers); + + // Should be usable before close + assertThat(combined.iterator().hasNext()).isTrue(); + + combined.close(); + + // After close, iterator should throw + assertThatThrownBy(combined::iterator) + .isInstanceOf(NullPointerException.class); + } + + @Test + public void testMultipleLedgerEntriesContainers() { + // Simulate entries from 2 separate batch reads + LedgerEntryImpl e0 = createEntry(1L, 0L, new byte[]{0}); + LedgerEntryImpl e1 = createEntry(1L, 1L, new byte[]{1}); + LedgerEntryImpl e2 = createEntry(1L, 2L, new byte[]{2}); + LedgerEntryImpl e3 = createEntry(1L, 3L, new byte[]{3}); + + List batch1 = new ArrayList<>(); + batch1.add(e0); + batch1.add(e1); + + List batch2 = new ArrayList<>(); + batch2.add(e2); + batch2.add(e3); + + List allEntries = new ArrayList<>(); + allEntries.addAll(batch1); + allEntries.addAll(batch2); + + List containers = new ArrayList<>(); + containers.add(new MockLedgerEntries(batch1)); + containers.add(new MockLedgerEntries(batch2)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + allEntries, containers); + + // Verify all entries accessible + assertThat(combined.getEntry(0L).getEntryId()).isEqualTo(0L); + assertThat(combined.getEntry(1L).getEntryId()).isEqualTo(1L); + assertThat(combined.getEntry(2L).getEntryId()).isEqualTo(2L); + assertThat(combined.getEntry(3L).getEntryId()).isEqualTo(3L); + + combined.close(); + } + + /** + * Simple LedgerEntries implementation for testing. + */ + private static class MockLedgerEntries implements LedgerEntries { + private final List entries; + private boolean closed = false; + + MockLedgerEntries(List entries) { + this.entries = entries; + } + + @Override + public LedgerEntry getEntry(long entryId) { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + return entries.iterator(); + } + + @Override + public void close() { + closed = true; + entries.forEach(LedgerEntry::close); + } + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java index 7bee8d14017f7..ad576815c989d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java @@ -196,7 +196,7 @@ public void testReadFromStorageRetriesWhenHandleClosed() { ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class); ManagedLedgerConfig conf = mock(ManagedLedgerConfig.class); when(conf.isBatchReadEnabled()).thenReturn(false); - when(conf.getBatchReadMaxSizeBytes()).thenReturn((long) 0); + when(conf.getBatchReadMaxSizeBytes()).thenReturn(0); when(mockManagedLedger.getConfig()).thenReturn(conf); ManagedLedgerMBeanImpl mockManagedLedgerMBean = mock(ManagedLedgerMBeanImpl.class); when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java new file mode 100644 index 0000000000000..434deb554bb83 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class ReadEntryUtilsTest { + + private ManagedLedger ml; + private LedgerHandle lh; + + @BeforeMethod + @SuppressWarnings({"unchecked", "rawtypes"}) + public void setup() { + ml = mock(ManagedLedger.class); + lh = mock(LedgerHandle.class); + when(lh.getId()).thenReturn(1L); + Position lastConfirmedEntry = PositionFactory.create(1L, 99L); + when(ml.getLastConfirmedEntry()).thenReturn(lastConfirmedEntry); + // Return non-empty Optional to take the normal managed ledger path + when(ml.getOptionalLedgerInfo(1L)).thenReturn((Optional) Optional.of(new Object())); + } + + @Test + public void testBatchReadSingleBatch() { + LedgerEntries batchResult = createLedgerEntries(1L, 0, 1, 2, 3, 4); + when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(batchResult)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompleted(); + LedgerEntries result = future.getNow(null); + try { + List entryIds = new ArrayList<>(); + for (LedgerEntry e : result) { + entryIds.add(e.getEntryId()); + } + assertThat(entryIds).containsExactly(0L, 1L, 2L, 3L, 4L); + } finally { + result.close(); + } + + verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); + } + + @Test + public void testBatchReadMultipleBatches() { + // First batch returns entries 0-2, second returns 3-4 + LedgerEntries batch1 = createLedgerEntries(1L, 0, 1, 2); + LedgerEntries batch2 = createLedgerEntries(1L, 3, 4); + + when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(batch1)); + when(lh.batchReadAsync(eq(3L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(batch2)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompleted(); + LedgerEntries result = future.getNow(null); + try { + List entryIds = new ArrayList<>(); + for (LedgerEntry e : result) { + entryIds.add(e.getEntryId()); + } + assertThat(entryIds).containsExactly(0L, 1L, 2L, 3L, 4L); + } finally { + result.close(); + } + } + + @Test + public void testBatchReadEmptyResult() { + LedgerEntries emptyBatch = createLedgerEntries(1L); + when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(emptyBatch)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompletedExceptionally(); + } + + @Test + public void testBatchReadFailure() { + when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("BK read failed"))); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompletedExceptionally(); + } + + @Test + public void testBatchReadDisabledFallback() { + LedgerEntries mockEntries = createLedgerEntries(1L, 0, 1, 2, 3, 4); + when(lh.readUnconfirmedAsync(0L, 4L)) + .thenReturn(CompletableFuture.completedFuture(mockEntries)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, false, 1024); + + assertThat(future).isCompleted(); + verify(lh).readUnconfirmedAsync(0L, 4L); + verify(lh, never()).batchReadAsync(anyLong(), anyInt(), anyLong()); + + future.getNow(null).close(); + } + + @Test + public void testBatchReadSingleEntryFallback() { + LedgerEntries mockEntries = createLedgerEntries(1L, 0); + when(lh.readUnconfirmedAsync(0L, 0L)) + .thenReturn(CompletableFuture.completedFuture(mockEntries)); + + // Single entry reads should use readUnconfirmedAsync even with batch read enabled + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 0L, true, 1024); + + assertThat(future).isCompleted(); + verify(lh).readUnconfirmedAsync(0L, 0L); + verify(lh, never()).batchReadAsync(anyLong(), anyInt(), anyLong()); + + future.getNow(null).close(); + } + + @Test + public void testBatchReadWithNonLedgerHandle() { + ReadHandle rh = mock(ReadHandle.class); + when(rh.getId()).thenReturn(1L); + LedgerEntries mockEntries = createLedgerEntries(1L, 0, 1, 2); + when(rh.readUnconfirmedAsync(0L, 2L)) + .thenReturn(CompletableFuture.completedFuture(mockEntries)); + + // ReadHandle (not LedgerHandle) should use readUnconfirmedAsync even with batch read enabled + CompletableFuture future = + ReadEntryUtils.readAsync(ml, rh, 0L, 2L, true, 1024); + + assertThat(future).isCompleted(); + verify(rh).readUnconfirmedAsync(0L, 2L); + + future.getNow(null).close(); + } + + @Test + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testReadOnlyManagedLedgerFallback() { + when(ml.getOptionalLedgerInfo(1L)).thenReturn((Optional) Optional.empty()); + + ReadHandle rh = mock(ReadHandle.class); + when(rh.getId()).thenReturn(1L); + LedgerEntries mockEntries = createLedgerEntries(1L, 0, 1); + when(rh.readAsync(0L, 1L)).thenReturn(CompletableFuture.completedFuture(mockEntries)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, rh, 0L, 1L, true, 1024); + + assertThat(future).isCompleted(); + verify(rh).readAsync(0L, 1L); + + future.getNow(null).close(); + } + + // --- helper --- + + private static LedgerEntries createLedgerEntries(long ledgerId, long... entryIds) { + List entries = new ArrayList<>(); + for (long entryId : entryIds) { + entries.add(LedgerEntryImpl.create(ledgerId, entryId, 1, + Unpooled.wrappedBuffer(new byte[]{(byte) entryId}))); + } + return new LedgerEntries() { + @Override + public LedgerEntry getEntry(long eid) { + for (LedgerEntry e : entries) { + if (e.getEntryId() == eid) { + return e; + } + } + throw new IndexOutOfBoundsException("Entry " + eid + " not found"); + } + + @Override + public Iterator iterator() { + return entries.iterator(); + } + + @Override + public void close() { + entries.forEach(LedgerEntry::close); + } + }; + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0642c30d389ff..659d6d65a2d7c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2336,7 +2336,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "Enable batch read API when reading entries from bookkeeper. " + "Batch read allows reading multiple entries in a single RPC call, " + "reducing network overhead for sequential reads.") - private boolean managedLedgerBatchReadEnabled = true; + private boolean managedLedgerBatchReadEnabled = false; @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'" diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index 32f94ea54e800..58a3e8949eba5 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -276,7 +276,7 @@ public CompletableFuture readLastAddConfirmedAndEntryAsyn @Override public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { - + return readHandle.batchReadAsync(startEntry, maxCount, maxSize); } From 026dfc2c7219c5b18cea5f352686dfdf14e248ed Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 4 Apr 2026 15:32:19 +0800 Subject: [PATCH 6/9] fix code --- .../cache/CompositeLedgerEntriesImpl.java | 8 +- .../mledger/impl/cache/ReadEntryUtils.java | 18 +- .../cache/CompositeLedgerEntriesImplTest.java | 29 +++ .../impl/cache/RangeEntryCacheImplTest.java | 87 +++++++++ .../impl/cache/ReadEntryUtilsTest.java | 169 +++++++++++++++++- .../client/PulsarMockReadHandle.java | 19 +- 6 files changed, 325 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java index a5b397f96b938..693a34e34ada4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java @@ -67,7 +67,13 @@ public LedgerEntry getEntry(long entryId) { throw new IndexOutOfBoundsException("required index: " + entryId + " is out of bounds: [ " + firstId + ", " + lastId + " ]."); } - return entries.get((int) (entryId - firstId)); + int index = (int) (entryId - firstId); + LedgerEntry entry = entries.get(index); + if (entry.getEntryId() != entryId) { + throw new IllegalStateException("Non-contiguous entries detected: expected entryId " + + entryId + " at index " + index + " but found entryId " + entry.getEntryId()); + } + return entry; } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index 5124f4edf7039..26547b5e41a78 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -86,7 +86,23 @@ private static CompletableFuture batchReadWithAutoRefill(LedgerHa .whenComplete((v, t) -> { if (t != null) { ledgerEntries.forEach(LedgerEntries::close); - future.completeExceptionally(t); + if (receivedEntries.isEmpty()) { + long lastEntry = firstEntry + maxCount - 1; + // First batch failed with no data received, fall back to readUnconfirmedAsync + log.warn("Batch read failed for ledger {} entries {}-{}, falling back to readUnconfirmed", + lh.getId(), firstEntry, lastEntry, t); + lh.readUnconfirmedAsync(firstEntry, lastEntry) + .whenComplete((result, fallbackError) -> { + if (fallbackError != null) { + future.completeExceptionally(fallbackError); + } else { + future.complete(result); + } + }); + } else { + // Partial data received before failure, propagate the error + future.completeExceptionally(t); + } } else if (receivedEntries.isEmpty()) { ledgerEntries.forEach(LedgerEntries::close); future.completeExceptionally(new ManagedLedgerException( diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImplTest.java index 8e74a0612ac59..3bad19c04e5d9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImplTest.java @@ -173,6 +173,35 @@ public void testMultipleLedgerEntriesContainers() { combined.close(); } + @Test + public void testGetEntryWithNonContiguousEntries() { + // Create entries with a gap: IDs 5, 6, 8 (missing 7) + LedgerEntryImpl e5 = createEntry(1L, 5L, new byte[]{0}); + LedgerEntryImpl e6 = createEntry(1L, 6L, new byte[]{1}); + LedgerEntryImpl e8 = createEntry(1L, 8L, new byte[]{2}); + + List entries = new ArrayList<>(); + entries.add(e5); + entries.add(e6); + entries.add(e8); + + List containers = new ArrayList<>(); + containers.add(new MockLedgerEntries(entries)); + + CompositeLedgerEntriesImpl combined = (CompositeLedgerEntriesImpl) CompositeLedgerEntriesImpl.create( + entries, containers); + + // Valid entries should still work + assertThat(combined.getEntry(5L).getEntryId()).isEqualTo(5L); + assertThat(combined.getEntry(6L).getEntryId()).isEqualTo(6L); + // Entry 7 computes index 2 (7-5=2), but entries.get(2) has ID 8, not 7 — should throw + assertThatThrownBy(() -> combined.getEntry(7L)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Non-contiguous"); + + combined.close(); + } + /** * Simple LedgerEntries implementation for testing. */ diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java index ad576815c989d..5ee427f0aac32 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java @@ -20,20 +20,24 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.netty.buffer.Unpooled; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntSupplier; +import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -42,6 +46,8 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -244,6 +250,87 @@ public void testReadFromStorageRetriesWhenHandleClosed() { assertThat(readAttempts.get()).isEqualTo(2); } + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testReadFromStorageWithBatchReadEnabled() { + RangeEntryCacheManagerImpl mockEntryCacheManager = mock(RangeEntryCacheManagerImpl.class); + ManagedLedgerFactoryMBeanImpl mlFactoryMBean = mock(ManagedLedgerFactoryMBeanImpl.class); + when(mockEntryCacheManager.getMlFactoryMBean()).thenReturn(mlFactoryMBean); + + ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class); + ManagedLedgerConfig conf = mock(ManagedLedgerConfig.class); + when(conf.isBatchReadEnabled()).thenReturn(true); + when(conf.getBatchReadMaxSizeBytes()).thenReturn(1024 * 1024); + when(mockManagedLedger.getConfig()).thenReturn(conf); + + ManagedLedgerMBeanImpl mockManagedLedgerMBean = mock(ManagedLedgerMBeanImpl.class); + when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean); + when(mockManagedLedger.getName()).thenReturn("testManagedLedger"); + when(mockManagedLedger.getExecutor()).thenReturn(mock(java.util.concurrent.ExecutorService.class)); + Position lastConfirmedEntry = PositionFactory.create(1L, 99L); + when(mockManagedLedger.getLastConfirmedEntry()).thenReturn(lastConfirmedEntry); + when(mockManagedLedger.getOptionalLedgerInfo(1L)).thenReturn((Optional) Optional.of(new Object())); + + RangeCacheRemovalQueue mockRangeCacheRemovalQueue = mock(RangeCacheRemovalQueue.class); + when(mockRangeCacheRemovalQueue.addEntry(any())).thenReturn(true); + InflightReadsLimiter inflightReadsLimiter = mock(InflightReadsLimiter.class); + when(mockEntryCacheManager.getInflightReadsLimiter()).thenReturn(inflightReadsLimiter); + doAnswer(invocation -> { + long permits = invocation.getArgument(0); + InflightReadsLimiter.Handle handle = + new InflightReadsLimiter.Handle(permits, System.currentTimeMillis(), true); + return Optional.of(handle); + }).when(inflightReadsLimiter).acquire(anyLong(), any()); + + RangeEntryCacheImpl cache = new RangeEntryCacheImpl(mockEntryCacheManager, mockManagedLedger, false, + mockRangeCacheRemovalQueue, EntryLengthFunction.DEFAULT, mock(PendingReadsManager.class)); + + // Use LedgerHandle mock (not ReadHandle) so instanceof LedgerHandle check passes + LedgerHandle ledgerHandle = mock(LedgerHandle.class); + when(ledgerHandle.getId()).thenReturn(1L); + + // Create test entries for batch read + List entryList = new ArrayList<>(); + for (long i = 0; i <= 4; i++) { + entryList.add(LedgerEntryImpl.create(1L, i, 1, Unpooled.wrappedBuffer(new byte[]{(byte) i}))); + } + LedgerEntries batchResult = new LedgerEntries() { + @Override + public LedgerEntry getEntry(long eid) { + for (LedgerEntry e : entryList) { + if (e.getEntryId() == eid) { + return e; + } + } + throw new IndexOutOfBoundsException("Entry " + eid + " not found"); + } + + @Override + public Iterator iterator() { + return entryList.iterator(); + } + + @Override + public void close() { + entryList.forEach(LedgerEntry::close); + } + }; + + when(ledgerHandle.batchReadAsync(eq(0L), eq(5), anyLong())) + .thenReturn(CompletableFuture.completedFuture(batchResult)); + + CompletableFuture> future = cache.readFromStorage(ledgerHandle, 0L, 4L, () -> 1); + + assertThat(future).isCompleted(); + List entries = future.getNow(null); + assertThat(entries).hasSize(5); + for (int i = 0; i < 5; i++) { + assertThat(entries.get(i).getEntryId()).isEqualTo(i); + } + // Verify batch read was used, not readUnconfirmedAsync + verify(ledgerHandle, never()).readUnconfirmedAsync(anyLong(), anyLong()); + } + private void performReadAndValidateResult() { CompletableFuture> future = new CompletableFuture<>(); rangeEntryCache.asyncReadEntry(lh, 0, 99, expectedReadCount, new AsyncCallbacks.ReadEntriesCallback() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java index 434deb554bb83..c97a5da67a1c2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.impl.cache; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -32,6 +33,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; @@ -127,6 +129,9 @@ public void testBatchReadEmptyResult() { public void testBatchReadFailure() { when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) .thenReturn(CompletableFuture.failedFuture(new RuntimeException("BK read failed"))); + // Fallback also fails to verify the exception propagates + when(lh.readUnconfirmedAsync(0L, 4L)) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("BK read failed"))); CompletableFuture future = ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); @@ -204,7 +209,155 @@ public void testReadOnlyManagedLedgerFallback() { future.getNow(null).close(); } - // --- helper --- + @Test + public void testAutoRefillWithSizeLimitedReturns() { + // Simulate: first batch returns only entries 0-1 (size-limited), + // second batch returns entries 2-4 to complete the read + LedgerEntries batch1 = createLedgerEntriesWithSizes(1L, + new long[]{0, 1}, new int[]{256, 256}); + LedgerEntries batch2 = createLedgerEntriesWithSizes(1L, + new long[]{2, 3, 4}, new int[]{128, 128, 128}); + + when(lh.batchReadAsync(eq(0L), eq(5), anyLong())) + .thenReturn(CompletableFuture.completedFuture(batch1)); + when(lh.batchReadAsync(eq(2L), eq(3), anyLong())) + .thenReturn(CompletableFuture.completedFuture(batch2)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompleted(); + LedgerEntries result = future.getNow(null); + try { + List entryIds = new ArrayList<>(); + for (LedgerEntry e : result) { + entryIds.add(e.getEntryId()); + } + assertThat(entryIds).containsExactly(0L, 1L, 2L, 3L, 4L); + } finally { + result.close(); + } + } + + @Test + public void testBatchReadFailureFallsBackToReadUnconfirmed() { + // First batch read fails + when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("BK batch read failed"))); + // Fallback succeeds + LedgerEntries fallbackResult = createLedgerEntries(1L, 0, 1, 2, 3, 4); + when(lh.readUnconfirmedAsync(0L, 4L)) + .thenReturn(CompletableFuture.completedFuture(fallbackResult)); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompleted(); + LedgerEntries result = future.getNow(null); + try { + List entryIds = new ArrayList<>(); + for (LedgerEntry e : result) { + entryIds.add(e.getEntryId()); + } + assertThat(entryIds).containsExactly(0L, 1L, 2L, 3L, 4L); + } finally { + result.close(); + } + verify(lh).readUnconfirmedAsync(0L, 4L); + } + + @Test + public void testBatchReadFailureWithPartialDataDoesNotFallback() { + // First batch succeeds with entries 0-2 + LedgerEntries batch1 = createLedgerEntries(1L, 0, 1, 2); + when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(batch1)); + // Second batch fails + when(lh.batchReadAsync(eq(3L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Second batch failed"))); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + // Should fail without falling back to readUnconfirmedAsync + assertThat(future).isCompletedExceptionally(); + verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); + } + + @Test + public void testBatchReadFailureFallbackAlsoFails() { + // Batch read fails + when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("BK batch read failed"))); + // Fallback also fails + when(lh.readUnconfirmedAsync(0L, 4L)) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("readUnconfirmed also failed"))); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompletedExceptionally(); + assertThatThrownBy(future::get) + .hasCauseInstanceOf(RuntimeException.class) + .hasRootCauseMessage("readUnconfirmed also failed"); + } + + @Test + public void testBatchReadMidBatchFailureCleansUpResources() { + // Track whether the first batch's LedgerEntries is closed + AtomicBoolean batch1Closed = new AtomicBoolean(false); + LedgerEntries batch1Inner = createLedgerEntries(1L, 0, 1, 2); + LedgerEntries trackedBatch1 = new LedgerEntries() { + @Override + public LedgerEntry getEntry(long eid) { + return batch1Inner.getEntry(eid); + } + + @Override + public Iterator iterator() { + return batch1Inner.iterator(); + } + + @Override + public void close() { + batch1Closed.set(true); + batch1Inner.close(); + } + }; + + when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(trackedBatch1)); + when(lh.batchReadAsync(eq(3L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Second batch failed"))); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompletedExceptionally(); + // Verify first batch resources were cleaned up + assertThat(batch1Closed.get()).isTrue(); + // Verify no fallback since partial data was received + verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); + } + + @Test + public void testBatchReadMidBatchFailurePreservesOriginalException() { + LedgerEntries batch1 = createLedgerEntries(1L, 0, 1, 2); + when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(batch1)); + when(lh.batchReadAsync(eq(3L), anyInt(), anyLong())) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Second batch read failed"))); + + CompletableFuture future = + ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); + + assertThat(future).isCompletedExceptionally(); + assertThatThrownBy(future::get) + .hasCauseInstanceOf(RuntimeException.class) + .hasRootCauseMessage("Second batch read failed"); + } + + // --- helpers --- private static LedgerEntries createLedgerEntries(long ledgerId, long... entryIds) { List entries = new ArrayList<>(); @@ -212,6 +365,20 @@ private static LedgerEntries createLedgerEntries(long ledgerId, long... entryIds entries.add(LedgerEntryImpl.create(ledgerId, entryId, 1, Unpooled.wrappedBuffer(new byte[]{(byte) entryId}))); } + return wrapLedgerEntries(entries); + } + + private static LedgerEntries createLedgerEntriesWithSizes(long ledgerId, long[] entryIds, int[] sizes) { + List entries = new ArrayList<>(); + for (int i = 0; i < entryIds.length; i++) { + byte[] data = new byte[sizes[i]]; + entries.add(LedgerEntryImpl.create(ledgerId, entryIds[i], sizes[i], + Unpooled.wrappedBuffer(data))); + } + return wrapLedgerEntries(entries); + } + + private static LedgerEntries wrapLedgerEntries(List entries) { return new LedgerEntries() { @Override public LedgerEntry getEntry(long eid) { diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index bd749c9c600c7..55ee7bec04c7d 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -84,8 +84,23 @@ public CompletableFuture readUnconfirmedAsync(long firstEntry, lo @Override public CompletableFuture batchReadAsync(long firstEntry, int maxCount, long maxSize) { - // Mock implementation: just read the entries without size limit enforcement - long lastEntry = Math.min(firstEntry + maxCount - 1, getLastAddConfirmed()); + long lastEntryByCount = Math.min(firstEntry + maxCount - 1, getLastAddConfirmed()); + if (lastEntryByCount < firstEntry) { + return readAsync(firstEntry, firstEntry - 1); + } + long accumulatedSize = 0; + long lastEntry = firstEntry - 1; + for (long eid = firstEntry; eid <= lastEntryByCount; eid++) { + long entrySize = entries.get((int) eid).getLength(); + if (accumulatedSize > 0 && accumulatedSize + entrySize > maxSize) { + break; + } + accumulatedSize += entrySize; + lastEntry = eid; + } + if (lastEntry < firstEntry) { + lastEntry = firstEntry; + } return readAsync(firstEntry, lastEntry); } From 136b23b1ac10d0837151540c736815ce7df73ef1 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 4 Apr 2026 15:59:27 +0800 Subject: [PATCH 7/9] fix code --- .../cache/CompositeLedgerEntriesImpl.java | 3 + .../mledger/impl/cache/ReadEntryUtils.java | 102 +++++++++--------- 2 files changed, 53 insertions(+), 52 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java index 693a34e34ada4..3df6f1c639fa4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/CompositeLedgerEntriesImpl.java @@ -52,6 +52,9 @@ public static LedgerEntries create(List entries, List readAsync(ManagedLedger ml, ReadHandle h private static CompletableFuture batchReadWithAutoRefill(LedgerHandle lh, long firstEntry, int maxCount, int maxSize) { + CompletableFuture future = new CompletableFuture<>(); List receivedEntries = new ArrayList<>(maxCount); List ledgerEntries = new ArrayList<>(4); - - CompletableFuture future = new CompletableFuture<>(); - batchRead(lh, firstEntry, maxCount, maxCount, maxSize, receivedEntries, ledgerEntries) - .whenComplete((v, t) -> { - if (t != null) { - ledgerEntries.forEach(LedgerEntries::close); - if (receivedEntries.isEmpty()) { - long lastEntry = firstEntry + maxCount - 1; - // First batch failed with no data received, fall back to readUnconfirmedAsync - log.warn("Batch read failed for ledger {} entries {}-{}, falling back to readUnconfirmed", - lh.getId(), firstEntry, lastEntry, t); - lh.readUnconfirmedAsync(firstEntry, lastEntry) - .whenComplete((result, fallbackError) -> { - if (fallbackError != null) { - future.completeExceptionally(fallbackError); - } else { - future.complete(result); - } - }); - } else { - // Partial data received before failure, propagate the error - future.completeExceptionally(t); - } - } else if (receivedEntries.isEmpty()) { - ledgerEntries.forEach(LedgerEntries::close); - future.completeExceptionally(new ManagedLedgerException( - "Batch read returned no entries for ledger " + lh.getId() - + " starting from entry " + firstEntry)); - } else { - future.complete(CompositeLedgerEntriesImpl.create(receivedEntries, ledgerEntries)); - } - }); + doBatchRead(lh, firstEntry, maxCount, maxSize, receivedEntries, ledgerEntries, future); return future; } - - private static CompletableFuture batchRead(LedgerHandle lh, long firstEntry, int maxCount, - int entriesToRead, int maxSize, List receivedEntries, - List ledgerEntries) { - return lh.batchReadAsync(firstEntry, entriesToRead, maxSize) - .thenCompose(entries -> { + private static void doBatchRead(LedgerHandle lh, long firstEntry, int maxCount, int maxSize, + List receivedEntries, List ledgerEntries, + CompletableFuture future) { + lh.batchReadAsync(firstEntry, maxCount - receivedEntries.size(), maxSize) + .whenComplete((entries, throwable) -> { + if (throwable != null) { + onBatchReadComplete(lh, firstEntry, maxCount, receivedEntries, ledgerEntries, future, + throwable); + return; + } long lastReceivedEntry = -1; int prevReceivedCount = receivedEntries.size(); for (LedgerEntry entry : entries) { receivedEntries.add(entry); lastReceivedEntry = entry.getEntryId(); } - // Add LedgerEntries, it needs recycle. ledgerEntries.add(entries); - int currentReceivedCount = receivedEntries.size(); - // Return if we have enough entries or no more entries available - if (currentReceivedCount >= maxCount) { - return CompletableFuture.completedFuture(null); + if (receivedEntries.size() >= maxCount || prevReceivedCount == receivedEntries.size()) { + onBatchReadComplete(lh, firstEntry, maxCount, receivedEntries, ledgerEntries, future, null); + return; } - // If no entries returned, we've reached the end of available entries - if (prevReceivedCount == currentReceivedCount) { - return CompletableFuture.completedFuture(null); - } - // If it still has more entries to read. - long nextReadEntry = lastReceivedEntry + 1; - int nextRoundEntriesToRead = maxCount - currentReceivedCount; - return batchRead(lh, nextReadEntry, maxCount, nextRoundEntriesToRead, maxSize, - receivedEntries, ledgerEntries); + doBatchRead(lh, lastReceivedEntry + 1, maxCount, maxSize, + receivedEntries, ledgerEntries, future); }); } + + private static void onBatchReadComplete(LedgerHandle lh, long firstEntry, int maxCount, + List receivedEntries, List ledgerEntries, + CompletableFuture future, Throwable error) { + if (error != null && receivedEntries.isEmpty()) { + ledgerEntries.forEach(LedgerEntries::close); + long lastEntry = firstEntry + maxCount - 1; + log.warn("Batch read failed for ledger {} entries {}-{}, falling back to readUnconfirmed", + lh.getId(), firstEntry, lastEntry, error); + lh.readUnconfirmedAsync(firstEntry, lastEntry) + .whenComplete((result, fallbackError) -> { + if (fallbackError != null) { + future.completeExceptionally(fallbackError); + } else { + future.complete(result); + } + }); + return; + } + if (error != null) { + ledgerEntries.forEach(LedgerEntries::close); + future.completeExceptionally(error); + return; + } + if (receivedEntries.isEmpty()) { + ledgerEntries.forEach(LedgerEntries::close); + future.completeExceptionally(new ManagedLedgerException( + "Batch read returned no entries for ledger " + lh.getId() + + " starting from entry " + firstEntry)); + return; + } + future.complete(CompositeLedgerEntriesImpl.create(receivedEntries, ledgerEntries)); + } } From 43189f052785f21dd6d4e3112ef21aa4dc68c391 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 4 Apr 2026 16:07:25 +0800 Subject: [PATCH 8/9] fix code --- .../impl/cache/ReadEntryUtilsTest.java | 23 ++++--------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java index c97a5da67a1c2..845c49c7c91a5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java @@ -131,12 +131,15 @@ public void testBatchReadFailure() { .thenReturn(CompletableFuture.failedFuture(new RuntimeException("BK read failed"))); // Fallback also fails to verify the exception propagates when(lh.readUnconfirmedAsync(0L, 4L)) - .thenReturn(CompletableFuture.failedFuture(new RuntimeException("BK read failed"))); + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("readUnconfirmed also failed"))); CompletableFuture future = ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); assertThat(future).isCompletedExceptionally(); + assertThatThrownBy(future::get) + .hasCauseInstanceOf(RuntimeException.class) + .hasRootCauseMessage("readUnconfirmed also failed"); } @Test @@ -284,24 +287,6 @@ public void testBatchReadFailureWithPartialDataDoesNotFallback() { verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); } - @Test - public void testBatchReadFailureFallbackAlsoFails() { - // Batch read fails - when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.failedFuture(new RuntimeException("BK batch read failed"))); - // Fallback also fails - when(lh.readUnconfirmedAsync(0L, 4L)) - .thenReturn(CompletableFuture.failedFuture(new RuntimeException("readUnconfirmed also failed"))); - - CompletableFuture future = - ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); - - assertThat(future).isCompletedExceptionally(); - assertThatThrownBy(future::get) - .hasCauseInstanceOf(RuntimeException.class) - .hasRootCauseMessage("readUnconfirmed also failed"); - } - @Test public void testBatchReadMidBatchFailureCleansUpResources() { // Track whether the first batch's LedgerEntries is closed From 9fc9f22ee9a91886da78d0f26abda3ac98fb5a14 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 5 Apr 2026 05:12:26 +0800 Subject: [PATCH 9/9] fix code --- .../mledger/impl/cache/ReadEntryUtils.java | 46 ++++- .../impl/cache/RangeEntryCacheImplTest.java | 52 ++--- .../impl/cache/ReadEntryUtilsTest.java | 190 +++++++++--------- .../auth/MockedPulsarServiceBaseTest.java | 1 + .../client/PulsarMockLedgerHandle.java | 28 +++ 5 files changed, 194 insertions(+), 123 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index 70ac3a4b46cf3..d2dcca742c17f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -21,17 +21,20 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +@Slf4j class ReadEntryUtils { - private static final Logger log = LoggerFactory.getLogger(ReadEntryUtils.class); static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, long lastEntry) { @@ -70,13 +73,13 @@ static CompletableFuture readAsync(ManagedLedger ml, ReadHandle h log.debug("Using batch read for ledger {} entries {}-{}, maxCount={}, maxSize={}", handle.getId(), firstEntry, lastEntry, numberOfEntries, batchReadMaxSize); } - return batchReadWithAutoRefill(lh, firstEntry, numberOfEntries, batchReadMaxSize); + return batchReadUnconfirmedWithAutoRefill(lh, firstEntry, numberOfEntries, batchReadMaxSize); } return handle.readUnconfirmedAsync(firstEntry, lastEntry); } - private static CompletableFuture batchReadWithAutoRefill(LedgerHandle lh, long firstEntry, + private static CompletableFuture batchReadUnconfirmedWithAutoRefill(LedgerHandle lh, long firstEntry, int maxCount, int maxSize) { CompletableFuture future = new CompletableFuture<>(); List receivedEntries = new ArrayList<>(maxCount); @@ -88,7 +91,7 @@ private static CompletableFuture batchReadWithAutoRefill(LedgerHa private static void doBatchRead(LedgerHandle lh, long firstEntry, int maxCount, int maxSize, List receivedEntries, List ledgerEntries, CompletableFuture future) { - lh.batchReadAsync(firstEntry, maxCount - receivedEntries.size(), maxSize) + batchReadUnconfirmed(lh, firstEntry, maxCount - receivedEntries.size(), maxSize) .whenComplete((entries, throwable) -> { if (throwable != null) { onBatchReadComplete(lh, firstEntry, maxCount, receivedEntries, ledgerEntries, future, @@ -143,4 +146,35 @@ private static void onBatchReadComplete(LedgerHandle lh, long firstEntry, int ma } future.complete(CompositeLedgerEntriesImpl.create(receivedEntries, ledgerEntries)); } + + + private static CompletableFuture batchReadUnconfirmed(LedgerHandle lh, long firstEntry, + int maxCount, int maxSize) { + CompletableFuture f = new CompletableFuture<>(); + + lh.asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, (rc, ignore, seq, ctx) -> { + if (rc != BKException.Code.OK) { + f.completeExceptionally(BKException.create(rc)); + return; + } + List entries = new ArrayList<>(maxCount); + while (seq.hasMoreElements()) { + var oldEntry = seq.nextElement(); + entries.add(LedgerEntryImpl.create( + oldEntry.getLedgerId(), + oldEntry.getEntryId(), + oldEntry.getLength(), + oldEntry.getEntryBuffer())); + } + if (entries.isEmpty()) { + f.completeExceptionally(new ManagedLedgerException( + "Batch read returned no entries for ledger " + lh.getId() + + " starting from entry " + firstEntry)); + return; + } + f.complete(LedgerEntriesImpl.create(entries)); + }, null); + + return f; + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java index 5ee427f0aac32..a40f731dcff0d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java @@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -31,12 +30,15 @@ import static org.mockito.Mockito.when; import io.netty.buffer.Unpooled; import java.util.ArrayList; +import java.util.Enumeration; import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntSupplier; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; @@ -294,30 +296,32 @@ public void testReadFromStorageWithBatchReadEnabled() { for (long i = 0; i <= 4; i++) { entryList.add(LedgerEntryImpl.create(1L, i, 1, Unpooled.wrappedBuffer(new byte[]{(byte) i}))); } - LedgerEntries batchResult = new LedgerEntries() { - @Override - public LedgerEntry getEntry(long eid) { - for (LedgerEntry e : entryList) { - if (e.getEntryId() == eid) { - return e; - } + doAnswer(invocation -> { + ReadCallback cb = invocation.getArgument(3); + Object ctx = invocation.getArgument(4); + Iterator it = entryList.iterator(); + Enumeration enumeration = new Enumeration<>() { + @Override + public boolean hasMoreElements() { + return it.hasNext(); } - throw new IndexOutOfBoundsException("Entry " + eid + " not found"); - } - - @Override - public Iterator iterator() { - return entryList.iterator(); - } - - @Override - public void close() { - entryList.forEach(LedgerEntry::close); - } - }; - - when(ledgerHandle.batchReadAsync(eq(0L), eq(5), anyLong())) - .thenReturn(CompletableFuture.completedFuture(batchResult)); + @Override + public org.apache.bookkeeper.client.LedgerEntry nextElement() { + LedgerEntry apiEntry = it.next(); + org.apache.bookkeeper.client.LedgerEntry mockEntry = + mock(org.apache.bookkeeper.client.LedgerEntry.class); + when(mockEntry.getLedgerId()).thenReturn(apiEntry.getLedgerId()); + when(mockEntry.getEntryId()).thenReturn(apiEntry.getEntryId()); + when(mockEntry.getLength()).thenReturn(apiEntry.getLength()); + when(mockEntry.getEntryBuffer()).thenReturn( + io.netty.buffer.Unpooled.wrappedBuffer(apiEntry.getEntryBuffer())); + return mockEntry; + } + }; + cb.readComplete(BKException.Code.OK, ledgerHandle, enumeration, ctx); + return null; + }).when(ledgerHandle).asyncBatchReadUnconfirmedEntries(eq(0L), eq(5), anyLong(), any(ReadCallback.class), + any()); CompletableFuture> future = cache.readFromStorage(ledgerHandle, 0L, 4L, () -> 1); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java index 845c49c7c91a5..4fbcdf1315db8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtilsTest.java @@ -20,20 +20,27 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.netty.buffer.Unpooled; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; @@ -42,6 +49,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.mockito.stubbing.Answer; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -64,23 +72,19 @@ public void setup() { @Test public void testBatchReadSingleBatch() { - LedgerEntries batchResult = createLedgerEntries(1L, 0, 1, 2, 3, 4); - when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.completedFuture(batchResult)); + doAnswer(successBatchReadAnswer(1L, 0, 1, 2, 3, 4)).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(0L), anyInt(), anyLong(), any(), any()); CompletableFuture future = ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); assertThat(future).isCompleted(); - LedgerEntries result = future.getNow(null); - try { + try (LedgerEntries result = future.getNow(null)) { List entryIds = new ArrayList<>(); for (LedgerEntry e : result) { entryIds.add(e.getEntryId()); } assertThat(entryIds).containsExactly(0L, 1L, 2L, 3L, 4L); - } finally { - result.close(); } verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); @@ -89,13 +93,10 @@ public void testBatchReadSingleBatch() { @Test public void testBatchReadMultipleBatches() { // First batch returns entries 0-2, second returns 3-4 - LedgerEntries batch1 = createLedgerEntries(1L, 0, 1, 2); - LedgerEntries batch2 = createLedgerEntries(1L, 3, 4); - - when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.completedFuture(batch1)); - when(lh.batchReadAsync(eq(3L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.completedFuture(batch2)); + doAnswer(successBatchReadAnswer(1L, 0, 1, 2)).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(0L), anyInt(), anyLong(), any(), any()); + doAnswer(successBatchReadAnswer(1L, 3, 4)).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(3L), anyInt(), anyLong(), any(), any()); CompletableFuture future = ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); @@ -115,9 +116,17 @@ public void testBatchReadMultipleBatches() { @Test public void testBatchReadEmptyResult() { - LedgerEntries emptyBatch = createLedgerEntries(1L); - when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.completedFuture(emptyBatch)); + doAnswer(invocation -> { + ReadCallback callback = invocation.getArgument(3); + Object ctx = invocation.getArgument(4); + Enumeration empty = Collections.emptyEnumeration(); + callback.readComplete(BKException.Code.OK, null, empty, ctx); + return null; + }).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(0L), anyInt(), anyLong(), any(), any()); + // Empty result triggers fallback to readUnconfirmedAsync + when(lh.readUnconfirmedAsync(0L, 4L)) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("No entries"))); CompletableFuture future = ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); @@ -127,8 +136,8 @@ public void testBatchReadEmptyResult() { @Test public void testBatchReadFailure() { - when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.failedFuture(new RuntimeException("BK read failed"))); + doAnswer(failureBatchReadAnswer()).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(0L), anyInt(), anyLong(), any(), any()); // Fallback also fails to verify the exception propagates when(lh.readUnconfirmedAsync(0L, 4L)) .thenReturn(CompletableFuture.failedFuture(new RuntimeException("readUnconfirmed also failed"))); @@ -153,7 +162,7 @@ public void testBatchReadDisabledFallback() { assertThat(future).isCompleted(); verify(lh).readUnconfirmedAsync(0L, 4L); - verify(lh, never()).batchReadAsync(anyLong(), anyInt(), anyLong()); + verify(lh, never()).asyncBatchReadUnconfirmedEntries(anyLong(), anyInt(), anyLong(), any(), any()); future.getNow(null).close(); } @@ -170,7 +179,7 @@ public void testBatchReadSingleEntryFallback() { assertThat(future).isCompleted(); verify(lh).readUnconfirmedAsync(0L, 0L); - verify(lh, never()).batchReadAsync(anyLong(), anyInt(), anyLong()); + verify(lh, never()).asyncBatchReadUnconfirmedEntries(anyLong(), anyInt(), anyLong(), any(), any()); future.getNow(null).close(); } @@ -216,15 +225,12 @@ public void testReadOnlyManagedLedgerFallback() { public void testAutoRefillWithSizeLimitedReturns() { // Simulate: first batch returns only entries 0-1 (size-limited), // second batch returns entries 2-4 to complete the read - LedgerEntries batch1 = createLedgerEntriesWithSizes(1L, - new long[]{0, 1}, new int[]{256, 256}); - LedgerEntries batch2 = createLedgerEntriesWithSizes(1L, - new long[]{2, 3, 4}, new int[]{128, 128, 128}); - - when(lh.batchReadAsync(eq(0L), eq(5), anyLong())) - .thenReturn(CompletableFuture.completedFuture(batch1)); - when(lh.batchReadAsync(eq(2L), eq(3), anyLong())) - .thenReturn(CompletableFuture.completedFuture(batch2)); + doAnswer(successBatchReadAnswerWithSizes(1L, + new long[]{0, 1}, new int[]{256, 256})).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(0L), eq(5), anyLong(), any(), any()); + doAnswer(successBatchReadAnswerWithSizes(1L, + new long[]{2, 3, 4}, new int[]{128, 128, 128})).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(2L), eq(3), anyLong(), any(), any()); CompletableFuture future = ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); @@ -245,8 +251,8 @@ public void testAutoRefillWithSizeLimitedReturns() { @Test public void testBatchReadFailureFallsBackToReadUnconfirmed() { // First batch read fails - when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.failedFuture(new RuntimeException("BK batch read failed"))); + doAnswer(failureBatchReadAnswer()).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(0L), anyInt(), anyLong(), any(), any()); // Fallback succeeds LedgerEntries fallbackResult = createLedgerEntries(1L, 0, 1, 2, 3, 4); when(lh.readUnconfirmedAsync(0L, 4L)) @@ -272,12 +278,11 @@ public void testBatchReadFailureFallsBackToReadUnconfirmed() { @Test public void testBatchReadFailureWithPartialDataDoesNotFallback() { // First batch succeeds with entries 0-2 - LedgerEntries batch1 = createLedgerEntries(1L, 0, 1, 2); - when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.completedFuture(batch1)); + doAnswer(successBatchReadAnswer(1L, 0, 1, 2)).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(0L), anyInt(), anyLong(), any(), any()); // Second batch fails - when(lh.batchReadAsync(eq(3L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Second batch failed"))); + doAnswer(failureBatchReadAnswer()).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(3L), anyInt(), anyLong(), any(), any()); CompletableFuture future = ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); @@ -287,59 +292,21 @@ public void testBatchReadFailureWithPartialDataDoesNotFallback() { verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); } - @Test - public void testBatchReadMidBatchFailureCleansUpResources() { - // Track whether the first batch's LedgerEntries is closed - AtomicBoolean batch1Closed = new AtomicBoolean(false); - LedgerEntries batch1Inner = createLedgerEntries(1L, 0, 1, 2); - LedgerEntries trackedBatch1 = new LedgerEntries() { - @Override - public LedgerEntry getEntry(long eid) { - return batch1Inner.getEntry(eid); - } - - @Override - public Iterator iterator() { - return batch1Inner.iterator(); - } - - @Override - public void close() { - batch1Closed.set(true); - batch1Inner.close(); - } - }; - - when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.completedFuture(trackedBatch1)); - when(lh.batchReadAsync(eq(3L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Second batch failed"))); - - CompletableFuture future = - ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); - - assertThat(future).isCompletedExceptionally(); - // Verify first batch resources were cleaned up - assertThat(batch1Closed.get()).isTrue(); - // Verify no fallback since partial data was received - verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong()); - } - @Test public void testBatchReadMidBatchFailurePreservesOriginalException() { - LedgerEntries batch1 = createLedgerEntries(1L, 0, 1, 2); - when(lh.batchReadAsync(eq(0L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.completedFuture(batch1)); - when(lh.batchReadAsync(eq(3L), anyInt(), anyLong())) - .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Second batch read failed"))); + // First batch succeeds with entries 0-2 + doAnswer(successBatchReadAnswer(1L, 0, 1, 2)).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(0L), anyInt(), anyLong(), any(), any()); + // Second batch fails + doAnswer(failureBatchReadAnswer()).when(lh) + .asyncBatchReadUnconfirmedEntries(eq(3L), anyInt(), anyLong(), any(), any()); CompletableFuture future = ReadEntryUtils.readAsync(ml, lh, 0L, 4L, true, 1024); assertThat(future).isCompletedExceptionally(); assertThatThrownBy(future::get) - .hasCauseInstanceOf(RuntimeException.class) - .hasRootCauseMessage("Second batch read failed"); + .hasCauseInstanceOf(BKException.class); } // --- helpers --- @@ -353,16 +320,6 @@ private static LedgerEntries createLedgerEntries(long ledgerId, long... entryIds return wrapLedgerEntries(entries); } - private static LedgerEntries createLedgerEntriesWithSizes(long ledgerId, long[] entryIds, int[] sizes) { - List entries = new ArrayList<>(); - for (int i = 0; i < entryIds.length; i++) { - byte[] data = new byte[sizes[i]]; - entries.add(LedgerEntryImpl.create(ledgerId, entryIds[i], sizes[i], - Unpooled.wrappedBuffer(data))); - } - return wrapLedgerEntries(entries); - } - private static LedgerEntries wrapLedgerEntries(List entries) { return new LedgerEntries() { @Override @@ -386,4 +343,51 @@ public void close() { } }; } + + private static Answer successBatchReadAnswer(long ledgerId, long... entryIds) { + return invocation -> { + ReadCallback callback = invocation.getArgument(3); + Object ctx = invocation.getArgument(4); + Queue queue = new ArrayDeque<>(); + for (long eid : entryIds) { + org.apache.bookkeeper.client.LedgerEntry e = + mock(org.apache.bookkeeper.client.LedgerEntry.class); + when(e.getEntryId()).thenReturn(eid); + when(e.getLedgerId()).thenReturn(ledgerId); + when(e.getLength()).thenReturn(1L); + when(e.getEntryBuffer()).thenReturn(Unpooled.wrappedBuffer(new byte[]{(byte) eid})); + queue.add(e); + } + callback.readComplete(BKException.Code.OK, null, Collections.enumeration(queue), ctx); + return null; + }; + } + + private static Answer successBatchReadAnswerWithSizes(long ledgerId, long[] entryIds, int[] sizes) { + return invocation -> { + ReadCallback callback = invocation.getArgument(3); + Object ctx = invocation.getArgument(4); + Queue queue = new ArrayDeque<>(); + for (int i = 0; i < entryIds.length; i++) { + org.apache.bookkeeper.client.LedgerEntry e = + mock(org.apache.bookkeeper.client.LedgerEntry.class); + when(e.getEntryId()).thenReturn(entryIds[i]); + when(e.getLedgerId()).thenReturn(ledgerId); + when(e.getLength()).thenReturn((long) sizes[i]); + when(e.getEntryBuffer()).thenReturn(Unpooled.wrappedBuffer(new byte[sizes[i]])); + queue.add(e); + } + callback.readComplete(BKException.Code.OK, null, Collections.enumeration(queue), ctx); + return null; + }; + } + + private static Answer failureBatchReadAnswer() { + return invocation -> { + ReadCallback callback = invocation.getArgument(3); + Object ctx = invocation.getArgument(4); + callback.readComplete(BKException.Code.BookieHandleNotAvailableException, null, null, ctx); + return null; + }; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 47b272fdaee67..37ebda6bc3b16 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -604,6 +604,7 @@ protected ServiceConfiguration getDefaultConf() { configuration.setNumExecutorThreadPoolSize(5); configuration.setBrokerMaxConnections(0); configuration.setBrokerMaxConnectionsPerIp(0); + configuration.setManagedLedgerBatchReadEnabled(true); return configuration; } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index 58a3e8949eba5..55998d3f6d131 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -279,6 +279,34 @@ public CompletableFuture batchReadAsync(long startEntry, int maxC return readHandle.batchReadAsync(startEntry, maxCount, maxSize); } + @Override + public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb, + Object ctx) { + batchReadAsync(startEntry, maxCount, maxSize) + .whenCompleteAsync((ledgerEntries, t) -> { + if (t != null) { + cb.readComplete(PulsarMockBookKeeper.getExceptionCode(t), PulsarMockLedgerHandle.this, + null, ctx); + } else { + Queue seq = new ArrayDeque<>(); + for (var entry : ledgerEntries) { + seq.add(new LedgerEntry(LedgerEntryImpl.duplicate(entry))); + } + ledgerEntries.close(); + Enumeration enumeration = new Enumeration<>() { + @Override + public boolean hasMoreElements() { + return !seq.isEmpty(); + } + @Override + public LedgerEntry nextElement() { + return seq.remove(); + } + }; + cb.readComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, enumeration, ctx); + } + }, bk.executor); + } private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd) { List ensemble = new ArrayList<>(PulsarMockBookKeeper.getMockEnsemble());