Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,10 @@ public synchronized void asyncTerminate(TerminateCallback callback, Object ctx)
}

log.info("Terminating managed ledger");
State previousState = state;
state = State.Terminated;
clearPendingAddEntriesAfterTerminate(previousState,
new ManagedLedgerTerminatedException("Managed ledger was already terminated"));

LedgerHandle lh = currentLedger;
log.debug().attr("ledgerId", lh.getId()).log("Closing current writing ledger");
Expand Down Expand Up @@ -1706,7 +1709,30 @@ private void closeAllCursors(CloseCallback callback, final Object ctx) {

@Override
public synchronized void createComplete(int rc, final LedgerHandle lh, Object ctx) {
if (STATE_UPDATER.get(this) == State.Closed) {
log.debug().attr("rc", rc).attr("ledgerId", lh != null ? lh.getId() : -1).log("createComplete");

// The create callback carries a future used by the timeout checker. Complete it before any terminal-state
// return; otherwise a late callback after terminate/close can leave the timeout task and create-op metric
// unbalanced. A true return means this callback is stale because the future was already completed, and the
// helper has already deleted the late-created ledger if needed.
if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
return;
}

mbean.endDataLedgerCreateOp();
State state = STATE_UPDATER.get(this);
if (state == State.Terminated) {
if (lh != null) {
log.warn().attr("rc", rc)
.attr("ledgerId", lh != null ? lh.getId() : -1)
.attr("state", state)
.log("Ledger create completed after the managed ledger is terminated,"
+ " so close and delete this ledger handle");
closeAndDeleteCreatedLedger(lh);
}
clearPendingAddEntries(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
return;
} else if (state == State.Closed) {
if (lh != null) {
log.warn().attr("rc", rc)
.attr("ledgerId", lh != null ? lh.getId() : -1)
Expand All @@ -1716,14 +1742,6 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct
}
return;
}

log.debug().attr("rc", rc).attr("ledgerId", lh != null ? lh.getId() : -1).log("createComplete");

if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
return;
}

mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.error().attr("rc", rc).attr("message", BKException.getMessage(rc)).log("Error creating ledger");
ManagedLedgerException status = createManagedLedgerException(rc);
Expand All @@ -1749,8 +1767,11 @@ public void operationComplete(Void v, Stat stat) {
synchronized (ManagedLedgerImpl.this) {
try {
State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
if (state == State.Closed || state.isFenced()) {
log.debug().log("skip ledger update after create complete ledger is closed or fenced");
if (state == State.Closed || state == State.Terminated || state.isFenced()) {
log.debug().attr("state", state)
.log("skip ledger update after create complete ledger is not writable");
// TODO: if this path is hit after the new ledger was already written into metadata,
// delete the unused ledger together with removing it from the metadata.
lh.closeAsync().exceptionally(e -> {
if (e != null) {
log.error()
Expand All @@ -1760,6 +1781,10 @@ public void operationComplete(Void v, Stat stat) {
}
return null;
});
if (state == State.Terminated) {
clearPendingAddEntries(new ManagedLedgerTerminatedException(
"Managed ledger was already terminated"));
}
} else {
LedgerHandle originalCurrentLedger = currentLedger;
ledgers.put(lh.getId(), newLedger);
Expand Down Expand Up @@ -1837,6 +1862,23 @@ private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback, Le
store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback);
}

/**
 * Best-effort cleanup for a ledger whose creation completed too late to be used.
 *
 * <p>The handle is closed asynchronously. Once the close attempt finishes, whether it succeeded or failed,
 * deletion of the ledger is requested through {@code asyncDeleteLedger} with
 * {@code DEFAULT_LEDGER_DELETE_RETRIES}. A failure of either step is only logged at warn level; nothing is
 * propagated back to the caller.
 *
 * @param lh handle of the late-created ledger; must not be null because its id is read immediately
 */
private void closeAndDeleteCreatedLedger(LedgerHandle lh) {
    // Capture the id up front so both callbacks can log it without touching the handle again.
    final long lateLedgerId = lh.getId();
    lh.closeAsync().whenComplete((unused, closeFailure) -> {
        if (closeFailure != null) {
            // A failed close does not abort the cleanup: the delete below is still attempted.
            log.warn().attr("ledgerId", lateLedgerId)
                    .attr("error", closeFailure.getMessage())
                    .log("Failed to close late-created ledger before deletion");
        }
        asyncDeleteLedger(lateLedgerId, DEFAULT_LEDGER_DELETE_RETRIES).exceptionally(deleteFailure -> {
            // Swallow the failure after logging it; there is no caller waiting on this future.
            log.warn().attr("ledgerId", lateLedgerId)
                    .attr("error", deleteFailure.getMessage())
                    .log("Failed to delete late-created ledger");
            return null;
        });
    });
}

@VisibleForTesting
void createNewOpAddEntryForNewLedger() {
// Avoid use same OpAddEntry between different ledger handle
Expand All @@ -1861,6 +1903,14 @@ void createNewOpAddEntryForNewLedger() {
}

protected synchronized void updateLedgersIdsComplete(@Nullable LedgerHandle originalCurrentLedger) {
State state = STATE_UPDATER.get(this);
if (state == State.Terminated) {
log.debug().attr("state", state)
.attr("pendingAddEntries", pendingAddEntries.size())
.log("Skip completing ledger switch because managed ledger is terminated");
clearPendingAddEntries(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
return;
}
STATE_UPDATER.set(this, State.LedgerOpened);
// Delete original "currentLedger" if it has been removed from "ledgers".
if (originalCurrentLedger != null && !ledgers.containsKey(originalCurrentLedger.getId())){
Expand Down Expand Up @@ -1996,7 +2046,7 @@ synchronized void createLedgerAfterClosed() {

boolean isNeededCreateNewLedgerAfterCloseLedger() {
final State state = STATE_UPDATER.get(this);
if (state != State.CreatingLedger && state != State.LedgerOpened) {
if (state != State.CreatingLedger && state != State.LedgerOpened && state != State.Terminated) {
return true;
}
return false;
Expand Down Expand Up @@ -2080,6 +2130,41 @@ synchronized void clearPendingAddEntries(ManagedLedgerException e) {
}
}

/**
 * Fails the given add operation with a {@code ManagedLedgerTerminatedException} when the managed ledger is in
 * the {@code Terminated} state.
 *
 * <p>Intended for a failed add callback that arrives after terminate has taken ownership of the ledger.
 * Terminate closes the current ledger at the current BK LAC, so any outstanding add drained by
 * {@code LedgerHandle.close()} lies outside the terminated position. Letting such a callback fall through to
 * the normal write-failure path would end in {@code ledgerClosed()} returning in the Terminated state without
 * completing the add, which would leave the client callback hanging. The add is therefore failed explicitly
 * here.
 *
 * @param op the add operation whose write failed
 * @return {@code true} if the ledger was terminated and {@code op} has been removed from the pending queue and
 *         failed; {@code false} if the ledger is in any other state, in which case nothing was done
 */
synchronized boolean failAddIfTerminated(OpAddEntry op) {
    final boolean terminated = STATE_UPDATER.get(this) == State.Terminated;
    if (terminated) {
        // Drop the op from the pending queue first, then complete its callback with the terminal error.
        pendingAddEntries.remove(op);
        op.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
    }
    return terminated;
}

/**
 * Fails the pending add operations that can no longer be served once the managed ledger has been terminated.
 *
 * <p>Which operations are failed depends on the state the ledger was in just before termination:
 * <ul>
 *   <li>{@code CreatingLedger}, {@code ClosedLedger} or {@code WriteFailed}: there is no current-ledger write
 *       path left. Every pending op is waiting for a future ledger, or for a replay onto that future ledger,
 *       which terminate must not create. The whole queue is cleared through {@code clearPendingAddEntries}.</li>
 *   <li>Any other state (e.g. {@code LedgerOpened} / {@code ClosingLedger}): the queue may also hold writes
 *       already sent to the current ledger. Those are decided by the BK callback path: they either complete
 *       before close advances LAC, or {@code LedgerHandle.close()} drains them and
 *       {@code OpAddEntry.handleAddFailure} completes them as terminated. Only ops still in the
 *       {@code OPEN} state, i.e. not yet initiated, are failed here.</li>
 * </ul>
 *
 * @param previousState the state the managed ledger was in immediately before it became terminated
 * @param e the exception used to fail the affected add operations
 */
synchronized void clearPendingAddEntriesAfterTerminate(State previousState, ManagedLedgerException e) {
    final boolean waitingOnlyForFutureLedger = previousState == State.CreatingLedger
            || previousState == State.ClosedLedger
            || previousState == State.WriteFailed;
    if (waitingOnlyForFutureLedger) {
        clearPendingAddEntries(e);
        return;
    }

    for (OpAddEntry pending : pendingAddEntries) {
        if (pending.getState() != OpAddEntry.State.OPEN) {
            // Already handed to the current ledger; its outcome is owned by the BK callback path.
            continue;
        }
        // Fail the op only if this call is the one that actually dequeued it.
        if (pendingAddEntries.remove(pending)) {
            pending.failed(e);
        }
    }
}

void asyncReadEntries(OpReadEntry opReadEntry) {
final State state = STATE_UPDATER.get(this);
if (state.isFenced() || state == State.Closed) {
Expand Down Expand Up @@ -4638,8 +4723,11 @@ public Clock getClock() {
}

/**
* check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger
* @return
* Complete the ledger-create future used by the timeout checker.
*
* @return true when this callback is stale because another callback or the timeout task already completed the
* future. In that case the caller must stop processing this callback. If a ledger was created by the stale
* callback, this method schedules it for deletion.
*/
@SuppressWarnings("unchecked")
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ void handleAddFailure(final LedgerHandle lh, Integer rc) {
finalMl.mbean.recordAddEntryError();

finalMl.getExecutor().execute(() -> {
if (finalMl.failAddIfTerminated(this)) {
return;
}
// Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock
// from a BK callback.
// If we received a "MetadataVersionException" or a "LedgerFencedException", we should tell the ML that
Expand Down
Loading
Loading