Skip to content

Conversation

@zjxxzjwang
Copy link
Contributor

@zjxxzjwang zjxxzjwang commented Dec 30, 2025

Motivation

1、A message is split into chunk0, chunk1, and chunk2. When chunk0 is unexpectedly lost, the current code logic will endlessly deliver chunk1 and chunk2 to redeliveryMessages, causing the entire subscription to become blocked.

2、When a chunk message corresponds to a producer whose availablePermits remain below zero for an extended period, that chunk message will also be endlessly delivered to the “redeliveryMessages” queue, similarly causing the entire subscription to become blocked.

Modifications

1、In the event of chunk0 being unexpectedly lost, chunk1 and chunk2 messages are directly acknowledged and logged without blocking the entire subscription.

2、When sending each chunk of the same message, the availablePermits of the consumer corresponding to that chunk message are no longer checked, ensuring normal delivery.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

…ocked when a chunk message with an ID of zero did not exist.
@github-actions
Copy link

@zjxxzjwang Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a critical issue where subscriptions become blocked when chunk messages have problems: either when chunk0 is missing, or when a consumer's available permits remain negative. The fix acknowledges and logs orphaned non-first chunks instead of endlessly retrying them, and removes permit checking for subsequent chunks after the first chunk to prevent permit-related blocking.

Key changes:

  • Modified chunk message handling to directly acknowledge non-first chunks when chunk0 is missing
  • Removed permit validation for chunk messages after the first chunk to prevent blocking when permits are exhausted
  • Added subscription parameter to SharedConsumerAssignor to enable acknowledgment of orphaned chunks

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.

File Description
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java Core fix: inline chunk handling logic, add subscription parameter for acknowledgment, remove getConsumerForUuid method, and handle missing chunk0 scenario
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java Pass subscription to SharedConsumerAssignor constructor
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java Pass subscription to SharedConsumerAssignor constructor
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java Update test to accommodate new constructor signature with null subscription

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@lhotari
Copy link
Member

lhotari commented Jan 2, 2026

@zjxxzjwang please resolve the merge conflict

@zjxxzjwang
Copy link
Contributor Author

@zjxxzjwang please resolve the merge conflict

ok

# Conflicts:
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
@zjxxzjwang
Copy link
Contributor Author

@lhotari Hello, please review this PR. It fixes a critical bug related to the existence of chunk messages.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

…ocked when a chunk message with an ID of zero did not exist.
…ocked when a chunk message with an ID of zero did not exist.
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the review comments. I think that this behavior should be active only when autoSkipNonRecoverableData is enabled in broker.conf. That would be a safe option for avoiding unintentional data loss.

consumerToEntries.computeIfAbsent(consumer, __ -> new ArrayList<>()).add(entry);
}

private boolean skipChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skipChunk sounds like a command, it's better to rename it to make it sound like a query.

Suggested change
private boolean skipChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata) {
private boolean shouldSkipChunk(EntryAndMetadata entryAndMetadata, MessageMetadata metadata) {

Comment on lines +149 to +161
if (subscription != null) {
log.warn("[{}][{}] Skip the message because it is not the first chunk."
+ " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
// Directly ack the message.
if (!(subscription instanceof PulsarCompactorSubscription)) {
subscription.acknowledgeMessage(Collections.singletonList(
entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
}
}
entryAndMetadata.release();
return true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there's a potential for data loss, acknowledging the messages should be active only if autoSkipNonRecoverableData is set in broker.conf. The log message should be logged with ERROR level when autoSkipNonRecoverableData isn't set and the message shouldn't get acknowledged. I think it's fine to skip the message in that case so that processing the subscription continues, but there will be a backlog left behind due to the unacked messages.

Comment on lines +112 to +118
if (consumerForUuid == null) {
if (skipChunk(entryAndMetadata, metadata)) {
return availablePermits;
}
consumerForUuid = consumer;
uuidToConsumer.put(uuid, consumerForUuid);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this solution would only skip the first entry of possibly multiple chunk entries.
Let's say if entry with chunkId 0 got lost and there would be subsequent entries chunkId 1, chunkId 2 and chunkId 3. The entries with chunkId 2 and chunkId 3 would get delivered to the client, causing a similar issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants