Skip to content

[fix][broker] The configuration subscriptionExpirationTimeMinutes does not work if the topic was reloaded frequently#25244

Open
poorbarcode wants to merge 1 commit intoapache:masterfrom
poorbarcode:fix/expire_cursor
Open

[fix][broker] The configuration subscriptionExpirationTimeMinutes does not work if the topic was reloaded frequently#25244
poorbarcode wants to merge 1 commit intoapache:masterfrom
poorbarcode:fix/expire_cursor

Conversation

@poorbarcode
Copy link
Contributor

Background

  1. The attributes cursor.active and cursor.lastActive are applied in different scenarios, they are not in the same fields.
  • cursor.active: boolean: whether the consumption catched up
  • cursor.lastActive: the latest time that the cursor was touched, such as consumer registation, consumer unregistation, reset cursors.

  1. The usages of cursor.lastActive
  • Relates to the configuration broker.conf -> subscriptionExpirationTimeMinutes. Let Broker knows whether the cursor has been idle for a long time, to correctly delete the inactive subscriptions.
  • Let users know the latest time of touching the curosr

  1. The history of changing of cursor.lastActive

3-1. update cursor.lastActive to the current timestamp when consumers register, unregister, reset cursors. Broker persists it to metadata store when closing topics.
3-2. Pulsar supports persisting cursor state to Bookies.
3-3. #17573 removed the mechanism that recovering cursor.lastActive from Metadata store, to fix an issue[1]


  • [1]: The issue that losses messages
    • topic.LAC:3:99
    • subscriptionExpirationTimeMinutes: 1 day
    • consumers: ["c1"]
      • c1 registered at 2026-02-12 00:00
    • cursor.markDeletedPosition: 3:0
      • c1 acknowledged message 3:0at 2026-02-12 00:01
      • cursor.lastActive was set to 2026-02-12 00:01 and was persisted into Metadata store
    • c1 do not acknowlege any messages anymore.
    • The broker was crashed at 2026-02-13 01:00
    • The topic was reloaded up.
      • The broker recovered cursor.lastActive to 2026-02-12 00:01
      • Since the time range 2026-02-12 00:01 ~ 2026-02-13 01:00 is longer than 1 day, the subscription was deleted, but the consumer is still connected until the broker crashed.
    • Messages 3:0(exclusive) ~ 3:99 were lost

Motivation

Issues 1. The inactive subscription can not be deleted automaticaily if bundle rebalances frequently

Issue 2: when adding the new feature that Pulsar supports persisting cursor state to Bookies, it forgot to persist cursor.lastActive to Bookies, which will also cause the issue that inactive subscription can not be deleted automaticaily.

Modifications

  • Regarding to Issue 2, persist cursor.lastActive to Bookie also, since the field is typed optional, there is no compatibilty issues in multi versions
  • Regarding toIssue 1, change the mechanism as follows:
    • Update cursor.lastActive to a negative value (v = 0 - system.currentMillis), when all consumers are unregister.
      • It is a marker that means the inactive time.
    • Update cursor.lastActive when consumers register.
      • It is the original meanning of the field.
    • How to determine whether the subscription should be deleted.
      • cursor.lastActive is negative and {current time} - Math.abs(cursor.lastActive) > {threshold}
    • Persists cursor.lastActive when the first consumer registers, to avoid the issue that [fix][broker] Fix unexpected subscription deletion caused by the cursor last active time not updated in time #17573 fixed.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

… not work if the topic was reloaded frequently
@poorbarcode poorbarcode added this to the 4.2.0 milestone Feb 12, 2026
@poorbarcode poorbarcode self-assigned this Feb 12, 2026
@poorbarcode poorbarcode added type/bug The PR fixed a bug or issue reported a bug release/4.1.4 release/4.0.10 labels Feb 12, 2026
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Feb 12, 2026
@poorbarcode poorbarcode modified the milestones: 4.2.0, 5.0.0 Feb 12, 2026
@nodece
Copy link
Member

nodece commented Feb 13, 2026

#22794 makes cursor.lastActive memory-only, which avoids the cursor being deleted unexpectedly. This PR seems to change that behavior. It is safer not delete than to risk data loss.

@nodece
Copy link
Member

nodece commented Feb 13, 2026

@thetumbled, could you review this PR?

@lhotari lhotari changed the title [fix][broker]The configuration subscriptionExpirationTimeMinutes does… [fix][broker] The configuration subscriptionExpirationTimeMinutes does not work if the topic was reloaded frequently Feb 13, 2026
@lhotari
Copy link
Member

lhotari commented Feb 13, 2026

One additional detail about subscriptionExpirationTimeMinutes is that it won't work if the topic isn't active (there's no connected producers or other subscriptions with connected consumers on the topic). It would also be useful to document this detail although it's not directly related to this PR.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request fixes an issue where the subscriptionExpirationTimeMinutes configuration does not work correctly when topics are reloaded frequently. The fix introduces a mechanism to distinguish between active and inactive cursor states by using negative timestamps to represent inactive time and positive timestamps for active time, and ensures this state is properly persisted to both metadata store and BookKeeper ledgers.

Changes:

  • Modified the lastActive timestamp mechanism to use negative values for inactive cursors and positive values for active cursors, with the least significant bit tracking whether the value has been persisted
  • Added persistence of lastActive to BookKeeper ledgers in addition to the metadata store to prevent state loss during topic reloads
  • Updated the subscription expiration check logic to only delete subscriptions when they have negative timestamps (inactive) and exceed the expiration threshold

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 18 comments.

Show a summary per file
File Description
managed-ledger/src/main/proto/MLDataFormats.proto Added lastActive field to PositionInfo and restored the deprecated lastActive field in ManagedCursorInfo for persistence to BookKeeper
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java Added new updateLastActive(boolean) method and getLastActiveOrInActive() method; deprecated the old updateLastActive()
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java Implemented the new lastActive mechanism with negative/positive timestamps, bit manipulation for persistence tracking, and recovery logic
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java Updated all cursor.updateLastActive() calls to pass the active/inactive state and trigger persistence for the first consumer after inactive state
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java Modified checkInactiveSubscriptions to only delete subscriptions with negative (inactive) timestamps
pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CursorLastActiveTimeTest.java Added comprehensive tests validating the new lastActive mechanism across topic reloads and consumer lifecycle

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> {
return pendingAckHandle.pendingAckHandleFuture()
.thenCompose(__ -> cursor.updateLastActive(true))
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The async call to cursor.updateLastActive(true) on line 245 can fail (as seen in the implementation where internalAsyncMarkDelete might fail). However, the error from this operation is not explicitly handled, and it would propagate to the caller of addConsumer. Consider whether this is the intended behavior or if the error should be logged and handled differently, especially since updating lastActive is metadata-related and might not warrant failing the entire consumer addition.

Copilot uses AI. Check for mistakes.
// Lock the Subscription object before locking the Dispatcher object to avoid deadlocks
synchronized (this) {
if (dispatcher != null && dispatcher.isConsumerConnected()) {
cursor.updateLastActive(true);
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to cursor.updateLastActive(true) on line 918 returns a CompletableFuture<Void> that is being ignored. If the future completes exceptionally, the exception will be silently swallowed. Consider handling or logging errors from this async operation.

Copilot uses AI. Check for mistakes.
@Override
public void resetComplete(Object ctx) {
if (dispatcher != null && !dispatcher.getConsumers().isEmpty()) {
cursor.updateLastActive(true);
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to cursor.updateLastActive(true) on line 965 returns a CompletableFuture<Void> that is being ignored. If the future completes exceptionally, the exception will be silently swallowed. Consider handling or logging errors from this async operation.

Copilot uses AI. Check for mistakes.
msgOutFromRemovedConsumer.add(stats.msgOutCounter);

if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
cursor.updateLastActive(false);
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to cursor.updateLastActive(false) on line 367 returns a CompletableFuture<Void> that is being ignored. If the future completes exceptionally, the exception will be silently swallowed. Consider handling or logging errors from this async operation.

Copilot uses AI. Check for mistakes.
@@ -132,7 +134,7 @@ message ManagedCursorInfo {
repeated LongProperty properties = 5;

// deprecated, do not persist this field anymore
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field 'lastActive' in ManagedCursorInfo was previously marked as deprecated (line 137 removes the deprecation). While this change is backward compatible from a protobuf perspective (optional fields can be re-enabled), there may be confusion if different versions of the code have conflicting expectations. Consider adding a comment explaining that this field was temporarily deprecated but is now being used again, to help future maintainers understand the history.

Suggested change
// deprecated, do not persist this field anymore
// NOTE: this field was previously marked as deprecated and some older
// versions of the code may ignore or avoid persisting it. It is now
// intentionally used again; keep this field for backward compatibility.

Copilot uses AI. Check for mistakes.
return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> {
return pendingAckHandle.pendingAckHandleFuture()
.thenCompose(__ -> cursor.updateLastActive(true))
.thenCompose(future -> {
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lambda parameter name 'future' on line 246 is misleading. The result of cursor.updateLastActive(true) is a CompletableFuture<Void>, so the parameter represents a Void value, not a future. Consider renaming it to '__' or 'ignore' for clarity, similar to the parameter on line 245.

Suggested change
.thenCompose(future -> {
.thenCompose(__ -> {

Copilot uses AI. Check for mistakes.
@Override
public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
cursor.updateLastActive();
cursor.updateLastActive(true);
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to cursor.updateLastActive(true) on line 356 returns a CompletableFuture<Void> that is being ignored. If the future completes exceptionally, the exception will be silently swallowed. Since removeConsumer is a synchronized method that can throw BrokerServiceException, consider whether the async operation should be awaited or if errors should be handled. Alternatively, if the operation is intentionally fire-and-forget, add a comment explaining why.

Copilot uses AI. Check for mistakes.
@Override
public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {
cursor.updateLastActive();
cursor.updateLastActive(true);
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to cursor.updateLastActive(true) on line 438 returns a CompletableFuture<Void> that is being ignored. If the future completes exceptionally, the exception will be silently swallowed. Consider handling or logging errors from this async operation.

Copilot uses AI. Check for mistakes.
* Get the last active time or inactive time of the cursor.
*
* @return If the value is negative, it indicates the last inactive time and has not been active since then.
* If the value is positive, it indicates the last active time (the last time delete was called, markDelete).
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation comment on line 2805 says "the last time delete was called, markDelete" which is confusing. It's unclear what "delete was called" means in this context. Consider clarifying this to say "the last time the cursor was actively used (e.g., when messages were acknowledged or mark-delete was called)" to make it more understandable.

Suggested change
* If the value is positive, it indicates the last active time (the last time delete was called, markDelete).
* If the value is positive, it indicates the last time the cursor was actively used (for example, when
* messages were acknowledged or {@code markDelete} was called).

Copilot uses AI. Check for mistakes.
void initialize(Position position, Map<String, Long> properties, Map<String, String> cursorProperties,
final VoidCallback callback) {
recoveredCursor(position, properties, cursorProperties, null);
// When a cursor is creating, no consumers has connected, so we mark it as inactive state.
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: "has" should be "have" to match the subject "consumers". The sentence should read "When a cursor is creating, no consumers have connected..."

Suggested change
// When a cursor is creating, no consumers has connected, so we mark it as inactive state.
// When a cursor is creating, no consumers have connected, so we mark it as inactive state.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs release/4.0.10 release/4.1.4 type/bug The PR fixed a bug or issue reported a bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants