[fix][broker]Fixed an issue where the entire subscription would be blocked when a chunk message with an ID of zero did not exist. #25120
base: master
…ocked when a chunk message with an ID of zero did not exist.
@zjxxzjwang Please add the following content to your PR description and select a checkbox:
Pull request overview
This PR fixes a critical issue where subscriptions become blocked when chunk messages have problems: either when chunk0 is missing, or when a consumer's available permits remain negative. The fix acknowledges and logs orphaned non-first chunks instead of endlessly retrying them, and removes permit checking for subsequent chunks after the first chunk to prevent permit-related blocking.
Key changes:
- Modified chunk message handling to directly acknowledge non-first chunks when chunk0 is missing
- Removed permit validation for chunk messages after the first chunk to prevent blocking when permits are exhausted
- Added subscription parameter to SharedConsumerAssignor to enable acknowledgment of orphaned chunks
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java | Core fix: inline chunk handling logic, add subscription parameter for acknowledgment, remove getConsumerForUuid method, and handle missing chunk0 scenario |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java | Pass subscription to SharedConsumerAssignor constructor |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java | Pass subscription to SharedConsumerAssignor constructor |
| pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java | Update test to accommodate new constructor signature with null subscription |
@zjxxzjwang please resolve the merge conflict
ok
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
@lhotari Hello, please review this PR. It fixes a critical bug related to the existence of chunk messages.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
lhotari
left a comment
Please check the review comments. I think that this behavior should be active only when autoSkipNonRecoverableData is enabled in broker.conf. That would be a safe option for avoiding unintentional data loss.
        consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entry);
    }

    private boolean skipChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata) {
`skipChunk` sounds like a command; it would be better to rename it so that it reads like a query.
Suggested change:
    - private boolean skipChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata) {
    + private boolean shouldSkipChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata) {
    if (subscription != null) {
        log.warn("[{}][{}] Skip the message because it is not the first chunk."
                + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
                subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
                metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
        // Directly ack the message.
        if (!(subscription instanceof PulsarCompactorSubscription)) {
            subscription.acknowledgeMessage(Collections.singletonList(
                    entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
        }
    }
    entryAndMetadata.release();
    return true;
Since there's a potential for data loss, acknowledging the messages should be active only if autoSkipNonRecoverableData is set in broker.conf. The log message should be logged with ERROR level when autoSkipNonRecoverableData isn't set and the message shouldn't get acknowledged. I think it's fine to skip the message in that case so that processing the subscription continues, but there will be a backlog left behind due to the unacked messages.
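The behavior requested in this comment can be sketched as a small decision function. This is only an illustration under stated assumptions: `OrphanChunkPolicy`, `Decision`, and `decide` are hypothetical names that do not exist in Pulsar, and using WARN when the flag is enabled is an assumption taken from the `log.warn` call in the diff above, not something the comment specifies.

```java
// Hypothetical model of the review suggestion; not part of the Pulsar code base.
final class OrphanChunkPolicy {
    enum LogLevel { WARN, ERROR }

    /** What to do with a non-first chunk whose first chunk is missing. */
    static final class Decision {
        final boolean acknowledge; // ack the entry so it leaves the backlog
        final LogLevel logLevel;   // level used to report the orphaned chunk

        Decision(boolean acknowledge, LogLevel logLevel) {
            this.acknowledge = acknowledge;
            this.logLevel = logLevel;
        }
    }

    /**
     * The chunk is skipped in both cases so the subscription keeps moving.
     * It is acknowledged only when the operator has opted in to skipping
     * non-recoverable data; otherwise it stays unacked and is logged at ERROR.
     */
    static Decision decide(boolean autoSkipNonRecoverableData) {
        if (autoSkipNonRecoverableData) {
            return new Decision(true, LogLevel.WARN); // WARN is an assumption
        }
        return new Decision(false, LogLevel.ERROR);
    }
}
```

With the flag disabled, the unacked entries remain as backlog, which matches the trade-off described in the comment.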
    if (consumerForUuid == null) {
        if (skipChunk(entryAndMetadata, metadata)) {
            return availablePermits;
        }
        consumerForUuid = consumer;
        uuidToConsumer.put(uuid, consumerForUuid);
    }
It seems that this solution would skip only the first of possibly multiple chunk entries.
Suppose the entry with chunkId 0 was lost and the subsequent entries with chunkId 1, chunkId 2 and chunkId 3 remained. The entries with chunkId 2 and chunkId 3 would still be delivered to the client, causing a similar issue.
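A simplified model of the property this comment asks for is shown here: every non-first chunk whose UUID has no assigned consumer is skipped, not only the first one encountered. `ChunkOwnerTracker` and its methods are hypothetical names for illustration, consumers are represented as plain strings, and nothing here reflects the actual `SharedConsumerAssignor` implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model; consumers are plain strings here.
final class ChunkOwnerTracker {
    private final Map<String, String> uuidToConsumer = new HashMap<>();

    /**
     * Returns the consumer that should receive this chunk,
     * or null if the chunk is orphaned and should be skipped.
     */
    String assign(String uuid, int chunkId, int numChunks, String candidate) {
        String owner;
        if (chunkId == 0) {
            // Only the first chunk may establish ownership for a UUID.
            uuidToConsumer.put(uuid, candidate);
            owner = candidate;
        } else {
            // No recorded owner means chunk 0 was never seen: skip this chunk
            // and, because nothing is recorded, every later chunk as well.
            owner = uuidToConsumer.get(uuid);
        }
        if (chunkId == numChunks - 1) {
            uuidToConsumer.remove(uuid); // last chunk: drop the tracking state
        }
        return owner;
    }

    int trackedUuids() {
        return uuidToConsumer.size();
    }
}
```

Because ownership is recorded only for chunk 0, chunks 1, 2 and 3 of a message that lost its first chunk are all treated the same way in this model.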
Motivation
1. A message is split into chunk0, chunk1, and chunk2. When chunk0 is unexpectedly lost, the current code repeatedly adds chunk1 and chunk2 to redeliveryMessages, which blocks the entire subscription.
2. When the consumer assigned to a chunked message has availablePermits below zero for an extended period, the chunks of that message are also repeatedly added to redeliveryMessages, which likewise blocks the entire subscription.
Modifications
1. If chunk0 is unexpectedly lost, chunk1 and chunk2 are acknowledged directly and logged, so the subscription is not blocked.
2. When the remaining chunks of a message are dispatched, the availablePermits of the consumer assigned to that message are no longer checked, so delivery proceeds normally.
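The second modification can be illustrated with a toy permit model. This is a sketch under assumptions: `ChunkPermitModel` is a hypothetical class, permits are plain integers per consumer name, and the real dispatcher accounts for permits differently. It only shows the idea that the first chunk is gated on permits while later chunks follow the owner even when its permits are exhausted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model; the real broker tracks permits on Consumer objects.
final class ChunkPermitModel {
    private final Map<String, String> uuidToConsumer = new HashMap<>();
    private final Map<String, Integer> permits = new HashMap<>();

    void setPermits(String consumer, int value) {
        permits.put(consumer, value);
    }

    int permitsOf(String consumer) {
        return permits.getOrDefault(consumer, 0);
    }

    /** Returns true if the chunk is dispatched, false if it must wait. */
    boolean deliver(String uuid, int chunkId, String candidate) {
        if (chunkId == 0) {
            // The first chunk still needs a consumer with available permits.
            if (permitsOf(candidate) <= 0) {
                return false;
            }
            uuidToConsumer.put(uuid, candidate);
            permits.merge(candidate, -1, Integer::sum);
            return true;
        }
        String owner = uuidToConsumer.get(uuid);
        if (owner == null) {
            return false; // orphaned chunk; handled by the first modification
        }
        // Later chunks skip the permit check, so the count may go negative.
        permits.merge(owner, -1, Integer::sum);
        return true;
    }
}
```

In this model a consumer with a single permit still receives every chunk of the message it started, while a new message is held back until permits are replenished.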
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete