[Bug] broker: Stale backlog after creating Earliest subscription on trimmed topic #25813

@ShuKe-code

Search before reporting

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

Pulsar broker 3.0.12
Java 17

Issue Description

Observed on a 3.0.x-based branch.

What did you expect to see?

After the inactive subscription is deleted and all old ledgers are trimmed, recreating the durable subscription with Earliest should initialize the cursor with backlog 0 if there is no real remaining data.

stats.backlog, stats-internal, and precise backlog should be consistent.

What did you see instead?

admin topics stats can report backlog > 0, while:

  • admin topics stats-internal shows old ledgers already trimmed
  • precise backlog is 0
  • the consumer cannot actually receive any message
  • unloading the topic makes the reported backlog become 0

Error messages


Reproducing the issue

  1. Create a partitioned topic.
  2. Produce messages across multiple ledgers.
  3. Configure retention aggressively so old ledgers are trimmed, leaving only a new empty current ledger.
  4. Create a durable subscription under a new name with SubscriptionInitialPosition.Earliest.
  5. Compare:
    • admin topics stats <topic>
    • admin topics stats-internal <topic>
    • precise backlog

Additional information

Root cause is in ManagedLedgerImpl#getFirstPositionAndCounter() during earliest cursor initialization.

When all historical ledgers are already trimmed and only an empty current ledger remains, getFirstPosition() can return a synthetic position on the old last confirmed ledger (for example 469:-1).
getFirstPositionAndCounter() then uses that synthetic position in getNumberOfEntries(Range.openClosed(...)), which counts entries from a ledger that no longer exists in ledgers.

This leads to an incorrect messagesConsumedCounter when a durable subscription is recreated with SubscriptionInitialPosition.Earliest after inactive subscription deletion. The inaccurate counter then propagates into approximate backlog stats.
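
To make the arithmetic concrete, here is a small standalone model of the miscount. It is not the Pulsar code: the class and method names are made up for illustration, the same-ledger range count is a simplification of what getNumberOfEntries() does, and the numbers (99 as the last entry id, 1000 entries added) are hypothetical. Only the 469:-1 position comes from the case above.

```java
public class StaleBacklogModel {
    /** Hypothetical stand-in for a managed-ledger position (ledgerId:entryId). */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    /**
     * Simplified count of entries in the open-closed range (from, to] when both
     * ends sit on the same ledger. It is pure arithmetic on entry ids and never
     * checks whether that ledger is still present in the ledgers map.
     */
    static long entriesInOpenClosedRange(Position from, Position to) {
        if (from.ledgerId != to.ledgerId) {
            throw new IllegalArgumentException("model only covers the same-ledger case");
        }
        return to.entryId - from.entryId;
    }

    /** Counter an earliest cursor starts with when it counts against the first position. */
    static long initialConsumedCounter(Position first, Position last, long entriesAdded) {
        return entriesAdded - entriesInOpenClosedRange(first, last);
    }

    /** Approximate backlog: entries added minus entries the cursor counts as consumed. */
    static long approximateBacklog(long entriesAdded, long consumedCounter) {
        return entriesAdded - consumedCounter;
    }

    public static void main(String[] args) {
        Position syntheticFirst = new Position(469, -1); // ledger 469 is already trimmed
        Position lastConfirmed = new Position(469, 99);  // hypothetical last confirmed entry
        long entriesAdded = 1000;                        // hypothetical entriesAddedCounter

        long counter = initialConsumedCounter(syntheticFirst, lastConfirmed, entriesAdded);
        System.out.println("consumed counter = " + counter);                          // 900
        System.out.println("backlog = " + approximateBacklog(entriesAdded, counter)); // 100

        // Starting the counter at entriesAdded instead gives the expected backlog.
        System.out.println("backlog = " + approximateBacklog(entriesAdded, entriesAdded)); // 0
    }
}
```

In this model the 100 entries of the trimmed ledger are subtracted from the counter even though none of them can be read, which matches the symptom of a non-zero approximate backlog next to a precise backlog of 0.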

A fix is to treat this synthetic-first-position case specially and initialize the earliest cursor from lastConfirmedEntry and entriesAddedCounter instead of counting against trimmed ledgers.

--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3798,19 +3798,40 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
      */
     Pair<PositionImpl, Long> getFirstPositionAndCounter() {
         PositionImpl pos;
+        PositionImpl firstPosition;
         long count;
         Pair<PositionImpl, Long> lastPositionAndCounter;
 
         do {
-            pos = getFirstPosition();
+            firstPosition = getFirstPosition();
             lastPositionAndCounter = getLastPositionAndCounter();
-            count = lastPositionAndCounter.getRight()
-                    - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.getLeft()));
-        } while (pos.compareTo(getFirstPosition()) != 0
+            if (isSyntheticFirstPosition(firstPosition)) {
+                pos = lastPositionAndCounter.getLeft();
+                count = lastPositionAndCounter.getRight();
+            } else {
+                pos = firstPosition;
+                count = lastPositionAndCounter.getRight()
+                        - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.getLeft()));
+            }
+        } while (firstPosition.compareTo(getFirstPosition()) != 0
                 || lastPositionAndCounter.getLeft().compareTo(getLastPosition()) != 0);
         return Pair.of(pos, count);
     }
 
+    private boolean isSyntheticFirstPosition(PositionImpl position) {
+        // firstEntry() returns null on an empty map; firstKey() would throw NoSuchElementException.
+        Map.Entry<Long, LedgerInfo> firstEntry = ledgers.firstEntry();
+        if (firstEntry == null || position == null) {
+            return false;
+        }
+        LedgerInfo firstLedger = firstEntry.getValue();
+        return firstEntry.getKey() > lastConfirmedEntry.getLedgerId()
+                && position.getLedgerId() == lastConfirmedEntry.getLedgerId()
+                && position.getEntryId() == -1
+                && firstLedger != null
+                && firstLedger.getEntries() == 0;
+    }
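
The behaviour the patch aims for can be checked in isolation with a stdlib-only model. This is a sketch under assumptions, not the real ManagedLedgerImpl: ledgers is modelled as a TreeMap from ledger id to entry count, all names are hypothetical, and the non-synthetic branch only handles a first position with entry id -1. It exercises both the trimmed case and a topic that still holds readable entries, so the special case does not hide a real backlog.

```java
import java.util.Map;
import java.util.TreeMap;

public class SyntheticFirstPositionModel {
    /** Hypothetical stand-in for a managed-ledger position (ledgerId:entryId). */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    /** Falls back to the last confirmed ledger when only a newer ledger remains. */
    static Position firstPosition(TreeMap<Long, Long> ledgers, Position lastConfirmed) {
        long firstLedgerId = ledgers.firstKey();
        if (firstLedgerId > lastConfirmed.ledgerId) {
            firstLedgerId = lastConfirmed.ledgerId;
        }
        return new Position(firstLedgerId, -1);
    }

    /** Same conditions as the proposed check, applied to the model map. */
    static boolean isSynthetic(TreeMap<Long, Long> ledgers, Position lastConfirmed, Position first) {
        Map.Entry<Long, Long> firstLedger = ledgers.firstEntry();
        return firstLedger != null
                && firstLedger.getKey() > lastConfirmed.ledgerId
                && first.ledgerId == lastConfirmed.ledgerId
                && first.entryId == -1
                && firstLedger.getValue() == 0;
    }

    /** Counter an earliest cursor would start with once the special case is applied. */
    static long earliestConsumedCounter(TreeMap<Long, Long> ledgers, Position lastConfirmed,
                                        long entriesAdded) {
        Position first = firstPosition(ledgers, lastConfirmed);
        if (isSynthetic(ledgers, lastConfirmed, first)) {
            return entriesAdded; // nothing readable remains, so nothing is left to consume
        }
        // Entries up to lastConfirmed in its own ledger, plus full ledgers before it.
        long remaining = lastConfirmed.entryId + 1;
        for (long entries : ledgers.subMap(first.ledgerId, true, lastConfirmed.ledgerId, false).values()) {
            remaining += entries;
        }
        return entriesAdded - remaining;
    }

    public static void main(String[] args) {
        // Trimmed topic: only the empty current ledger 470 is left, last confirmed is 469:99.
        TreeMap<Long, Long> trimmed = new TreeMap<>();
        trimmed.put(470L, 0L);
        long trimmedCounter = earliestConsumedCounter(trimmed, new Position(469, 99), 1000);
        System.out.println("trimmed topic backlog = " + (1000 - trimmedCounter)); // 0

        // Live topic: ledger 470 still holds 50 entries, 471 is the empty current ledger.
        TreeMap<Long, Long> live = new TreeMap<>();
        live.put(470L, 50L);
        live.put(471L, 0L);
        long liveCounter = earliestConsumedCounter(live, new Position(470, 49), 1000);
        System.out.println("live topic backlog = " + (1000 - liveCounter)); // 50
    }
}
```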

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels

type/bug
