Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 54 additions & 14 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,15 +310,23 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
if (result == ResultOk) {
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
{
Lock lock(mutex_);
Lock mutexLock(mutex_);
setCnx(cnx);
incomingMessages_.clear();
possibleSendToDeadLetterTopicMessages_.clear();
state_ = Ready;
backoff_.reset();
// Complicated logic since we don't have a isLocked() function for mutex
if (waitingForZeroQueueSizeMessage) {
sendFlowPermitsToBroker(cnx, 1);
if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
// Complicated logic since we don't have a isLocked() function for mutex
if (waitingForZeroQueueSizeMessage) {
sendFlowPermitsToBroker(cnx, 1);
}
// Note that the order of lock acquisition must be mutex_ -> pendingReceiveMutex_,
// otherwise a deadlock will occur.
Lock pendingReceiveMutexLock(pendingReceiveMutex_);
if (!pendingReceives_.empty()) {
sendFlowPermitsToBroker(cnx, pendingReceives_.size());
}
}
availablePermits_ = 0;
}
Expand Down Expand Up @@ -915,7 +923,6 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
}

// Using RAII for locking
ClientConnectionPtr currentCnx = getCnx().lock();
Lock lock(mutexForReceiveWithZeroQueueSize);

// Just being cautious
Expand All @@ -924,9 +931,18 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
getName() << "The incoming message queue should never be greater than 0 when Queue size is 0");
incomingMessages_.clear();
}
waitingForZeroQueueSizeMessage = true;

sendFlowPermitsToBroker(currentCnx, 1);
{
// Lock mutex_ to prevent a race condition with handleCreateConsumer.
// If handleCreateConsumer is executed after setting waitingForZeroQueueSizeMessage to true and
// before calling sendFlowPermitsToBroker, the result may be that a flow permit is sent twice.
Lock lock(mutex_);
waitingForZeroQueueSizeMessage = true;
// If connection_ is nullptr, sendFlowPermitsToBroker does nothing.
// In other words, a flow permit will not be sent until setCnx(cnx) is executed in
// handleCreateConsumer.
sendFlowPermitsToBroker(getCnx().lock(), 1);
}

while (true) {
if (!incomingMessages_.pop(msg)) {
Expand All @@ -939,6 +955,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
Lock localLock(mutex_);
// if message received due to an old flow - discard it and wait for the message from the
// latest flow command
ClientConnectionPtr currentCnx = getCnx().lock();
if (msg.impl_->cnx_ == currentCnx.get()) {
waitingForZeroQueueSizeMessage = false;
// Can't use break here else it may trigger a race with connection opened.
Expand Down Expand Up @@ -966,19 +983,42 @@ void ConsumerImpl::receiveAsync(ReceiveCallback callback) {
return;
}

Lock lock(pendingReceiveMutex_);
if (messageListener_) {
LOG_ERROR(getName() << "Can not receive when a listener has been set");
callback(ResultInvalidConfiguration, msg);
return;
}

Lock mutexlock(mutex_, std::defer_lock);
if (config_.getReceiverQueueSize() == 0) {
// Lock mutex_ to prevent a race condition with handleCreateConsumer.
// If handleCreateConsumer is executed after pushing the callback to pendingReceives_ and
// before calling sendFlowPermitsToBroker, the result may be that a flow permit is sent twice.
// Note that the order of lock acquisition must be mutex_ -> pendingReceiveMutex_,
// otherwise a deadlock will occur.
mutexlock.lock();
}

Lock pendingReceiveMutexLock(pendingReceiveMutex_);
if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
lock.unlock();
pendingReceiveMutexLock.unlock();
if (config_.getReceiverQueueSize() == 0) {
mutexlock.unlock();
}
messageProcessed(msg);
msg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
callback(ResultOk, msg);
} else if (config_.getReceiverQueueSize() == 0) {
pendingReceives_.push(callback);
// If connection_ is nullptr, sendFlowPermitsToBroker does nothing.
// In other words, a flow permit will not be sent until setCnx(cnx) is executed in
// handleCreateConsumer.
sendFlowPermitsToBroker(getCnx().lock(), 1);
pendingReceiveMutexLock.unlock();
mutexlock.unlock();
} else {
pendingReceives_.push(callback);
lock.unlock();

if (config_.getReceiverQueueSize() == 0) {
sendFlowPermitsToBroker(getCnx().lock(), 1);
}
pendingReceiveMutexLock.unlock();
}
}

Expand Down
100 changes: 100 additions & 0 deletions tests/ZeroQueueSizeTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <mutex>

#include "ConsumerTest.h"
#include "HttpHelper.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"

Expand All @@ -37,6 +38,7 @@ using namespace pulsar;
static int totalMessages = 10;
static int globalCount = 0;
static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080";
static std::string contentBase = "msg-";

static void messageListenerFunction(Consumer consumer, const Message& msg, Latch& latch) {
Expand Down Expand Up @@ -287,3 +289,101 @@ TEST(ZeroQueueSizeTest, testPauseResumeNoReconnection) {

client.close();
}

class ZeroQueueSizeTest : public ::testing::TestWithParam<bool> {};

TEST_P(ZeroQueueSizeTest, testReceptionAfterUnloading) {
Client client(lookupUrl);
auto isAsync = GetParam();
std::string topicName = "zero-queue-size-reception-after-unloading";
if (isAsync) {
topicName += "-async";
}
std::string subName = "my-sub";

Producer producer;
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultOk, result);

Consumer consumer;
ConsumerConfiguration consConfig;
consConfig.setReceiverQueueSize(0);
result = client.subscribe(topicName, subName, consConfig, consumer);
ASSERT_EQ(ResultOk, result);

for (int i = 0; i < totalMessages / 2; i++) {
std::ostringstream ss;
ss << contentBase << i;
Message msg = MessageBuilder().setContent(ss.str()).build();
result = producer.send(msg);
ASSERT_EQ(ResultOk, result);
}

for (int i = 0; i < totalMessages / 2; i++) {
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
std::ostringstream ss;
ss << contentBase << i;
if (isAsync) {
Latch latch(1);
consumer.receiveAsync([&consumer, &ss, &latch](Result res, const Message& receivedMsg) {
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
latch.countdown();
});
ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
} else {
Message receivedMsg;
consumer.receive(receivedMsg);
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
}
}

// Wait for messages to be delivered while performing `receive` or `receiveAsync` in a separate thread.
// At this time, the value of availablePermits should be 1.
std::thread consumeThread([&consumer, &isAsync] {
for (int i = totalMessages / 2; i < totalMessages; i++) {
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
std::ostringstream ss;
ss << contentBase << i;
if (isAsync) {
Latch latch(1);
consumer.receiveAsync([&consumer, &ss, &latch](Result res, const Message& receivedMsg) {
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
latch.countdown();
});
ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
} else {
Message receivedMsg;
consumer.receive(receivedMsg);
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
}
}
});
std::this_thread::sleep_for(std::chrono::seconds(1));

int res = makePutRequest(adminUrl + "/admin/v2/persistent/public/default/" + topicName + "/unload", "");
ASSERT_TRUE(res / 100 == 2) << "res: " << res;

for (int i = totalMessages / 2; i < totalMessages; i++) {
std::ostringstream ss;
ss << contentBase << i;
Message msg = MessageBuilder().setContent(ss.str()).build();
result = producer.send(msg);
ASSERT_EQ(ResultOk, result);
}

consumeThread.join();
consumer.unsubscribe();
consumer.close();
producer.close();
client.close();
}

INSTANTIATE_TEST_CASE_P(Pulsar, ZeroQueueSizeTest, ::testing::Values(false, true));