Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c4a48bb
[fix][broker] Fix markDeletedPosition race condition in maybeUpdateCu…
oneby-wang Dec 24, 2025
732264f
[fix][broker] Add debug log
oneby-wang Dec 24, 2025
1d34d97
[fix][broker] Wait updating cursor finished when opening managed ledger
oneby-wang Dec 24, 2025
e3812ca
[fix][broker] Modify uncorrected comment
oneby-wang Dec 24, 2025
aa25bcb
[fix][broker] Add test for maybeUpdateCursorBeforeTrimmingConsumedLedger
oneby-wang Dec 24, 2025
7d03b0a
[fix][broker] Fix test
oneby-wang Dec 24, 2025
93b0255
[fix][broker] Fix test
oneby-wang Dec 24, 2025
40932eb
[fix][test] Rollback some codes for test
oneby-wang Dec 24, 2025
892a372
[fix][broker] Try to fix maybeUpdateCursorBeforeTrimmingConsumedLedge…
oneby-wang Dec 27, 2025
15ec22e
[fix][broker] Fix race condition in ManagedCursorImpl.getNumberOfEntries
oneby-wang Dec 27, 2025
33b327d
[fix][broker] Fix CompactionTest.testCompactorReadsCompacted
oneby-wang Dec 27, 2025
6f4b2fe
[fix][broker] Fix race condition in ManagedCursorImpl.getNumberOfEntries
oneby-wang Dec 28, 2025
33cb384
[fix][broker] Fix ManagedCursorTest.testFlushCursorAfterError test
oneby-wang Dec 28, 2025
f320c55
[fix][broker] Modify logic
oneby-wang Dec 28, 2025
4b73a15
[fix][broker] Fix ManagedCursorTest testFlushCursorAfterInactivity an…
oneby-wang Dec 28, 2025
3d1bb13
[fix][broker] Fix ManagedCursorTest.testLazyCursorLedgerCreation test
oneby-wang Dec 28, 2025
87b25c6
[fix][broker] Fix ManagedCursorTest.testPersistentMarkDeleteIfCreateC…
oneby-wang Dec 28, 2025
d7b1f6a
[fix][broker] Fix ManagedCursorTest.testPersistentMarkDeleteIfSwitchC…
oneby-wang Dec 28, 2025
e52772d
[fix][broker] Fix tests
oneby-wang Dec 28, 2025
16deef7
[fix][broker] Fix some flaky assert in ManagedLedgerTest.testTrimmerR…
oneby-wang Jan 8, 2026
6475bca
[fix][broker] Remove some flaky asserts in ManagedLedgerTest.testTrim…
oneby-wang Jan 9, 2026
0589f5a
[fix][broker] Try to fix race condition when setting mark delete prop…
oneby-wang Jan 9, 2026
bd62af6
[fix][broker] Support null mark delete properties in NonDurableCursor…
oneby-wang Jan 9, 2026
4cb9046
[fix][broker] Rollback CompactionTest.testCompactorReadsCompacted test
oneby-wang Jan 9, 2026
1399578
[fix][broker] Add ManagedLedgerTest.testTrimmerRaceConditionInNonDura…
oneby-wang Jan 9, 2026
925b3ed
[fix][broker] Fix race condition in ManagedCursorImpl.updateLastMarkD…
oneby-wang Jan 9, 2026
df9ac29
[fix][broker] Fix checkstyle
oneby-wang Jan 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1250,14 +1250,17 @@ public boolean hasMoreEntries() {

@Override
public long getNumberOfEntries() {
if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) {
Position readPos = readPosition;
Position lastPosition = ledger.getLastPosition();
Position nextPosition = lastPosition.getNext();
if (readPos.compareTo(nextPosition) > 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read position {} is ahead of last position {}. There are no entries to read",
ledger.getName(), name, readPosition, ledger.getLastPosition());
ledger.getName(), name, readPos, lastPosition);
}
return 0;
} else {
return getNumberOfEntries(Range.closedOpen(readPosition, ledger.getLastPosition().getNext()));
return getNumberOfEntries(Range.closedOpen(readPos, nextPosition));
}
}

Expand Down Expand Up @@ -2255,25 +2258,27 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
}

Position newPosition = ackBatchPosition(position);
if (ledger.getLastConfirmedEntry().compareTo(newPosition) < 0) {
Position markDeletePos = markDeletePosition;
Position lastConfirmedEntry = ledger.getLastConfirmedEntry();
if (lastConfirmedEntry.compareTo(newPosition) < 0) {
boolean shouldCursorMoveForward = false;
try {
long ledgerEntries = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
Long nextValidLedger = ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
long ledgerEntries = ledger.getLedgerInfo(markDeletePos.getLedgerId()).get().getEntries();
Long nextValidLedger = ledger.getNextValidLedger(lastConfirmedEntry.getLedgerId());
shouldCursorMoveForward = nextValidLedger != null
&& (markDeletePosition.getEntryId() + 1 >= ledgerEntries)
&& (markDeletePos.getEntryId() + 1 >= ledgerEntries)
&& (newPosition.getLedgerId() == nextValidLedger);
} catch (Exception e) {
log.warn("Failed to get ledger entries while setting mark-delete-position", e);
}

if (shouldCursorMoveForward) {
log.info("[{}] move mark-delete-position from {} to {} since all the entries have been consumed",
ledger.getName(), markDeletePosition, newPosition);
ledger.getName(), markDeletePos, newPosition);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {}"
+ " for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
+ " for cursor [{}]", ledger.getName(), position, lastConfirmedEntry, name);
}
callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
Expand Down Expand Up @@ -2329,11 +2334,15 @@ protected void internalAsyncMarkDelete(final Position newPosition, Map<String, L
final MarkDeleteCallback callback, final Object ctx, Runnable alignAcknowledgeStatusAfterPersisted) {
ledger.mbean.addMarkDeleteOp();

MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx,
alignAcknowledgeStatusAfterPersisted);

// We cannot write to the ledger during the switch, need to wait until the new metadata ledger is available
synchronized (pendingMarkDeleteOps) {
// use given properties or when missing, use the properties from the previous field value
MarkDeleteEntry last = pendingMarkDeleteOps.peekLast();
Map<String, Long> propertiesToUse =
properties != null ? properties : (last != null ? last.properties : getProperties());
MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse, callback, ctx,
alignAcknowledgeStatusAfterPersisted);

// The state might have changed while we were waiting on the queue mutex
switch (state) {
case Closed:
Expand Down Expand Up @@ -2701,17 +2710,20 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
// update lastMarkDeleteEntry field if newPosition is later than the current lastMarkDeleteEntry.newPosition
private void updateLastMarkDeleteEntryToLatest(final Position newPosition,
final Map<String, Long> properties) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
if (last != null && last.newPosition.compareTo(newPosition) > 0) {
// keep current value, don't update
return last;
} else {
// use given properties or when missing, use the properties from the previous field value
Map<String, Long> propertiesToUse =
properties != null ? properties : (last != null ? last.properties : Collections.emptyMap());
return new MarkDeleteEntry(newPosition, propertiesToUse, null, null);
}
});
synchronized (pendingMarkDeleteOps) {
// use given properties or when missing, use the properties from the previous field value
MarkDeleteEntry lastPending = pendingMarkDeleteOps.peekLast();
Map<String, Long> propertiesToUse =
properties != null ? properties : (lastPending != null ? lastPending.properties : getProperties());
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
if (last != null && last.newPosition.compareTo(newPosition) > 0) {
// keep current value, don't update
return last;
} else {
return new MarkDeleteEntry(newPosition, propertiesToUse, null, null);
}
});
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,14 +481,15 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
public void initializeComplete() {
log.info("[{}] Successfully initialize managed ledger", name);
pendingInitializeLedgers.remove(name, pendingLedger);
future.complete(newledger);

// May need to update the cursor position
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
// May need to trigger offloading
if (config.isTriggerOffloadOnTopicLoad()) {
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
}
// May need to update the cursor position and wait them finished
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger().whenComplete((__, ex) -> {
// ignore ex since it is handled in maybeUpdateCursorBeforeTrimmingConsumedLedger
future.complete(newledger);
// May need to trigger offloading
if (config.isTriggerOffloadOnTopicLoad()) {
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1761,11 +1761,10 @@ public void operationComplete(Void v, Stat stat) {
updateLedgersIdsComplete(originalCurrentLedger);
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
- lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
// May need to update the cursor position
maybeUpdateCursorBeforeTrimmingConsumedLedger();
}
metadataMutex.unlock();

// May need to update the cursor position
maybeUpdateCursorBeforeTrimmingConsumedLedger();
}

@Override
Expand Down Expand Up @@ -2709,18 +2708,23 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
this.waitingEntryCallBacks.add(cb);
}

public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
public CompletableFuture<Void> maybeUpdateCursorBeforeTrimmingConsumedLedger() {
List<CompletableFuture<Void>> cursorMarkDeleteFutures = new ArrayList<>();
for (ManagedCursor cursor : cursors) {
Position lastAckedPosition = cursor.getPersistentMarkDeletedPosition() != null
? cursor.getPersistentMarkDeletedPosition() : cursor.getMarkDeletedPosition();
LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
CompletableFuture<Void> future = new CompletableFuture<>();
cursorMarkDeleteFutures.add(future);

// Snapshot positions into a local variables to avoid race condition.
Position markDeletedPosition = cursor.getMarkDeletedPosition();
Position lastAckedPosition = markDeletedPosition;
LedgerInfo curPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
.map(Map.Entry::getValue).orElse(null);

if (currPointedLedger != null) {
if (curPointedLedger != null) {
if (nextPointedLedger != null) {
if (lastAckedPosition.getEntryId() != -1
&& lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) {
&& lastAckedPosition.getEntryId() + 1 >= curPointedLedger.getEntries()) {
lastAckedPosition = PositionFactory.create(nextPointedLedger.getLedgerId(), -1);
}
} else {
Expand All @@ -2730,25 +2734,37 @@ public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
log.warn("Cursor: {} does not exist in the managed-ledger.", cursor);
}

if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) {
int compareResult = lastAckedPosition.compareTo(markDeletedPosition);
if (compareResult > 0) {
Position finalPosition = lastAckedPosition;
log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
cursor.asyncMarkDelete(lastAckedPosition, cursor.getProperties(),
new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
log.info("Successfully persisted cursor position for cursor:{} to {}",
cursor, finalPosition);
}
log.info("Mark deleting cursor:{} from {} to {} since ledger consumed completely.", cursor,
markDeletedPosition, lastAckedPosition);
cursor.asyncMarkDelete(lastAckedPosition, null, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
log.info("Successfully persisted cursor position for cursor:{} to {}", cursor, finalPosition);
future.complete(null);
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
cursor, cursor.getMarkDeletedPosition(), finalPosition, exception);
}
}, null);
@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("Failed to mark delete: {} from {} to {}. ", cursor, cursor.getMarkDeletedPosition(),
finalPosition, exception);
future.completeExceptionally(exception);
}
}, null);
} else if (compareResult == 0) {
log.debug("No need to reset cursor: {}, last acked position equals to current mark-delete position {}.",
cursor, markDeletedPosition);
future.complete(null);
} else {
// Should not happen
log.warn("Ledger rollover tries to mark delete an already mark-deleted position. Current mark-delete:"
+ " {} -- attempted position: {}", markDeletedPosition, lastAckedPosition);
future.complete(null);
}
}
return FutureUtil.waitForAll(cursorMarkDeleteFutures);
}

private void trimConsumedLedgersInBackground() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ void recover(final VoidCallback callback) {
protected void internalAsyncMarkDelete(final Position newPosition, Map<String, Long> properties,
final MarkDeleteCallback callback, final Object ctx, Runnable alignAcknowledgeStatusAfterPersisted) {
// Bypass persistence of mark-delete position and individually deleted messages info

MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx,
alignAcknowledgeStatusAfterPersisted);
MarkDeleteEntry mdEntry;
lock.writeLock().lock();
try {
// use given properties or when missing, use the properties from the previous field value
Map<String, Long> propertiesToUse = properties != null ? properties : getProperties();
mdEntry = new MarkDeleteEntry(newPosition, propertiesToUse, callback, ctx,
alignAcknowledgeStatusAfterPersisted);
lastMarkDeleteEntry = mdEntry;
mdEntry.alignAcknowledgeStatus();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
ml.close();
ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThan(lastEntry);

// cleanup.
ml.delete();
Expand Down Expand Up @@ -498,12 +499,18 @@ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception {
assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
assertThat(cursor.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
assertThat(cursor.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);

// Verify the mark delete position can be recovered properly.
ml.close();
ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
assertThat(cursorRecovered.getPersistentMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);
// If previous ledger is trimmed, Cursor: ManagedCursorImpl{ledger=ml_test, name=c1, ackPos=12:0, readPos=15:0}
// does not exist in the managed-ledger. Recovered cursor's position will not be moved forward.
// TODO should be handled in ledger trim process.
assertThat(cursorRecovered.getMarkDeletedPosition()).isGreaterThanOrEqualTo(lastEntry);

// cleanup.
ml.delete();
Expand Down Expand Up @@ -4441,7 +4448,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");

assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
});
}

Expand Down Expand Up @@ -4500,7 +4507,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterIndDelInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");

assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
});
}

Expand Down Expand Up @@ -4552,7 +4559,7 @@ public void testFlushCursorAfterError() throws Exception {
ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
ManagedCursor c2 = ledger2.openCursor("c");

assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
assertThat(c2.getMarkDeletedPosition()).isGreaterThan(positions.get(positions.size() - 1));
});
}

Expand Down Expand Up @@ -4815,7 +4822,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}

@Test
public void testLazyCursorLedgerCreation() throws Exception {
public void testEagerCursorLedgerCreation() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
Expand All @@ -4840,8 +4847,8 @@ public void testLazyCursorLedgerCreation() throws Exception {
ledger = (ManagedLedgerImpl) factory
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test");
assertEquals(cursor1.getState(), "NoLedger");
assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition);
assertEquals(cursor1.getState(), "Open");
assertThat(cursor1.getMarkDeletedPosition()).isGreaterThan(finalLastPosition);

// Verify the recovered cursor can work with new mark delete.
lastPosition = null;
Expand Down
Loading