-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][client] Prevent epoch race in MultiTopicsConsumer batch receive #25210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[fix][client] Prevent epoch race in MultiTopicsConsumer batch receive #25210
Conversation
|
@ChimdumebiNebolisa Please add the following content to your PR description and select a checkbox: |
fa110d7 to
9602f5c
Compare
5ed0fda to
a83cb0d
Compare
| @@ -0,0 +1,185 @@ | |||
| # Investigation: GitHub issue #25204 – MultiTopicsConsumer receives message with older ConsumerEpoch after redeliverUnacknowledgedMessages() | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should not commit the investigation document to the code base. You can post it to Gist and share the link in the PR description
pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerEpochRaceTest.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerEpochRaceTest.java
Outdated
Show resolved
Hide resolved
|
Thanks for the review. I’ve updated the test to address the feedback:
The remaining reflection is only for private members with no stable test seam ( |
Fixes #25204
Motivation
MultiTopicsConsumerprocesses a received batch by validating the consumer epoch per message. IfredeliverUnacknowledgedMessages()runs concurrently, it can incrementconsumerEpochwhile the batch loop is still iterating, which can produce a mixed outcome where part of the batch is accepted and the rest is filtered. This matches the behavior reported in #25204.Modifications
incomingQueueLockacross the entire batch loop and theincomingMessages.size()read, soconsumerEpochcannot change mid-batch.MultiTopicsConsumerEpochRaceTestto assert the post-fix invariant:acceptedByEpochCount == 2)Verifying this change
flaky, so it must not be excluded):mvn --% -pl pulsar-client "-Dtest=org.apache.pulsar.client.impl.MultiTopicsConsumerEpochRaceTest" "-DexcludedGroups=quarantine" testmvn -pl pulsar-client testDoes this pull request potentially affect one of the following parts
The threading modelDocumentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: ChimdumebiNebolisa#2