Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,8 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
}

//this method is for individual ack not carry the transaction
private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
@VisibleForTesting
CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
List<Pair<Consumer, Position>> positionsAcked = new ArrayList<>();
long totalAckCount = 0;
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
Expand Down Expand Up @@ -634,7 +635,8 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,


//this method is for individual ack carry the transaction
private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
@VisibleForTesting
CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
// Individual ack
List<Pair<Consumer, MutablePair<Position, Integer>>> positionsAcked = new ArrayList<>();
if (!isTransactionEnabled()) {
Expand All @@ -648,11 +650,6 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
Position position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null);
ObjectIntPair<Consumer> ackOwnerConsumerAndBatchSize = getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(),
msgId.getEntryId());
if (ackOwnerConsumerAndBatchSize == null) {
log.warn("[{}] [{}] Acknowledging message at {} that was already deleted", subscription,
consumerId, position);
continue;
}
Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left();
// acked count at least one
long ackedCount;
Expand All @@ -677,6 +674,10 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
AckSetStateUtil.getAckSetState(position).setAckSet(ackSets);
ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
}
if (ackOwnerConsumerAndBatchSize.rightInt() <= 0) {
// this means this message does not exist,so we need to set ackedCount to 0
ackedCount = 0;
}

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);

Expand All @@ -686,7 +687,6 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {

totalAckCount.add(ackedCount);
}

CompletableFuture<Void> completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(),
ack.getTxnidLeastBits(), positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList()));
if (Subscription.isIndividualAckMode(subType)) {
Expand Down Expand Up @@ -795,6 +795,7 @@ private ObjectIntPair<Consumer> getAckOwnerConsumerAndBatchSize(long ledgerId, l
}
}
}
return ObjectIntPair.of(this, 0);
}
return ObjectIntPair.of(this, 1);
}
Expand Down Expand Up @@ -1076,13 +1077,12 @@ public int hashCode() {
}

/**
* first try to remove ack-position from the current_consumer's pendingAcks.
* if ack-message doesn't present into current_consumer's pendingAcks
* a. try to remove from other connected subscribed consumers (It happens when client
* tries to acknowledge message through different consumer under the same subscription)
*
* Removes the specified position from the pending acknowledgments of the given consumer.
*
* @param position
* @param ackOwnedConsumer the consumer that owns the pending acknowledgment to be removed
* @param position the message position to be removed from the pending acknowledgments
* @return {@code true} if the position was successfully removed from the pending acknowledgments,
* {@code false} if the position was already removed or does not exist in the pending list
*/
private boolean removePendingAcks(Consumer ackOwnedConsumer, Position position) {
PendingAcksMap ownedConsumerPendingAcks = ackOwnedConsumer.getPendingAcks();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static java.util.Collections.emptyMap;
import static org.apache.pulsar.common.api.proto.CommandAck.AckType.Individual;
import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Key_Shared;
import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
import org.apache.pulsar.common.util.Codec;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class MessageIndividualAckTest {
private final int consumerId = 1;

private ServerCnx serverCnx;
private PersistentSubscription sub;
private PulsarTestContext pulsarTestContext;

@DataProvider(name = "individualAckModes")
public static Object[][] individualAckModes() {
return new Object[][]{
{Shared},
{Key_Shared},
};
}


@BeforeMethod
public void setup() throws Exception {
pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
.build();
serverCnx = pulsarTestContext.createServerCnxSpy();
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
when(serverCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
AsyncDualMemoryLimiter maxTopicListInFlightLimiter = mock(AsyncDualMemoryLimiter.class);
doReturn(new PulsarCommandSenderImpl(null, serverCnx, maxTopicListInFlightLimiter))
.when(serverCnx).getCommandSender();

String topicName = TopicName.get("MessageIndividualAckTest").toString();
var mockManagedLedger = mock(ManagedLedger.class);
when(mockManagedLedger.getConfig()).thenReturn(new ManagedLedgerConfig());
var persistentTopic = new PersistentTopic(topicName, mockManagedLedger, pulsarTestContext.getBrokerService());
ManagedCursor cursor = mock(ManagedCursor.class);
doReturn(Codec.encode("sub-1")).when(cursor).getName();

sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
cursor, false));
doNothing().when(sub).acknowledgeMessage(any(), any(), any());
}

@AfterMethod(alwaysRun = true)
public void shutdown() throws Exception {
if (pulsarTestContext != null) {
pulsarTestContext.close();
pulsarTestContext = null;
}
sub = null;
}

@Test(timeOut = 5000, dataProvider = "individualAckModes")
public void testIndividualAckNormalWithMessageNotExist(CommandSubscribe.SubType subType) throws Exception {
KeySharedMeta keySharedMeta =
subType == Key_Shared ? new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT) : null;
Consumer consumer = new Consumer(sub, subType, "testIndividualAckNormal", consumerId, 0,
"Cons1", true, serverCnx, "myrole-1", emptyMap(), false, keySharedMeta,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
sub.addConsumer(consumer);
// mock a not exist ledger id and entry id
final long notExistLedgerId = 9999L;
final long notExistEntryId = 9999L;
// mock an exist ledger id and entry id
final long existLedgerId = 99L;
final long existEntryId = 99L;
// ack one message that not exists in the ledger and individualAckNormal() should return 0
CommandAck commandAck = new CommandAck();
commandAck.setAckType(Individual);
commandAck.setConsumerId(consumerId);
commandAck.addMessageId().setEntryId(notExistEntryId).setLedgerId(notExistLedgerId);
Long l1 = consumer.individualAckNormal(commandAck, null).get();
Assert.assertEquals(0L, l1.longValue());

// ack two messages that one exists and the other not and individualAckNormal() should return 1
consumer.getPendingAcks().addPendingAckIfAllowed(existLedgerId, existEntryId, 1, 99);
commandAck = new CommandAck();
commandAck.setAckType(Individual);
commandAck.setConsumerId(consumerId);
commandAck.addMessageId().setEntryId(notExistEntryId).setLedgerId(notExistLedgerId);
commandAck.addMessageId().setEntryId(existEntryId).setLedgerId(existLedgerId);
Long l2 = consumer.individualAckNormal(commandAck, null).get();
Assert.assertEquals(1L, l2.longValue());
}


@Test(timeOut = 5000, dataProvider = "individualAckModes")
public void testIndividualAckWithTransactionWithMessageNotExist(CommandSubscribe.SubType subType) throws Exception {
KeySharedMeta keySharedMeta =
subType == Key_Shared ? new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT) : null;
Consumer consumer = new Consumer(sub, subType, "testIndividualAck", consumerId, 0,
"Cons1", true, serverCnx, "myrole-1", emptyMap(), false, keySharedMeta,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
pulsarTestContext.getPulsarService().getConfig().setTransactionCoordinatorEnabled(true);
sub.addConsumer(consumer);
doNothing().when(sub).addUnAckedMessages(anyInt());
CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
when(sub.transactionIndividualAcknowledge(any(), any())).thenReturn(completedFuture);
// A not exist ledger id and entry id
final long notExistLedgerId = 9999L;
final long notExistEntryId = 9999L;
// Ack one message that not exists in the ledger and individualAckWithTransaction() should return 0
CommandAck commandAck = new CommandAck();
commandAck.setTxnidMostBits(1L);
commandAck.setTxnidLeastBits(1L);
commandAck.setAckType(Individual);
commandAck.setConsumerId(consumerId);
commandAck.addMessageId().setEntryId(notExistEntryId).setLedgerId(notExistLedgerId);
Long l1 = consumer.individualAckWithTransaction(commandAck).get();
Assert.assertEquals(l1.longValue(), 0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1529,9 +1529,11 @@ public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exce
Transaction txn = getTxn();
consumer.acknowledgeAsync(messageIds.get(1), txn).get();

// ack one message, the unack count is 4
// ack one message, the unack count is 5(unack count should remain unchanged here,
// as the redelivery operation has already deducted the entire batch's count from the unacknowledged total.
// Reducing it again would result in double counting)
assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
.get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4);
.get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 5);

// cleanup.
txn.abort().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ public static class TransactionHasOperationFailedException extends PulsarClientE
* Constructs an {@code TransactionHasOperationFailedException}.
*/
public TransactionHasOperationFailedException() {
super("Now allowed to commit the transaction due to failed operations of producing or acknowledgment");
super("Not allowed to commit the transaction due to failed operations of producing or acknowledgment");
}

/**
Expand Down
Loading