Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ delayedDeliveryFixedDelayDetectionLookahead=50000
delayedDeliveryMaxDelayInMillis=0

# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false
acknowledgmentAtBatchIndexLevelEnabled=true

# Enable tracking of replicated subscriptions state across clusters.
enableReplicatedSubscriptions=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
private long delayedDeliveryMaxDelayInMillis = 0;

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
private boolean acknowledgmentAtBatchIndexLevelEnabled = false;
private boolean acknowledgmentAtBatchIndexLevelEnabled = true;

@FieldContext(
category = CATEGORY_WEBSOCKET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,6 @@ public void testGetPositionStatsInPendingAckStatsFroBatch() throws Exception {
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(subscriptionName)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true)
.topic(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ protected void startBroker() throws Exception {
conf.setConfigurationMetadataStoreUrl("zk:localhost:3181");
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
conf.setBookkeeperClientExposeStatsToPrometheus(true);
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);

conf.setBrokerShutdownTimeoutMs(0L);
conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,6 @@ private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subTyp
.isAckReceiptEnabled(true)
.subscriptionName(subscriptionName)
.subscriptionType(subType)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
super.baseSetup();
}

Expand All @@ -87,7 +86,6 @@ public void testBatchMessageAck() {
.subscriptionName(subscriptionName)
.receiverQueueSize(50)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.subscribe();

Expand Down Expand Up @@ -212,7 +210,6 @@ public void testBatchMessageMultiNegtiveAck() throws Exception{
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(10)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.subscribe();

Expand Down Expand Up @@ -254,7 +251,6 @@ public void testBatchMessageMultiNegtiveAck() throws Exception{
.subscriptionName(subscriptionName2)
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(10)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.subscribe();
@Cleanup
Expand Down Expand Up @@ -310,7 +306,6 @@ public void testAckMessageWithNotOwnerConsumerUnAckMessageCount() throws Excepti
.isAckReceiptEnabled(true)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Expand All @@ -322,7 +317,6 @@ public void testAckMessageWithNotOwnerConsumerUnAckMessageCount() throws Excepti
.isAckReceiptEnabled(true)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.subscribe();

for (int i = 0; i < 5; i++) {
Expand Down Expand Up @@ -385,7 +379,6 @@ public void testNegativeAckAndLongAckDelayWillNotLeadRepeatConsume() throws Exce
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(redeliveryDelaySeconds, TimeUnit.SECONDS)
.enableBatchIndexAcknowledgment(true)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.acknowledgmentGroupTime(1, TimeUnit.HOURS)
.subscribe();
Expand Down Expand Up @@ -461,7 +454,6 @@ public void testMixIndexAndNonIndexUnAckMessageCount() throws Exception {
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Shared)
.acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
.enableBatchIndexAcknowledgment(true)
.isAckReceiptEnabled(true)
.subscribe();

Expand Down Expand Up @@ -492,7 +484,6 @@ public void testUnAckMessagesWhenConcurrentDeliveryAndAck() throws Exception {
.topic(topicName)
.receiverQueueSize(receiverQueueSize)
.subscriptionName(subName)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true);

Expand Down Expand Up @@ -666,7 +657,6 @@ public void testPermitsIfHalfAckBatchMessage() throws Exception {
.topic(topicName)
.receiverQueueSize(receiverQueueSize)
.subscriptionName(subName)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ protected void setup() throws Exception {
conf.setDefaultRetentionSizeInMB(100);
conf.setDefaultRetentionTimeInMinutes(100);
super.baseSetup();
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
}

@AfterClass(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ public void ackCommitTest() throws Exception {
.topic(ACK_COMMIT_TOPIC)
.subscriptionName(subscriptionName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Shared)
.subscribe();

Expand Down Expand Up @@ -347,7 +346,6 @@ public void ackAbortTest() throws Exception {
.topic(ACK_ABORT_TOPIC)
.subscriptionName(subscriptionName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Awaitility.await().until(consumer::isConnected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ public Consumer<byte[]> getConsumer(String topicName, String subName) throws Pul
.topic(topicName)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.subscribe();
}

Expand Down Expand Up @@ -1451,9 +1450,6 @@ public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception {
public void testPendingAckBatchMessageCommit() throws Exception {
String topic = NAMESPACE1 + "/testPendingAckBatchMessageCommit";

// enable batch index ack
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer(Schema.BYTES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public void testTransactionBufferLowWaterMark() throws Exception {
.topic(TOPIC)
.subscriptionName("test")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
final String TEST1 = "test1";
Expand Down Expand Up @@ -196,7 +195,6 @@ public void testPendingAckLowWaterMark() throws Exception {
.topic(TOPIC)
.subscriptionName(subName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
final String TEST1 = "test1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public void commitTxnTest() throws Exception {
.topic(TOPIC)
.subscriptionName("test")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
final String TEST1 = "test1";
Expand Down Expand Up @@ -139,7 +138,6 @@ public void abortTxnTest() throws Exception {
.topic(TOPIC)
.subscriptionName("test")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
final String TEST1 = "test1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ protected final void setup() throws Exception {
config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setDefaultNumberOfNamespaceBundles(1);
config.setLoadBalancerEnabled(false);
config.setAcknowledgmentAtBatchIndexLevelEnabled(true);
config.setTransactionCoordinatorEnabled(true);
configurations[i] = config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setUpBase(1, NUM_PARTITIONS, NAMESPACE1 +"/test", 0);
}

Expand All @@ -76,7 +75,6 @@ public void txnAckTestNoBatchAndSharedSubMemoryDeleteTest() throws Exception {
.topic(normalTopic)
.isAckReceiptEnabled(true)
.subscriptionName(subscriptionName)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(2, TimeUnit.SECONDS)
.acknowledgmentGroupTime(0, TimeUnit.MICROSECONDS)
Expand Down Expand Up @@ -155,7 +153,6 @@ public void txnAckTestBatchAndSharedSubMemoryDeleteTest() throws Exception {
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(normalTopic)
.subscriptionName(subscriptionName)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Shared)
.subscribe();

Expand Down Expand Up @@ -273,7 +270,6 @@ public void testPendingAckClearPositionIsSmallerThanMarkDelete() throws Exceptio
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(normalTopic)
.subscriptionName(subscriptionName)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Shared)
.subscribe();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ public void individualPendingAckReplayTest() throws Exception {
.topic(PENDING_ACK_REPLAY_TOPIC)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.subscribe();

Transaction abortTxn = pulsarClient.newTransaction()
Expand Down Expand Up @@ -336,7 +335,6 @@ public void testPendingAckMetrics() throws Exception {
.topic(PENDING_ACK_REPLAY_TOPIC)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive)
.enableBatchIndexAcknowledgment(true)
.subscribe();

for (int a = 0; a < messageCount; a++) {
Expand Down Expand Up @@ -447,7 +445,6 @@ public void cumulativePendingAckReplayTest() throws Exception {
.topic(PENDING_ACK_REPLAY_TOPIC)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.enableBatchIndexAcknowledgment(true)
.subscribe();

Transaction abortTxn = pulsarClient.newTransaction()
Expand Down Expand Up @@ -534,7 +531,6 @@ private void testDeleteSubThenDeletePendingAckManagedLedger() throws Exception {
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.enableBatchIndexAcknowledgment(true)
.subscribe();

consumer.close();
Expand Down Expand Up @@ -563,7 +559,6 @@ private void testDeleteTopicThenDeletePendingAckManagedLedger() throws Exception
.topic(topic)
.subscriptionName(subName1)
.subscriptionType(SubscriptionType.Failover)
.enableBatchIndexAcknowledgment(true)
.subscribe();

consumer1.close();
Expand All @@ -573,7 +568,6 @@ private void testDeleteTopicThenDeletePendingAckManagedLedger() throws Exception
.topic(topic)
.subscriptionName(subName2)
.subscriptionType(SubscriptionType.Failover)
.enableBatchIndexAcknowledgment(true)
.subscribe();

consumer2.close();
Expand Down Expand Up @@ -699,7 +693,6 @@ public void testPendingAckLowWaterMarkRemoveFirstTxn() throws Exception {
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Expand Down Expand Up @@ -827,7 +820,6 @@ public void testTransactionConflictExceptionWhenAckBatchMessage() throws Excepti
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(subscriptionName)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Exclusive)
.isAckReceiptEnabled(true)
.topic(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public class MLPendingAckStoreTest extends TransactionTestBase {
@BeforeClass
@Override
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setUpBase(1, 1, NAMESPACE1 + "/test", 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public void testAcknowledgment(boolean enableBatching) throws Exception {
final var consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("sub")
.enableBatchIndexAcknowledgment(true)
.isAckReceiptEnabled(true)
.subscribe();
for (int i = 0; i < 10; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ protected void doInitConf() throws Exception {
super.doInitConf();
conf.setSystemTopicEnabled(false);
conf.setTransactionCoordinatorEnabled(false);
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
}

@AfterClass(alwaysRun = true)
Expand Down Expand Up @@ -243,7 +242,6 @@ private Consumer<String> createConsumer(String topicName, String subName, Subscr
.subscriptionName(subName)
.subscriptionType(subType)
.isAckReceiptEnabled(true)
.enableBatchIndexAcknowledgment(true)
.subscribe();
return consumer;
}
Expand All @@ -264,4 +262,4 @@ private MessagesEntry receiveAllMessages(Consumer<String> consumer) throws Excep

private record MessagesEntry(Set<String> messageSet, Set<MessageId> messageIdSet) {}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
super.internalSetup();
super.producerBaseSetup();
doReturn(CompletableFuture.completedFuture(new LedgerMetadata() {
Expand Down Expand Up @@ -174,7 +173,6 @@ public void testBatchMessageIndexAckForSharedSubscription(boolean ackReceiptEnab
.receiverQueueSize(100)
.isAckReceiptEnabled(ackReceiptEnabled)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(2, TimeUnit.SECONDS)
.subscribe();

Expand Down Expand Up @@ -254,7 +252,6 @@ public void testBatchMessageIndexAckForExclusiveSubscription(boolean ackReceiptE
.subscriptionName("sub")
.receiverQueueSize(100)
.isAckReceiptEnabled(ackReceiptEnabled)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Expand Down Expand Up @@ -324,7 +321,6 @@ public void testDoNotRecycleAckSetMultipleTimes() throws Exception {
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS)
.topic(topic)
.enableBatchIndexAcknowledgment(true)
.subscriptionName("test")
.subscribe();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@ public void testNegativeAcksWithBatch() throws Exception {
@Test
public void testNegativeAcksWithBatchAckEnabled() throws Exception {
cleanup();
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setup();
String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatchAckEnabled");

Expand All @@ -390,7 +389,6 @@ public void testNegativeAcksWithBatchAckEnabled() throws Exception {
.subscriptionName("sub1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
.subscribe();

Expand Down Expand Up @@ -437,7 +435,6 @@ public void testFailoverConsumerBatchCumulateAck() throws Exception {
.topic(topic)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Failover)
.enableBatchIndexAcknowledgment(true)
.acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
.receiverQueueSize(10)
.subscribe();
Expand Down
Loading
Loading