[fix][client] Fix failed to close consumer because of the error: param memorySize is a negative value#25805

Open
poorbarcode wants to merge 7 commits into apache:master from poorbarcode:fix/msg_lost_2

Conversation

@poorbarcode
Contributor

@poorbarcode poorbarcode commented May 18, 2026

Motivation

| Time/Threads | consumer.receiveMessage | Close Pulsar Source | Receive messages from Server |
|---|---|---|---|
| 1 | | | push a message into consumer.incomingMessages |
| 2 | pop a message from consumer.incomingMessages | | |
| 3 | reduce incomingMessagesSize: 0 -> -45 | | |
| 4 | | Close Pulsar Source | |
| 5 | | Close producer | |
| 6 | | Failed due to Try to reserve/release memory failed, the param memorySize is a negative value | |
| 7 | | | plus consumer.incomingMessagesSize |

Modifications

  • Increase incomingMessagesSize before pushing new messages into the queue
    • Since it is hard to add a test to reproduce the issue, I skipped writing the test

Other things to do

The accuracy of consumer.incomingMessagesSize (which is only used by the batch receive API) still has issues, but this PR does not include any fixes for them.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

…ose consumer because of the error: param memorySize is a negative value
@poorbarcode poorbarcode self-assigned this May 18, 2026
@poorbarcode poorbarcode added this to the 5.0.0-M1 milestone May 18, 2026
@poorbarcode poorbarcode added the type/bug The PR fixed a bug or issue reported a bug label May 18, 2026
Comment thread pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java Outdated
@lhotari
Member

lhotari commented May 22, 2026

Let increasing incomingMessagesSize and decreasing incomingMessagesSize run in the same thread

  • Since it is hard to add a test to reproduce the issue, I skipped writing the test

A local Claude Code review produced the following analysis of this change.

[INTENT MISMATCH] Race not fully eliminated — multiple decreaseIncomingMessageSize sites still run on the calling thread
The PR description says "Let increasing incomingMessagesSize and decreasing incomingMessagesSize run in the same thread", but only 4 of ~9 call sites are migrated. Not wrapped:
- ConsumerImpl.removeExpiredMessagesFromQueue (ConsumerImpl.java:2985) — runs from negative-ack / expiry tasks
- MultiTopicsConsumerImpl.removeExpiredMessagesFromQueue (MultiTopicsConsumerImpl.java:953)
- MultiTopicsConsumerImpl.messageProcessed (MultiTopicsConsumerImpl.java:351) — although in practice only invoked from notifyPendingBatchReceivedCallBack which already runs on internalPinnedExecutor, so this is likely safe; worth a comment explaining the asymmetry.
- MultiTopicsConsumerImpl.internalReceiveAsync (MultiTopicsConsumerImpl.java:468) — already inside internalPinnedExecutor.execute(...), so OK.
The first two can still hit the original race. Consider routing all decrease* through the pinned executor, or push the serialization into ConsumerBase.decreaseIncomingMessageSize itself so callers don't have to remember.

Is it necessary to handle these findings?

@lhotari
Member

lhotari commented May 22, 2026

Another Claude finding

New error log on pendingReceives.isEmpty() branch can false-fire under cancellation
notifyPendingReceivedCallback is called from ConsumerImpl.java:1383 after hasNextPendingReceive() returns true, but a concurrent cancel()/timeout on the pending future can empty pendingReceives before this method runs (and the cancel runs on the same internalPinnedExecutor, but not all flows do). Logging this at ERROR with "you encountered a bug" risks alert fatigue. Consider DEBUG, or also gate by pendingReceives being unexpectedly empty despite no recent cancellation.

@lhotari
Member

lhotari commented May 22, 2026

6 Failed due to Try to reserve/release memory failed, the param memorySize is a negative value

Do you have an example stack trace?

Member

@lhotari lhotari left a comment

Please reconsider the fix based on the Claude Code-assisted analysis below:

The threading-model change in the PR addresses a symptom path (a visible thread on which the decrement runs), but it neither covers all call sites nor pinpoints the root race.

The actual race is in ConsumerBase.enqueueMessageAndCheckBatchReceive (ConsumerBase.java:984-1001):

    // synchronize redeliverUnacknowledgedMessages().
    incomingQueueLock.lock();
    try {
        if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
            // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
            // instance anymore, since for pooled messages, this instance was possibly already been released
            // and recycled.
            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
            getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
            updateAutoScaleReceiverQueueHint();
        }
    } finally {
        incomingQueueLock.unlock();
    }

incomingMessages.offer(...) makes the message visible to any consumer thread before the counter is incremented. The consumer thread can then take() it and call decreaseIncomingMessageSize, driving the counter negative exactly as described in the PR's motivation table. The decrement path doesn't take incomingQueueLock, so the lock here only orders enqueues, not increment-vs-decrement.
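
To make the interleaving concrete, here is a minimal single-threaded replay of steps 1, 2-3, and 7 from the motivation table. It is only a sketch: `OfferBeforeIncrementReplay` and `replay` are hypothetical names, and a plain `ArrayDeque` plus an `AtomicLong` stand in for the consumer's real queue and counter.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the consumer's queue and size counter; not Pulsar code.
class OfferBeforeIncrementReplay {

    /** Returns {counter after the early decrement, counter after the late increment}. */
    static long[] replay(int messageSize) {
        Queue<byte[]> incomingMessages = new ArrayDeque<>();
        AtomicLong incomingMessagesSize = new AtomicLong();

        // Step 1 (enqueue path): the message becomes visible before the counter is bumped.
        incomingMessages.offer(new byte[messageSize]);

        // Steps 2-3 (consumer thread): take the message and decrement right away.
        byte[] received = incomingMessages.poll();
        long afterDecrement = incomingMessagesSize.addAndGet(-received.length);

        // Step 7 (enqueue path): the delayed increment finally runs.
        long afterIncrement = incomingMessagesSize.addAndGet(messageSize);

        return new long[] {afterDecrement, afterIncrement};
    }

    public static void main(String[] args) {
        long[] values = replay(45);
        // prints "after decrement: -45, after increment: 0"
        System.out.println("after decrement: " + values[0] + ", after increment: " + values[1]);
    }
}
```

Any code that reads the counter between the decrement and the late increment sees the negative value, which matches the window in which the close path fails in the table.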

Given that, there is a cleaner way to fix this than relocating threads.

Root-cause fix in enqueueMessageAndCheckBatchReceive — reserve memory and bump the counter before offer(...), with rollback if offer fails. After this, the invariant "counter ≥ sum of sizes currently in incomingMessages" holds, and the race goes away regardless of which thread decrements.
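
A rough sketch of that ordering, under stated assumptions: `ReserveBeforeOfferSketch`, `enqueue`, `dequeue`, and `size` are hypothetical names, the queue holds raw `byte[]` payloads, and memory-limiter accounting is left out (in the real method it would presumably be reserved and rolled back alongside the counter).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the enqueue path; names follow the discussion, not the Pulsar source.
class ReserveBeforeOfferSketch {
    private final BlockingQueue<byte[]> incomingMessages;
    private final AtomicLong incomingMessagesSize = new AtomicLong();

    ReserveBeforeOfferSketch(int capacity) {
        this.incomingMessages = new ArrayBlockingQueue<>(capacity);
    }

    boolean enqueue(byte[] message) {
        int messageSize = message.length;
        // Account for the message first, so a concurrent dequeue can never
        // subtract a size that has not been added yet.
        incomingMessagesSize.addAndGet(messageSize);
        if (incomingMessages.offer(message)) {
            return true;
        }
        // The queue rejected the message: roll the accounting back.
        incomingMessagesSize.addAndGet(-messageSize);
        return false;
    }

    byte[] dequeue() {
        byte[] message = incomingMessages.poll();
        if (message != null) {
            incomingMessagesSize.addAndGet(-message.length);
        }
        return message;
    }

    long size() {
        return incomingMessagesSize.get();
    }
}
```

With this ordering the counter can briefly over-count a message that is about to be rejected, but it never drops below the total size of the messages actually in the queue.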

The PR's current approach has two weaknesses against either of these:

  • It only relocates some decreaseIncomingMessageSize call sites (removeExpiredMessagesFromQueue in both ConsumerImpl and MultiTopicsConsumerImpl are untouched), so the same race can still drive the counter negative through those paths.
  • It silently introduces a "decrement happens after receive() returns" semantics change for the sync receive paths — observable in getIncomingMessageSize() and MemoryLimitController headroom — without test coverage.

@poorbarcode
Contributor Author

@lhotari

Improved the solution: the incoming messages size is now increased before new messages are pushed into the queue. Thanks for your suggestion.

@poorbarcode poorbarcode requested review from lhotari and removed request for lhotari May 22, 2026 08:38
Member

@lhotari lhotari left a comment

LGTM

nit: would it make sense to have the log message changes in a separate PR? That could make it easier to backport changes.

@poorbarcode
Contributor Author

nit: would it make sense to have the log message changes in a separate PR? That could make it easier to backport changes.

Done
