From eb9902ec1c01230d481f7f208219339d6d7b9074 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Wed, 11 Feb 2026 18:02:14 +0800 Subject: [PATCH 1/3] Fix ManagedLedgerImpl's pendingAddEntries leak. --- .../mledger/impl/ManagedLedgerImpl.java | 68 ++++++++++---- .../mledger/impl/ManagedLedgerTest.java | 88 +++++++++++++++++++ 2 files changed, 139 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 05a17fd1a77ef..ed2f80bc0cf13 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1652,6 +1652,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c factory.close(this); STATE_UPDATER.set(this, State.Closed); + clearPendingAddEntries(new ManagedLedgerAlreadyClosedException("Managed ledger is closed")); cancelScheduledTasks(); LedgerHandle lh = currentLedger; @@ -1746,19 +1747,39 @@ public void operationComplete(Void v, Stat stat) { log.debug().attr("version", stat).log("Updating of ledgers list after create complete"); ledgersStat = stat; synchronized (ManagedLedgerImpl.this) { - LedgerHandle originalCurrentLedger = currentLedger; - ledgers.put(lh.getId(), newLedger); - currentLedger = lh; - currentLedgerTimeoutTriggered = new AtomicBoolean(); - currentLedgerEntries = 0; - currentLedgerSize = 0; - updateLedgersIdsComplete(originalCurrentLedger); - mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS); - // May need to update the cursor position - maybeUpdateCursorBeforeTrimmingConsumedLedger(); + try { + State state = STATE_UPDATER.get(ManagedLedgerImpl.this); + if (state == State.Closed || state.isFenced()) { + log.debug() + .attr("name", name) + .log("skip ledger update after create complete ledger is closed or fenced"); + lh.closeAsync().exceptionally(e -> { + if (e != null) { + log.error() + .attr("ledgerName", name) + .attr("ledgerId", lh.getId()) + .attr("error", e.getMessage()) + .log("Failed to close ledger"); + } + return null; + }); + } else { + LedgerHandle originalCurrentLedger = currentLedger; + ledgers.put(lh.getId(), newLedger); + currentLedger = lh; + currentLedgerTimeoutTriggered = new AtomicBoolean(); + currentLedgerEntries = 0; + currentLedgerSize = 0; + updateLedgersIdsComplete(originalCurrentLedger); + mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() + - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS); + // May need to update the cursor position + maybeUpdateCursorBeforeTrimmingConsumedLedger(); + } + } finally { + metadataMutex.unlock(); + } } - metadataMutex.unlock(); } @Override @@ -1927,6 +1948,9 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) { // The managed ledger was closed during the write operation clearPendingAddEntries(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed")); return; + } else if (state.isFenced()) { + clearPendingAddEntries(new ManagedLedgerFencedException("Managed ledger is fenced")); + return; } else { // In case we get multiple write errors for different outstanding write request, we should close the ledger // just once @@ -2052,7 +2076,7 @@ public ManagedLedgerInterceptor getManagedLedgerInterceptor() { return managedLedgerInterceptor; } - void clearPendingAddEntries(ManagedLedgerException e) { + synchronized void clearPendingAddEntries(ManagedLedgerException e) { while (!pendingAddEntries.isEmpty()) { OpAddEntry op = pendingAddEntries.poll(); op.failed(e); @@ -4327,7 +4351,8 @@ public boolean isCursorActive(ManagedCursor cursor) { return activeCursors.get(cursor.getName()) != null; } - private boolean currentLedgerIsFull() { + @VisibleForTesting + protected boolean currentLedgerIsFull() { if (!factory.isMetadataServiceAvailable()) { // We don't want to trigger metadata operations if we already know that we're currently disconnected return false; @@ -4416,13 +4441,22 @@ private void checkManagedLedgerIsOpen() throws ManagedLedgerException { @VisibleForTesting public synchronized void setFenced() { - log.info("Moving to Fenced state"); - STATE_UPDATER.set(this, State.Fenced); + log.info().attr("ledgerName", name).log("Moving to Fenced state"); + if (STATE_UPDATER.get(this) != State.Fenced) { + STATE_UPDATER.set(this, State.Fenced); + clearPendingAddEntries(new ManagedLedgerFencedException("ManagedLedger " + + name + " is fenced")); + } } synchronized void setFencedForDeletion() { - log.info("Moving to FencedForDeletion state"); + log.info().attr("ledgerName", name).log("Moving to FencedForDeletion state"); STATE_UPDATER.set(this, State.FencedForDeletion); + if (STATE_UPDATER.get(this) != State.Fenced) { + STATE_UPDATER.set(this, State.Fenced); + clearPendingAddEntries(new ManagedLedgerFencedException("ManagedLedger " + + name + " is fenced")); + } } MetaStore getStore() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 7e756f1d994eb..877bd4051c72b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -764,6 +764,94 @@ public void invalidReadEntriesArg2() throws Exception { fail("Should have thrown an exception in the above line"); } + @Test(timeOut = 30000) + public void testCloseManagedLedgerAfterRollover() throws Exception { + ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig(); + config.setMaxCacheSize(0); + @Cleanup("shutdown") + ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, config); + ManagedLedgerImpl realLedger = (ManagedLedgerImpl) factory.open("my_test_ledger"); + ManagedLedgerImpl ledger = Mockito.spy(realLedger); + AtomicBoolean onlyOnce = new AtomicBoolean(false); + when(ledger.currentLedgerIsFull()).thenAnswer(invocation -> onlyOnce.compareAndSet(false, true)); + OpAddEntry realOp = OpAddEntry.createNoRetainBuffer(ledger, + ByteBufAllocator.DEFAULT.buffer(128), null, null, new AtomicBoolean()); + OpAddEntry op = spy(realOp); + CountDownLatch createLatch = new CountDownLatch(1); + CountDownLatch closeLatch = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + // Simulate that before the rollover is completed, new write requests arrive, + // and after these write requests are added to pendingAddEntries, the ledger is closed. + log.info("before add, ledger state:{}", ledger.state); + for (int i = 0; i < 10; ++i) { + ledger.internalAsyncAddEntry(OpAddEntry.createNoRetainBuffer(ledger, + ByteBufAllocator.DEFAULT.buffer(128), null, null, new AtomicBoolean())); + } + ledger.asyncClose(new CloseCallback() { + @Override + public void closeComplete(Object ctx) { + log.info("closeComplete finished, ledger state:{}", ledger.state); + closeLatch.countDown(); + } + + @Override + public void closeFailed(ManagedLedgerException exception, Object ctx) { + log.info("closeFailed, ex:{}, state:{}", exception.getMessage(), ledger.state); + closeLatch.countDown(); + } + }, null); + log.info("after add, ledger state:{}", ledger.state); + return invocationOnMock.callRealMethod(); + }).when(ledger).asyncCreateLedger(any(), any(), any(), any(), any()); + doAnswer(invocationOnMock -> { + Object o = invocationOnMock.callRealMethod(); + log.info("createComplete finished, state:{}", ledger.state); + ledger.executor.execute(createLatch::countDown); + return o; + }).when(ledger).createComplete(anyInt(), any(), any()); + ledger.internalAsyncAddEntry(op); + createLatch.await(); + closeLatch.await(); + Assert.assertEquals(ledger.pendingAddEntries.size(), 0); + } + + @Test(timeOut = 20000) + public void testFencedManagedLedgerAfterAdd() throws Exception { + @Cleanup("shutdown") + ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ManagedLedgerImpl realLedger = (ManagedLedgerImpl) factory1.open("my_test_ledger"); + ManagedLedgerImpl ledger = spy(realLedger); + + int sendNum = 10; + CountDownLatch sendLatch = new CountDownLatch(sendNum); + CountDownLatch fencedLatch = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + stopBookKeeper(); + stopMetadataStore(); + for (int i = 0; i < sendNum; ++i) { + ledger.internalAsyncAddEntry(OpAddEntry.createNoRetainBuffer(ledger, + ByteBufAllocator.DEFAULT.buffer(128), new AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + sendLatch.countDown(); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + sendLatch.countDown(); + } + }, null, new AtomicBoolean())); + } + Object o = invocationOnMock.callRealMethod(); + fencedLatch.countDown(); + return o; + }).when(ledger).setFenced(); + ledger.setFenced(); + fencedLatch.await(); + sendLatch.await(); + assertEquals(ledger.pendingAddEntries.size(), 0); + } + @Test(timeOut = 20000) public void deleteAndReopen() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger"); From 13639188ae6941556046c671966246ff726f6525 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Thu, 30 Apr 2026 20:39:45 +0800 Subject: [PATCH 2/3] Fix comments. --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 ++--- .../pulsar/broker/service/persistent/PersistentTopic.java | 6 +++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ed2f80bc0cf13..f3087674b0f25 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -4451,9 +4451,8 @@ public synchronized void setFenced() { synchronized void setFencedForDeletion() { log.info().attr("ledgerName", name).log("Moving to FencedForDeletion state"); - STATE_UPDATER.set(this, State.FencedForDeletion); - if (STATE_UPDATER.get(this) != State.Fenced) { - STATE_UPDATER.set(this, State.Fenced); + if (STATE_UPDATER.get(this) != State.FencedForDeletion) { + STATE_UPDATER.set(this, State.FencedForDeletion); clearPendingAddEntries(new ManagedLedgerFencedException("ManagedLedger " + name + " is fenced")); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2e76254dc97ab..5370d611fa760 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -764,17 +764,21 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx) Instead, we will rely on the service unit state channel's bundle(topic) transfer protocol. At the end of the transfer protocol, at Owned state, the source broker should close the topic properly. */ + PublishContext callback = (PublishContext) ctx; if (transferring) { log.debug() .exception(exception) .log("Failed to persist msg in store while transferring"); + callback.completed(new TopicClosedException(exception), -1, -1); + decrementPendingWriteOpsAndCheck(); return; } - PublishContext callback = (PublishContext) ctx; if (exception instanceof ManagedLedgerFencedException) { // If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen close(); + callback.completed(new TopicFencedException(exception.getMessage()), -1, -1); + decrementPendingWriteOpsAndCheck(); } else { // fence topic when failed to write a message to BK fence(); From b86304ac4b7c838b24d3344b054b876b9aa0bb88 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Fri, 1 May 2026 10:33:17 +0800 Subject: [PATCH 3/3] Fix test compilation: use string concatenation for logger instead of parameterized logging --- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 877bd4051c72b..a0cbd6519eefe 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -782,7 +782,7 @@ public void testCloseManagedLedgerAfterRollover() throws Exception { doAnswer(invocationOnMock -> { // Simulate that before the rollover is completed, new write requests arrive, // and after these write requests are added to pendingAddEntries, the ledger is closed. - log.info("before add, ledger state:{}", ledger.state); + log.info("before add, ledger state: " + ledger.getState()); for (int i = 0; i < 10; ++i) { ledger.internalAsyncAddEntry(OpAddEntry.createNoRetainBuffer(ledger, ByteBufAllocator.DEFAULT.buffer(128), null, null, new AtomicBoolean())); @@ -790,22 +790,22 @@ public void testCloseManagedLedgerAfterRollover() throws Exception { ledger.asyncClose(new CloseCallback() { @Override public void closeComplete(Object ctx) { - log.info("closeComplete finished, ledger state:{}", ledger.state); + log.info("closeComplete finished, ledger state: " + ledger.getState()); closeLatch.countDown(); } @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { - log.info("closeFailed, ex:{}, state:{}", exception.getMessage(), ledger.state); + log.info("closeFailed, ex: " + exception.getMessage() + ", state: " + ledger.getState()); closeLatch.countDown(); } }, null); - log.info("after add, ledger state:{}", ledger.state); + log.info("after add, ledger state: " + ledger.getState()); return invocationOnMock.callRealMethod(); }).when(ledger).asyncCreateLedger(any(), any(), any(), any(), any()); doAnswer(invocationOnMock -> { Object o = invocationOnMock.callRealMethod(); - log.info("createComplete finished, state:{}", ledger.state); + log.info("createComplete finished, state: " + ledger.getState()); ledger.executor.execute(createLatch::countDown); return o; }).when(ledger).createComplete(anyInt(), any(), any());