Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/DotPulsar/Abstractions/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,19 @@ public interface IConsumer : IGetLastMessageIds, ISeek, IStateHolder<ConsumerSta
/// Redeliver all pending messages that were pushed to this consumer that are not yet acknowledged.
/// </summary>
ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken = default);

/// <summary>
/// Acknowledge the consumption of a single message using the MessageId.
/// </summary>
/// <summary>
Copy link

Copilot AI May 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are duplicate

tags in the XML documentation for the acknowledgment methods. Consider removing the redundant tag to ensure clarity and adherence to XML documentation standards.

Copilot uses AI. Check for mistakes.
/// Negative acknowledge the consumption of a single message using the MessageId.
/// This will tell the broker the message was not processed successfully and should be redelivered later.
/// </summary>
ValueTask NegativeAcknowledge(MessageId messageId, CancellationToken cancellationToken = default);

/// <summary>
/// Negative acknowledge the consumption of multiple messages using the MessageIds.
/// This will tell the broker the messages were not processed successfully and should be redelivered later.
/// </summary>
ValueTask NegativeAcknowledge(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken = default);
}
7 changes: 7 additions & 0 deletions src/DotPulsar/Extensions/ConsumerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,11 @@ public static async ValueTask<ConsumerStateChanged> StateChangedFrom(this IConsu
var currentState = await consumer.State.OnStateChangeFrom(state, delay, cancellationToken).ConfigureAwait(false);
return new ConsumerStateChanged(consumer, currentState);
}

/// <summary>
/// Negative acknowledge the consumption of a single message.
/// This will tell the broker the message was not processed successfully and should be redelivered later.
/// </summary>
public static async ValueTask NegativeAcknowledge(this IConsumer consumer, IMessage message, CancellationToken cancellationToken = default)
=> await consumer.NegativeAcknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
}
30 changes: 30 additions & 0 deletions src/DotPulsar/Internal/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,36 @@ public async ValueTask<IEnumerable<MessageId>> GetLastMessageIds(CancellationTok
return messageIds;
}

public async ValueTask NegativeAcknowledge(MessageId messageId, CancellationToken cancellationToken)
{
await Guard(cancellationToken).ConfigureAwait(false);

if (_singleSubConsumer is not null)
await _singleSubConsumer.NegativeAcknowledge(messageId, cancellationToken).ConfigureAwait(false);
else
await _subConsumers[messageId.Topic].NegativeAcknowledge(messageId, cancellationToken).ConfigureAwait(false);
}

public async ValueTask NegativeAcknowledge(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken = default)
{
await Guard(cancellationToken).ConfigureAwait(false);

if (_singleSubConsumer is not null)
{
await _singleSubConsumer.NegativeAcknowledge(messageIds, cancellationToken).ConfigureAwait(false);
return;
}

var groupedMessageIds = messageIds.GroupBy(messageIds => messageIds.Topic);
var nackTasks = new List<Task>();
foreach (var group in groupedMessageIds)
{
nackTasks.Add(_subConsumers[group.Key].NegativeAcknowledge(group, cancellationToken).AsTask());
}

await Task.WhenAll(nackTasks).ConfigureAwait(false);
}

private SubConsumer<TMessage> CreateSubConsumer(string topic)
{
var correlationId = Guid.NewGuid();
Expand Down
14 changes: 14 additions & 0 deletions src/DotPulsar/Internal/SubConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,18 @@ private async ValueTask InternalAcknowledge(IEnumerable<MessageId> messageIds, C

public ValueTask<IEnumerable<MessageId>> GetLastMessageIds(CancellationToken cancellationToken = default) =>
throw new NotImplementedException();

public async ValueTask NegativeAcknowledge(MessageId messageId, CancellationToken cancellationToken)
{
var command = new CommandRedeliverUnacknowledgedMessages();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have 'RedeliverUnacknowledgedMessages' on the IConsumer that does this. I haven't used negative acknowledge, but isn't it a client-side feature to have the same message delivered again in a specified TimeSpan? Meaning no interaction with the broker and enabling the user to set the TimeSpan?

Copy link
Copy Markdown
Author

@ivan-ciklum ivan-ciklum May 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although RedeliverUnacknowledgedMessages already exists in IConsumer, the NegativeAcknowledge functionality brings a level of granularity that can be very useful in scenarios where we do not want to request redelivery of all pending messages, but only those that have failed on time.

The NegativeAcknowledge method allows marking specific messages to be redelivered by the broker, also aligning with the official Apache Pulsar API and facilitating use cases where fine management of retries is important (for example, when some messages fail intermittently and we don't want to affect the rest of the backlog).

In this way, we give more control to the user over error and retry management, avoiding global behaviors that may be unnecessary or inefficient in certain consumption contexts.

What do you think about?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have two 'RedeliverUnacknowledgedMessages' methods. One to redeliver all and one where you can specify the messageIds you want redelivered?

/// <summary>
/// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged.
/// </summary>
ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken = default);
/// <summary>
/// Redeliver all pending messages that were pushed to this consumer that are not yet acknowledged.
/// </summary>
ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken = default);

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, currently there are two RedeliverUnacknowledgedMessages methods: one to redeliver all pending messages and another to specify the specific messageIds that you want to redeliver.

The difference I propose with NegativeAcknowledge is more conceptual and alignment with the official Apache Pulsar API. While RedeliverUnacknowledgedMessages is an explicit action requesting the redeliver of messages (either all or a selection), the NegativeAcknowledge method allows to signal message delivery failures in a more direct and “semantic” way, facilitating scenarios where the workflow requires marking certain messages for retry without necessarily manually managing the IDs to redeliver.

In addition, some Pulsar clients implement NegativeAcknowledge allowing retry to occur after a defined interval, which can be useful in cases of temporary/intermittent failures.

That said, if you consider that both methods (RedeliverUnacknowledgedMessages for specific IDs and all) already cover the use cases, we can close the topic here. If not, my proposal is to keep the discussion going to see if it brings value from the point of view of usability and alignment with the official API.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ivan-ciklum, DotPulsar doesn’t aim to align with the "official" API, so unless this introduces new functionality — rather than just renaming methods for consistency — I won’t be able to merge it.

command.MessageIds.Add(messageId.ToMessageIdData());
await _executor.Execute(() => InternalRedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false);
}

public async ValueTask NegativeAcknowledge(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken = default)
{
var command = new CommandRedeliverUnacknowledgedMessages();
command.MessageIds.AddRange(messageIds.Select(messageId => messageId.ToMessageIdData()));
await _executor.Execute(() => InternalRedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false);
}
}
176 changes: 176 additions & 0 deletions tests/DotPulsar.Tests/Internal/ConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,182 @@ public async Task Connectivity_WhenConnectionIsInitiallyUpAndReconnects_ShouldBe
exception.ShouldBeNull();
}

[Fact]
public async Task NegativeAcknowledge_GivenSingleMessage_ShouldBeRedelivered()
{
// Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);

var content = "test-nack-message";
var messageId = await producer.Send(content, _cts.Token);

// Receive the message the first time
var message = await consumer.Receive(_cts.Token);
message.MessageId.ShouldBe(messageId);
message.Value().ShouldBe(content);

// Act
await consumer.NegativeAcknowledge(message.MessageId, _cts.Token);

// The message should be redelivered
var redeliveredMessage = await consumer.Receive(_cts.Token);

// Assert
redeliveredMessage.Value().ShouldBe(content);

// Verify message was redelivered (same content, may or may not have redelivery count)
// En algunas versiones de Pulsar, RedeliveryCount no se incrementa inmediatamente
// después de un negative acknowledgment
redeliveredMessage.Value().ShouldBe(message.Value());

// Clean up - acknowledge the message so it doesn't get redelivered further
await consumer.Acknowledge(redeliveredMessage.MessageId, _cts.Token);
}

[Fact]
public async Task NegativeAcknowledge_GivenMultipleMessages_ShouldAllBeRedelivered()
{
// Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
const int numberOfMessages = 5;

await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);

var messageIds = new List<MessageId>();
var contents = new List<string>();

// Produce and receive messages
for (var i = 0; i < numberOfMessages; i++)
{
var content = $"test-nack-message-{i}";
contents.Add(content);
messageIds.Add(await producer.Send(content, _cts.Token));
}

var receivedMessages = new List<IMessage<string>>();
for (var i = 0; i < numberOfMessages; i++)
{
receivedMessages.Add(await consumer.Receive(_cts.Token));
}

// Act
await consumer.NegativeAcknowledge(receivedMessages.Select(m => m.MessageId), _cts.Token);

// Assert
var redeliveredContents = new List<string>();
for (var i = 0; i < numberOfMessages; i++)
{
var redeliveredMessage = await consumer.Receive(_cts.Token);
redeliveredContents.Add(redeliveredMessage.Value());

// Acknowledge to prevent further redeliveries
await consumer.Acknowledge(redeliveredMessage.MessageId, _cts.Token);
}

// Verificamos que todos los contenidos originales se hayan reentregado
redeliveredContents.ShouldBe(contents, ignoreOrder: true);
}

[Fact]
public async Task NegativeAcknowledge_GivenPartitionedTopic_ShouldRedeliverMessagesFromAllPartitions()
{
// Arrange
const int numberOfMessages = 9;
const int partitions = 3;
var topicName = await _fixture.CreatePartitionedTopic(partitions, _cts.Token);

await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);

// Produce messages to ensure they spread across partitions
var messageContents = new List<string>();
for (var i = 0; i < numberOfMessages; i++)
{
var content = $"test-nack-message-{i}";
messageContents.Add(content);
await producer.Send(content, _cts.Token);
}

// Receive all messages
var receivedMessages = new List<IMessage<string>>();
for (var i = 0; i < numberOfMessages; i++)
{
receivedMessages.Add(await consumer.Receive(_cts.Token));
}

// Group messages by partition
var messagesByPartition = receivedMessages.GroupBy(m => m.MessageId.Partition)
.ToDictionary(g => g.Key, g => g.ToList());

// Verify we have messages from different partitions
messagesByPartition.Count.ShouldBeGreaterThan(1);

// Act - negative acknowledge all messages
await consumer.NegativeAcknowledge(receivedMessages.Select(m => m.MessageId), _cts.Token);

// Assert - all messages should be redelivered
var redeliveredContents = new List<string>();
var redeliveredPartitions = new HashSet<int>();

for (var i = 0; i < numberOfMessages; i++)
{
var redeliveredMessage = await consumer.Receive(_cts.Token);
redeliveredContents.Add(redeliveredMessage.Value());
redeliveredPartitions.Add(redeliveredMessage.MessageId.Partition);

// Acknowledge to prevent further redeliveries
await consumer.Acknowledge(redeliveredMessage.MessageId, _cts.Token);
}

// Verify all original messages were redelivered
redeliveredContents.ShouldBe(messageContents, ignoreOrder: true);

// Verify messages came from multiple partitions
redeliveredPartitions.Count.ShouldBeGreaterThan(1);
}

[Fact]
public async Task NegativeAcknowledge_WhenConnectionIsDisconnected_ShouldRedeliverAfterReconnect()
{
// Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
await using var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName);
await using var producer = CreateProducer(client, topicName);

var messageContent = "test-nack-reconnect";

// Produce and consume a message
await producer.Send(messageContent, _cts.Token);
var message = await consumer.Receive(_cts.Token);
message.Value().ShouldBe(messageContent);

// Act - disconnect and nack the message
await using (await _fixture.DisableThePulsarConnection())
{
await consumer.StateChangedTo(ConsumerState.Disconnected, _cts.Token);
}

// Wait for reconnection
await consumer.StateChangedTo(ConsumerState.Active, _cts.Token);

// Now nack the message
await consumer.NegativeAcknowledge(message.MessageId, _cts.Token);

// Assert - should receive the message again
var redeliveredMessage = await consumer.Receive(_cts.Token);
redeliveredMessage.Value().ShouldBe(messageContent);

// Clean up
await consumer.Acknowledge(redeliveredMessage.MessageId, _cts.Token);
}

private static async Task<IEnumerable<MessageId>> ProduceMessages(IProducer<string> producer, int numberOfMessages, string content, CancellationToken ct)
{
var messageIds = new MessageId[numberOfMessages];
Expand Down