-
Notifications
You must be signed in to change notification settings - Fork 81
feat: add support to NegativeAcknowledge in Consumer #263
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -237,4 +237,18 @@ private async ValueTask InternalAcknowledge(IEnumerable<MessageId> messageIds, C | |||||||||||||||||||
|
|
||||||||||||||||||||
| public ValueTask<IEnumerable<MessageId>> GetLastMessageIds(CancellationToken cancellationToken = default) => | ||||||||||||||||||||
| throw new NotImplementedException(); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| public async ValueTask NegativeAcknowledge(MessageId messageId, CancellationToken cancellationToken) | ||||||||||||||||||||
| { | ||||||||||||||||||||
| var command = new CommandRedeliverUnacknowledgedMessages(); | ||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We already have 'RedeliverUnacknowledgedMessages' on the IConsumer that does this. I haven't used negative acknowledge, but isn't it a client-side feature to have the same message delivered again in a specified TimeSpan? Meaning no interaction with the broker and enabling the user to set the TimeSpan?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Although RedeliverUnacknowledgedMessages already exists in IConsumer, the NegativeAcknowledge functionality brings a level of granularity that can be very useful in scenarios where we do not want to request redelivery of all pending messages, but only those that have failed on time. The NegativeAcknowledge method allows marking specific messages to be redelivered by the broker, also aligning with the official Apache Pulsar API and facilitating use cases where fine management of retries is important (for example, when some messages fail intermittently and we don't want to affect the rest of the backlog). In this way, we give more control to the user over error and retry management, avoiding global behaviors that may be unnecessary or inefficient in certain consumption contexts. What do you think about?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have two 'RedeliverUnacknowledgedMessages' methods. One to redeliver all and one where you can specify the messageIds you want redelivered? pulsar-dotpulsar/src/DotPulsar/Abstractions/IConsumer.cs Lines 62 to 70 in e2025a0
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right, currently there are two RedeliverUnacknowledgedMessages methods: one to redeliver all pending messages and another to specify the specific messageIds that you want to redeliver. The difference I propose with NegativeAcknowledge is more conceptual and alignment with the official Apache Pulsar API. While RedeliverUnacknowledgedMessages is an explicit action requesting the redeliver of messages (either all or a selection), the NegativeAcknowledge method allows to signal message delivery failures in a more direct and “semantic” way, facilitating scenarios where the workflow requires marking certain messages for retry without necessarily manually managing the IDs to redeliver. In addition, some Pulsar clients implement NegativeAcknowledge allowing retry to occur after a defined interval, which can be useful in cases of temporary/intermittent failures. That said, if you consider that both methods (RedeliverUnacknowledgedMessages for specific IDs and all) already cover the use cases, we can close the topic here. If not, my proposal is to keep the discussion going to see if it brings value from the point of view of usability and alignment with the official API.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @ivan-ciklum, DotPulsar doesn’t aim to align with the "official" API, so unless this introduces new functionality — rather than just renaming methods for consistency — I won’t be able to merge it. |
||||||||||||||||||||
| command.MessageIds.Add(messageId.ToMessageIdData()); | ||||||||||||||||||||
| await _executor.Execute(() => InternalRedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| public async ValueTask NegativeAcknowledge(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken = default) | ||||||||||||||||||||
| { | ||||||||||||||||||||
| var command = new CommandRedeliverUnacknowledgedMessages(); | ||||||||||||||||||||
| command.MessageIds.AddRange(messageIds.Select(messageId => messageId.ToMessageIdData())); | ||||||||||||||||||||
| await _executor.Execute(() => InternalRedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are duplicate