[fix][broker] The configuration subscriptionExpirationTimeMinutes does not work if the topic was reloaded frequently#25244
|
#22794 makes |
|
@thetumbled, could you review this PR? |
|
One additional detail about |
Pull request overview
This pull request fixes an issue where the subscriptionExpirationTimeMinutes configuration does not work correctly when topics are reloaded frequently. The fix introduces a mechanism to distinguish between active and inactive cursor states by using negative timestamps to represent inactive time and positive timestamps for active time, and ensures this state is properly persisted to both metadata store and BookKeeper ledgers.
Changes:
- Modified the lastActive timestamp mechanism to use negative values for inactive cursors and positive values for active cursors, with the least significant bit tracking whether the value has been persisted
- Added persistence of lastActive to BookKeeper ledgers in addition to the metadata store to prevent state loss during topic reloads
- Updated the subscription expiration check logic to only delete subscriptions when they have negative timestamps (inactive) and exceed the expiration threshold
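One way to picture the encoding summarized above is the sketch below. It is an illustration only: the class name, the method names, and the exact bit layout (sign for active or inactive, lowest bit of the magnitude as the persisted flag, at the cost of one millisecond of precision) are assumptions, not the code in ManagedCursorImpl.

```java
/** Hypothetical helper; the PR's actual bit layout may differ. */
final class LastActiveEncoding {

    /** Packs a timestamp: sign = active/inactive, lowest bit = persisted flag (assumed layout). */
    static long encode(long timeMillis, boolean active, boolean persisted) {
        // Clear the lowest bit of the timestamp, then reuse it as the flag.
        long magnitude = (timeMillis & ~1L) | (persisted ? 1L : 0L);
        return active ? magnitude : -magnitude;
    }

    /** Positive values mean the cursor was last seen active. */
    static boolean isActive(long encoded) {
        return encoded > 0;
    }

    /** Reads the persisted flag from the lowest bit of the magnitude. */
    static boolean isPersisted(long encoded) {
        return (Math.abs(encoded) & 1L) == 1L;
    }

    /** Recovers the timestamp, rounded down to an even millisecond. */
    static long timeMillis(long encoded) {
        return Math.abs(encoded) & ~1L;
    }

    public static void main(String[] args) {
        long inactive = encode(System.currentTimeMillis(), false, false);
        System.out.println(isActive(inactive));    // false
        System.out.println(isPersisted(inactive)); // false
    }
}
```

Folding the flag into the same long keeps the state in a single field, which is presumably why the PR takes this route, but a separate boolean would be easier to read.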
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 18 comments.
| File | Description |
|---|---|
| managed-ledger/src/main/proto/MLDataFormats.proto | Added lastActive field to PositionInfo and restored the deprecated lastActive field in ManagedCursorInfo for persistence to BookKeeper |
| managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java | Added new updateLastActive(boolean) method and getLastActiveOrInActive() method; deprecated the old updateLastActive() |
| managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | Implemented the new lastActive mechanism with negative/positive timestamps, bit manipulation for persistence tracking, and recovery logic |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java | Updated all cursor.updateLastActive() calls to pass the active/inactive state and trigger persistence for the first consumer after inactive state |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java | Modified checkInactiveSubscriptions to only delete subscriptions with negative (inactive) timestamps |
| pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CursorLastActiveTimeTest.java | Added comprehensive tests validating the new lastActive mechanism across topic reloads and consumer lifecycle |
| private CompletableFuture<Void> addConsumerInternal(Consumer consumer) { | ||
| return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> { | ||
| return pendingAckHandle.pendingAckHandleFuture() | ||
| .thenCompose(__ -> cursor.updateLastActive(true)) |
The async call to cursor.updateLastActive(true) on line 245 can fail (as seen in the implementation where internalAsyncMarkDelete might fail). However, the error from this operation is not explicitly handled, and it would propagate to the caller of addConsumer. Consider whether this is the intended behavior or if the error should be logged and handled differently, especially since updating lastActive is metadata-related and might not warrant failing the entire consumer addition.
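If the intent is that a failed lastActive update should not fail consumer addition, the chain could recover from that one step before continuing. The sketch below uses only java.util.concurrent; BestEffortStep, bestEffort and the simplified addConsumer are hypothetical stand-ins, not Pulsar classes, and the supplier stands in for cursor.updateLastActive(true).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical stand-in for the addConsumerInternal chain. */
final class BestEffortStep {

    /** Runs an async step; on failure, logs and completes normally instead of propagating. */
    static CompletableFuture<Void> bestEffort(Supplier<CompletableFuture<Void>> step, String what) {
        return step.get().exceptionally(ex -> {
            System.err.println("Ignoring failure of " + what + ": " + ex);
            return null;
        });
    }

    /** Simplified chain: wait for pending-ack, update lastActive best-effort, then add the consumer. */
    static CompletableFuture<String> addConsumer(CompletableFuture<Void> pendingAckReady,
                                                 Supplier<CompletableFuture<Void>> updateLastActive) {
        return pendingAckReady
                .thenCompose(__ -> bestEffort(updateLastActive, "updateLastActive"))
                .thenApply(__ -> "consumer added");
    }
}
```

Whether swallowing the failure is acceptable depends on how much the expiration logic relies on the first-consumer update being persisted, so this is a trade-off rather than a recommendation.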
| // Lock the Subscription object before locking the Dispatcher object to avoid deadlocks | ||
| synchronized (this) { | ||
| if (dispatcher != null && dispatcher.isConsumerConnected()) { | ||
| cursor.updateLastActive(true); |
The call to cursor.updateLastActive(true) on line 918 returns a CompletableFuture<Void> that is being ignored. If the future completes exceptionally, the exception will be silently swallowed. Consider handling or logging errors from this async operation.
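For call sites that are intentionally fire-and-forget, a small helper can at least surface the failure. This is a minimal sketch with hypothetical names (FireAndForget, logOnFailure); the logger parameter is a plain java.util.function.Consumer standing in for whatever logging the broker uses, and is unrelated to Pulsar's Consumer class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical helper for futures whose result is deliberately not awaited. */
final class FireAndForget {

    /** Attaches a callback that reports an exceptional completion and otherwise does nothing. */
    static <T> void logOnFailure(CompletableFuture<T> future, String what, Consumer<String> logger) {
        future.whenComplete((ignored, ex) -> {
            if (ex != null) {
                logger.accept("Failed to " + what + ": " + ex.getMessage());
            }
        });
    }
}
```

A call site would then read roughly like logOnFailure(cursor.updateLastActive(true), "update lastActive", log::warn), assuming a logger method of that shape is available.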
| @Override | ||
| public void resetComplete(Object ctx) { | ||
| if (dispatcher != null && !dispatcher.getConsumers().isEmpty()) { | ||
| cursor.updateLastActive(true); |
The call to cursor.updateLastActive(true) on line 965 returns a CompletableFuture<Void> that is being ignored. If the future completes exceptionally, the exception will be silently swallowed. Consider handling or logging errors from this async operation.
| msgOutFromRemovedConsumer.add(stats.msgOutCounter); | ||
| if (dispatcher != null && dispatcher.getConsumers().isEmpty()) { | ||
| cursor.updateLastActive(false); |
The call to cursor.updateLastActive(false) on line 367 returns a CompletableFuture<Void> that is being ignored. If the future completes exceptionally, the exception will be silently swallowed. Consider handling or logging errors from this async operation.
| @@ -132,7 +134,7 @@ message ManagedCursorInfo { | |||
| repeated LongProperty properties = 5; | |||
| // deprecated, do not persist this field anymore | |||
The field 'lastActive' in ManagedCursorInfo was previously marked as deprecated (line 137 removes the deprecation). While this change is backward compatible from a protobuf perspective (optional fields can be re-enabled), there may be confusion if different versions of the code have conflicting expectations. Consider adding a comment explaining that this field was temporarily deprecated but is now being used again, to help future maintainers understand the history.
| // deprecated, do not persist this field anymore | |
| // NOTE: this field was previously marked as deprecated and some older | |
| // versions of the code may ignore or avoid persisting it. It is now | |
| // intentionally used again; keep this field for backward compatibility. |
| return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> { | ||
| return pendingAckHandle.pendingAckHandleFuture() | ||
| .thenCompose(__ -> cursor.updateLastActive(true)) | ||
| .thenCompose(future -> { |
The lambda parameter name 'future' on line 246 is misleading. The result of cursor.updateLastActive(true) is a CompletableFuture<Void>, so the parameter represents a Void value, not a future. Consider renaming it to '__' or 'ignore' for clarity, similar to the parameter on line 245.
| .thenCompose(future -> { | |
| .thenCompose(__ -> { |
| @Override | ||
| public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException { | ||
| cursor.updateLastActive(); | ||
| cursor.updateLastActive(true); |
The call to cursor.updateLastActive(true) on line 356 returns a CompletableFuture<Void> that is being ignored. If the future completes exceptionally, the exception will be silently swallowed. Since removeConsumer is a synchronized method that can throw BrokerServiceException, consider whether the async operation should be awaited or if errors should be handled. Alternatively, if the operation is intentionally fire-and-forget, add a comment explaining why.
| @Override | ||
| public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) { | ||
| cursor.updateLastActive(); | ||
| cursor.updateLastActive(true); |
The call to cursor.updateLastActive(true) on line 438 returns a CompletableFuture<Void> that is being ignored. If the future completes exceptionally, the exception will be silently swallowed. Consider handling or logging errors from this async operation.
| * Get the last active time or inactive time of the cursor. | ||
| * | ||
| * @return If the value is negative, it indicates the last inactive time and has not been active since then. | ||
| * If the value is positive, it indicates the last active time (the last time delete was called, markDelete). |
The documentation comment on line 2805 says "the last time delete was called, markDelete" which is confusing. It's unclear what "delete was called" means in this context. Consider clarifying this to say "the last time the cursor was actively used (e.g., when messages were acknowledged or mark-delete was called)" to make it more understandable.
| * If the value is positive, it indicates the last active time (the last time delete was called, markDelete). | |
| * If the value is positive, it indicates the last time the cursor was actively used (for example, when | |
| * messages were acknowledged or {@code markDelete} was called). |
| void initialize(Position position, Map<String, Long> properties, Map<String, String> cursorProperties, | ||
| final VoidCallback callback) { | ||
| recoveredCursor(position, properties, cursorProperties, null); | ||
| // When a cursor is creating, no consumers has connected, so we mark it as inactive state. |
Typo: "has" should be "have" to match the subject "consumers". The sentence should read "When a cursor is creating, no consumers have connected..."
| // When a cursor is creating, no consumers has connected, so we mark it as inactive state. | |
| // When a cursor is creating, no consumers have connected, so we mark it as inactive state. |
Background
1. cursor.active and cursor.lastActive apply to different scenarios and are separate fields.
   - cursor.active (boolean): whether consumption has caught up.
   - cursor.lastActive: the latest time the cursor was touched, such as consumer registration, consumer unregistration, or a cursor reset.
2. cursor.lastActive backs broker.conf -> subscriptionExpirationTimeMinutes. It lets the broker know whether the cursor has been idle for a long time, so that inactive subscriptions are deleted correctly.
3. How cursor.lastActive is maintained:
   - 3-1. cursor.lastActive is updated to the current timestamp when consumers register, unregister, or reset cursors. The broker persists it to the metadata store when closing topics.
   - 3-2. Pulsar supports persisting cursor state to Bookies.
   - 3-3. #17573 removed the mechanism that recovered cursor.lastActive from the metadata store, to fix an issue[1].

[1] The issue that #17573 fixed:
- topic.LAC: 3:99
- subscriptionExpirationTimeMinutes: 1 day
- consumers: ["c1"]
- c1 registered at 2026-02-12 00:00
- cursor.markDeletedPosition: 3:0
- c1 acknowledged message 3:0 at 2026-02-12 00:01
- cursor.lastActive was set to 2026-02-12 00:01 and was persisted into the metadata store
- c1 did not acknowledge any more messages
- 2026-02-13 01:00: cursor.lastActive was recovered to 2026-02-12 00:01
- Since 2026-02-12 00:01 ~ 2026-02-13 01:00 is longer than 1 day, the subscription was deleted, although the consumer had remained connected until the broker crashed
- Messages 3:0 (exclusive) ~ 3:99 were lost

Motivation
Issue 1: an inactive subscription cannot be deleted automatically if the bundle rebalances frequently
- subscriptionExpirationTimeMinutes: 1 day
- 2026-02-12 00:00: all consumers went offline
- 2026-02-12 12:00: rebalanced, the topic was reloaded
  - Since cursor.lastActive is not recovered, its value is reset to system.currentMillis
- 2026-02-13 06:00: rebalanced, the topic was reloaded
  - Since cursor.lastActive is not recovered, its value is reset to system.currentMillis

Issue 2: when the feature that lets Pulsar persist cursor state to Bookies was added, cursor.lastActive was not persisted to Bookies, which also prevents inactive subscriptions from being deleted automatically.

Modifications
- Persist cursor.lastActive to Bookies as well. Since the field is typed optional, there are no compatibility issues across versions.
- Set cursor.lastActive to a negative value (v = 0 - system.currentMillis) when all consumers are unregistered.
- Update cursor.lastActive when consumers register.
- Delete a subscription only when cursor.lastActive is negative and {current time} - Math.abs(cursor.lastActive) > {threshold}.
- Persist cursor.lastActive when the first consumer registers, to avoid the issue that #17573 ([fix][broker] Fix unexpected subscription deletion caused by the cursor last active time not updated in time) fixed.

Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete

Matching PR in forked repository
PR in forked repository: x
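The expiration rule from the Modifications list can be replayed against the Issue 1 timeline with a short sketch. Everything here is illustrative: SubscriptionExpirySketch and both methods are hypothetical, timestamps are assumed to be UTC, the persisted-flag bit is ignored, and the real check in PersistentTopic.checkInactiveSubscriptions may be structured differently.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper, not Pulsar's actual code: replays the Issue 1 timeline. */
final class SubscriptionExpirySketch {

    /** Rule from the PR description: expire only if inactive (negative) and idle past the threshold. */
    static boolean isExpired(long lastActive, long nowMillis, long expirationMillis) {
        return lastActive < 0 && nowMillis - Math.abs(lastActive) > expirationMillis;
    }

    /** Behaviour described in Issue 1: a plain timestamp compared against the current time. */
    static boolean isExpiredUnsigned(long lastActive, long nowMillis, long expirationMillis) {
        return nowMillis - lastActive > expirationMillis;
    }

    public static void main(String[] args) {
        long expiration = Duration.ofDays(1).toMillis();
        long offlineAt = Instant.parse("2026-02-12T00:00:00Z").toEpochMilli();
        long secondReload = Instant.parse("2026-02-13T06:00:00Z").toEpochMilli();

        // Old behaviour: each reload resets lastActive to the reload time, so idle time restarts at zero.
        long resetOnReload = secondReload;
        System.out.println(isExpiredUnsigned(resetOnReload, secondReload, expiration)); // false

        // Proposed behaviour: the negative inactive timestamp survives reloads (30h idle > 24h).
        long persistedInactive = -offlineAt;
        System.out.println(isExpired(persistedInactive, secondReload, expiration)); // true
    }
}
```

A positive value never expires under this rule, which matches the goal of not deleting a subscription while a consumer is still connected.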