[fix] Fix ManagedLedgerImpl's pendingAddEntries leak. #25240
base: master
Pull request overview
This PR addresses pendingAddEntries leaks in ManagedLedgerImpl when the managed ledger transitions to Closed or Fenced while there are queued add operations that were never initiated.
Changes:
- Add OpAddEntry.closeIfNotInitiated() and use it to fail/remove queued-but-not-initiated add operations on close/fence.
- Schedule the post-rollover metadata-update completion to run on the managed ledger executor thread.
- Clear pending writes when ledgerClosed() is invoked while in Fenced state.
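To make the first change concrete, here is a minimal sketch of what an atomic "close if not initiated" helper could look like. It is not the code from this PR: the class name OpStateSketch, the four-value State enum, and the use of AtomicReference are assumptions chosen for illustration only.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for OpAddEntry; the real class carries more fields and logic.
final class OpStateSketch {
    // Assumed lifecycle states, for illustration only.
    enum State { OPEN, INITIATED, COMPLETED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);

    // Called by the write path when the op is handed to a ledger.
    boolean initiate() {
        return state.compareAndSet(State.OPEN, State.INITIATED);
    }

    // Succeeds only if the op was still OPEN, i.e. never initiated.
    // A single compare-and-set means close and initiate cannot both win.
    boolean closeIfNotInitiated() {
        return state.compareAndSet(State.OPEN, State.CLOSED);
    }

    State state() {
        return state.get();
    }
}
```

Because both transitions start from OPEN and use compare-and-set, exactly one of initiate() and closeIfNotInitiated() can succeed for a given op, which is the property the cleanup path relies on.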
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java | Adds a helper to atomically close an OpAddEntry if it has not been initiated. |
| managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | Adds selective clearing of not-initiated pending adds on close/fence and adjusts rollover callback execution/threading and fenced handling. |
| factory.close(this); | ||
| STATE_UPDATER.set(this, State.Closed); | ||
| executor.execute(() -> clearNotInitiatedPendingAddEntries(new ManagedLedgerException("Managed ledger is closed"))); |
Copilot
AI
Feb 11, 2026
In asyncClose(), the new failure passed to clearNotInitiatedPendingAddEntries() is a generic ManagedLedgerException("Managed ledger is closed"). This makes failures for queued-but-not-initiated adds inconsistent with other closed paths (e.g. internalAsyncAddEntry uses ManagedLedgerAlreadyClosedException) and can break callers that key off the specific subtype. Consider using ManagedLedgerAlreadyClosedException (or at least matching the existing close message pattern) here.
| executor.execute(() -> clearNotInitiatedPendingAddEntries(new ManagedLedgerException("Managed ledger is closed"))); | |
| executor.execute(() -> clearNotInitiatedPendingAddEntries( | |
| new ManagedLedgerAlreadyClosedException("Managed ledger is closed"))); |
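The subtype concern can be illustrated with a small sketch. The two exception classes below are simplified stand-ins declared locally so the example compiles on its own, and shouldReopen is a hypothetical caller-side check, not an API from the project.

```java
final class CloseExceptionSketch {
    // Simplified stand-ins for the project's exception hierarchy (assumed shape).
    static class ManagedLedgerException extends Exception {
        ManagedLedgerException(String msg) {
            super(msg);
        }
    }

    static class ManagedLedgerAlreadyClosedException extends ManagedLedgerException {
        ManagedLedgerAlreadyClosedException(String msg) {
            super(msg);
        }
    }

    // Hypothetical caller logic that keys off the specific subtype.
    static boolean shouldReopen(ManagedLedgerException e) {
        return e instanceof ManagedLedgerAlreadyClosedException;
    }
}
```

With the generic exception, a check like shouldReopen returns false even though the ledger really was closed, so the caller takes the wrong branch; the subtype keeps both close paths indistinguishable to callers.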
| maybeUpdateCursorBeforeTrimmingConsumedLedger(); | ||
| } | ||
| metadataMutex.unlock(); | ||
| // make sure that pendingAddEntries' opeartions are executed in the same thread |
Copilot
AI
Feb 11, 2026
The comment has a typo: "opeartions" → "operations".
| // make sure that pendingAddEntries' opeartions are executed in the same thread | |
| // make sure that pendingAddEntries' operations are executed in the same thread |
| LedgerHandle originalCurrentLedger = currentLedger; | ||
| ledgers.put(lh.getId(), newLedger); | ||
| currentLedger = lh; | ||
| currentLedgerTimeoutTriggered = new AtomicBoolean(); | ||
| currentLedgerEntries = 0; | ||
| currentLedgerSize = 0; | ||
| updateLedgersIdsComplete(originalCurrentLedger); | ||
| mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() | ||
| - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS); | ||
| // May need to update the cursor position | ||
| maybeUpdateCursorBeforeTrimmingConsumedLedger(); |
Copilot
AI
Feb 11, 2026
The new executor.execute() block updates ledgers/currentLedger and calls updateLedgersIdsComplete() without re-checking the ManagedLedger state. Since this work is now deferred, it can run after asyncClose()/setFenced and potentially transition the state back to LedgerOpened and/or touch ledgers after closure. Consider checking STATE_UPDATER.get(this) for Closed or isFenced inside the executor task and bailing out (while still releasing metadataMutex) if the ledger is no longer writable.
| LedgerHandle originalCurrentLedger = currentLedger; | |
| ledgers.put(lh.getId(), newLedger); | |
| currentLedger = lh; | |
| currentLedgerTimeoutTriggered = new AtomicBoolean(); | |
| currentLedgerEntries = 0; | |
| currentLedgerSize = 0; | |
| updateLedgersIdsComplete(originalCurrentLedger); | |
| mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() | |
| - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS); | |
| // May need to update the cursor position | |
| maybeUpdateCursorBeforeTrimmingConsumedLedger(); | |
| // Re-check state after deferring to executor to avoid mutating a closed/fenced ledger | |
| if (STATE_UPDATER.get(ManagedLedgerImpl.this) == State.Closed || isFenced()) { | |
| if (log.isDebugEnabled()) { | |
| log.debug("[{}] Skipping ledger update after create complete because ledger is " | |
| + "closed or fenced", name); | |
| } | |
| } else { | |
| LedgerHandle originalCurrentLedger = currentLedger; | |
| ledgers.put(lh.getId(), newLedger); | |
| currentLedger = lh; | |
| currentLedgerTimeoutTriggered = new AtomicBoolean(); | |
| currentLedgerEntries = 0; | |
| currentLedgerSize = 0; | |
| updateLedgersIdsComplete(originalCurrentLedger); | |
| mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() | |
| - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS); | |
| // May need to update the cursor position | |
| maybeUpdateCursorBeforeTrimmingConsumedLedger(); | |
| } |
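The pattern behind this suggestion, re-checking state inside a deferred task, can be shown in isolation. The sketch below is a reduced model, not ManagedLedgerImpl: the class name, the three-value State enum, and the currentLedgerId field are assumptions. It also assumes that close is serialized with the deferred task (for example on the same executor thread); otherwise the check only narrows the race window.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Reduced, hypothetical model of a ledger that finishes a rollover on an executor.
final class DeferredStateCheckSketch {
    enum State { CreatingLedger, LedgerOpened, Closed }

    private final AtomicReference<State> state = new AtomicReference<>(State.CreatingLedger);
    private long currentLedgerId = -1;

    // Callback for "new ledger created"; the state mutation is deferred to the executor.
    void onLedgerCreated(long newLedgerId, Executor executor) {
        executor.execute(() -> {
            // Re-check: the ledger may have been closed while this task was queued.
            if (state.get() == State.Closed) {
                return; // bail out without reviving a closed ledger
            }
            currentLedgerId = newLedgerId;
            state.set(State.LedgerOpened);
        });
    }

    void close() {
        state.set(State.Closed);
    }

    State state() {
        return state.get();
    }

    long currentLedgerId() {
        return currentLedgerId;
    }
}
```

Without the check, a task queued before close() and run after it would set the state back to LedgerOpened, which is the regression the comment warns about.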
| } else if (state == State.Fenced) { | ||
| clearPendingAddEntries(new ManagedLedgerFencedException("Managed ledger is fenced")); |
Copilot
AI
Feb 11, 2026
ledgerClosed() only checks state == State.Fenced, but State.FencedForDeletion also returns true for state.isFenced(). In that case pending add entries would still not be cleared, which can reintroduce the leak for fenced-for-deletion. Consider using state.isFenced() here (and keep the exception message aligned with the specific fenced state if needed).
| } else if (state == State.Fenced) { | |
| clearPendingAddEntries(new ManagedLedgerFencedException("Managed ledger is fenced")); | |
| } else if (state.isFenced()) { | |
| clearPendingAddEntries(new ManagedLedgerFencedException( | |
| "Managed ledger is fenced (" + state + ")")); |
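The difference between the two checks can be seen in a reduced sketch. The enum below is a hypothetical subset of the managed ledger states, with isFenced() written the way the comment describes it (true for both fenced variants); the real enum has more values.

```java
final class FencedStateSketch {
    // Hypothetical subset of states, for illustration only.
    enum State {
        LedgerOpened, Closed, Fenced, FencedForDeletion;

        boolean isFenced() {
            return this == Fenced || this == FencedForDeletion;
        }
    }

    // The check as written in the PR: matches only the plain Fenced state.
    static boolean clearsWithEqualityCheck(State s) {
        return s == State.Fenced;
    }

    // The suggested check: matches every fenced variant.
    static boolean clearsWithIsFenced(State s) {
        return s.isFenced();
    }
}
```

For FencedForDeletion the equality check returns false while isFenced() returns true, so only the second form would clear the pending adds in that state.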
| void clearNotInitiatedPendingAddEntries(ManagedLedgerException e) { | ||
| Iterator<OpAddEntry> iterator = pendingAddEntries.iterator(); | ||
| while (iterator.hasNext()) { | ||
| OpAddEntry op = iterator.next(); | ||
| if (op.closeIfNotInitiated()) { | ||
| op.failed(e); | ||
| iterator.remove(); | ||
| } | ||
| } |
Copilot
AI
Feb 11, 2026
There are existing tests in this module for pendingAddEntries behavior, but the new leak-prevention logic (clearNotInitiatedPendingAddEntries + OpAddEntry.closeIfNotInitiated) is not covered. Please add tests for the two scenarios described in the PR (close/fence occurring while ops are queued but not yet initiated, and fenced write-failure path clearing subsequent queued ops) to prevent regressions.
Fixed
Motivation
There are two scenarios where pendingAddEntries can leak in a ManagedLedger:
1. After an add operation, the ledger's properties may need to be updated. If the ZooKeeper update fails, the callback sets the ManagedLedger state to Fenced. Subsequent write operations that fail then trigger ManagedLedgerImpl#ledgerClosed, which does not clean up pendingAddEntries for a ManagedLedger in the Fenced state, so the queued entries leak.
2. The ManagedLedger needs to roll over because the current ledger is full, switching its state to ClosedLedger or CreatingLedger. New incoming add requests are appended to the pendingAddEntries queue but are not yet initiated. If the ManagedLedger is closed (state set to Closed) after the rollover starts but before its callback returns, the requests in the pendingAddEntries queue are never completed, resulting in a leak.
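The second scenario can be modeled with a small queue sketch. Everything here is hypothetical and reduced: Op stands in for OpAddEntry with a plain initiated flag, and clearNotInitiated mirrors the idea of failing and removing only the ops that were never handed to a ledger, leaving in-flight ops to complete through their normal callbacks.

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Reduced, hypothetical model of the pending-add queue during close.
final class PendingAddLeakSketch {
    static final class Op {
        final String name;
        final boolean initiated; // true once the write was handed to a ledger
        Exception failure;       // set when the op is failed during cleanup

        Op(String name, boolean initiated) {
            this.name = name;
            this.initiated = initiated;
        }
    }

    final Queue<Op> pendingAddEntries = new ConcurrentLinkedQueue<>();

    // Fails and removes ops that were queued but never initiated; returns how many.
    int clearNotInitiated(Exception cause) {
        int cleared = 0;
        Iterator<Op> it = pendingAddEntries.iterator();
        while (it.hasNext()) {
            Op op = it.next();
            if (!op.initiated) {
                op.failure = cause; // complete the caller with an error
                it.remove();        // drop it so it no longer lingers in the queue
                cleared++;
            }
        }
        return cleared;
    }
}
```

If close skips this step, the not-initiated ops stay in the queue with no callback left to complete them, which is the leak described above.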
Modifications
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete
Matching PR in forked repository
PR in forked repository: