Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
return;
}

increaseAvailablePermits(currentCnx);
if (!hasParent_) {
increaseAvailablePermits(currentCnx);
}
if (track) {
trackMessage(msg.getMessageId());
}
Expand Down Expand Up @@ -1089,6 +1091,16 @@ void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCn
}
}

void ConsumerImpl::increaseAvailablePermits(const Message& msg) {
ClientConnectionPtr currentCnx = getCnx().lock();
if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
LOG_DEBUG(getName() << "Not adding permit since connection is different.");
return;
}

increaseAvailablePermits(currentCnx);
}

inline CommandSubscribe_SubType ConsumerImpl::getSubType() {
ConsumerType type = config_.getConsumerType();
switch (type) {
Expand Down
1 change: 1 addition & 0 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class ConsumerImpl : public ConsumerImplBase {
void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId,
CommandAck_ValidationError validationError);
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1);
void increaseAvailablePermits(const Message& msg);
void drainIncomingMessageQueue(size_t count);
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
const BitSet& ackSet, int redeliveryCount);
Expand Down
1 change: 1 addition & 0 deletions lib/MessageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class MessageImpl {
int redeliveryCount_;
bool hasSchemaVersion_;
const std::string* schemaVersion_;
std::weak_ptr<class ConsumerImpl> consumerPtr_;

const std::string& getPartitionKey() const;
bool hasPartitionKey() const;
Expand Down
16 changes: 9 additions & 7 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
<< " message:" << msg.getDataAsString());
msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
msg.impl_->consumerPtr_ = std::static_pointer_cast<ConsumerImpl>(consumer.impl_);

Lock lock(pendingReceiveMutex_);
if (!pendingReceives_.empty()) {
Expand All @@ -530,18 +531,15 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
auto self = weakSelf.lock();
if (self) {
notifyPendingReceivedCallback(ResultOk, msg, callback);
auto consumer = msg.impl_->consumerPtr_.lock();
if (consumer) {
consumer->increaseAvailablePermits(msg);
}
}
});
return;
}

if (incomingMessages_.full()) {
lock.unlock();
}

// add message to block queue.
// when messages queue is full, will block listener thread on ConsumerImpl,
// then will not send permits to broker, will broker stop push message.
incomingMessages_.push(msg);
incomingMessagesSize_.fetch_add(msg.getLength());

Expand Down Expand Up @@ -1072,6 +1070,10 @@ void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchRece
void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
incomingMessagesSize_.fetch_sub(msg.getLength());
unAckedMessageTrackerPtr_->add(msg.getMessageId());
auto consumer = msg.impl_->consumerPtr_.lock();
if (consumer) {
consumer->increaseAvailablePermits(msg);
}
}

std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {
Expand Down
4 changes: 2 additions & 2 deletions lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <memory>
#include <vector>

#include "BlockingQueue.h"
#include "Commands.h"
#include "ConsumerImplBase.h"
#include "ConsumerInterceptors.h"
Expand All @@ -33,6 +32,7 @@
#include "LookupDataResult.h"
#include "SynchronizedHashMap.h"
#include "TestUtil.h"
#include "UnboundedBlockingQueue.h"

namespace pulsar {
typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
Expand Down Expand Up @@ -115,7 +115,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
std::map<std::string, int> topicsPartitions_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
BlockingQueue<Message> incomingMessages_;
UnboundedBlockingQueue<Message> incomingMessages_;
std::atomic_int incomingMessagesSize_ = {0};
MessageListener messageListener_;
DeadlineTimerPtr partitionsUpdateTimer_;
Expand Down
66 changes: 66 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1335,4 +1335,70 @@ TEST(ConsumerTest, testRetrySubscribe) {
// milliseconds
}

TEST(ConsumerTest, testNoListenerThreadBlocking) {
Client client{lookupUrl};

const int numPartitions = 2;
const std::string partitionedTopic = "testNoListenerThreadBlocking-" + std::to_string(time(nullptr));
int res =
makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
std::to_string(numPartitions));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

const int receiverQueueSize = 1;
const int receiverQueueSizeAcrossPartitions = receiverQueueSize * numPartitions;

Consumer consumer1, consumer2;
ConsumerConfiguration consumerConfig;
consumerConfig.setReceiverQueueSize(receiverQueueSize);
consumerConfig.setMaxTotalReceiverQueueSizeAcrossPartitions(receiverQueueSizeAcrossPartitions);
Result consumerResult;
consumerResult = client.subscribe(partitionedTopic, "sub1", consumerConfig, consumer1);
ASSERT_EQ(consumerResult, ResultOk);
consumerResult = client.subscribe(partitionedTopic, "sub2", consumerConfig, consumer2);
ASSERT_EQ(consumerResult, ResultOk);

Producer producer;
ProducerConfiguration producerConfig;
producerConfig.setBatchingEnabled(false);
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
Result producerResult = client.createProducer(partitionedTopic, producerConfig, producer);
ASSERT_EQ(producerResult, ResultOk);

const int msgCount = receiverQueueSizeAcrossPartitions * 100;

for (int i = 0; i < msgCount; ++i) {
auto msg = MessageBuilder().setContent("test").build();
producer.sendAsync(msg, [](Result code, const MessageId& messageId) {});
}
producer.flush();
producer.close();

waitUntil(std::chrono::seconds(1), [consumer1] {
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
return multiConsumerImpl->getNumOfPrefetchedMessages() == receiverQueueSizeAcrossPartitions;
});

// check consumer1 prefetch num
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);

// read consumer2 while consumer1 reaches the prefech limit
for (int i = 0; i < msgCount; ++i) {
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer2);
int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);

Message msg;
Result ret = consumer2.receive(msg, 1000);
ASSERT_EQ(ret, ResultOk);
consumer2.acknowledge(msg);
}

consumer2.close();
consumer1.close();
client.close();
}

} // namespace pulsar