Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/BatchMessageContainerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
namespace pulsar {

BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& producer)
: topicName_(producer.topic_),
: topicName_(producer.topic()),
producerConfig_(producer.conf_),
producerName_(producer.producerName_),
producerId_(producer.producerId_),
Expand Down
2 changes: 1 addition & 1 deletion lib/BatchMessageContainerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class BatchMessageContainerBase : public boost::noncopyable {

protected:
// references to ProducerImpl's fields
const std::shared_ptr<std::string> topicName_;
const std::string topicName_;
const ProducerConfiguration& producerConfig_;
const std::string& producerName_;
const uint64_t& producerId_;
Expand Down
14 changes: 7 additions & 7 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ Future<Result, ConsumerImplBaseWeakPtr> ConsumerImpl::getConsumerCreatedFuture()

const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; }

const std::string& ConsumerImpl::getTopic() const { return *topic_; }
const std::string& ConsumerImpl::getTopic() const { return topic(); }

void ConsumerImpl::start() {
HandlerBase::start();
Expand All @@ -194,7 +194,7 @@ void ConsumerImpl::start() {

// Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the
// constructor completed.
if (TopicName::get(*topic_)->isPersistent()) {
if (TopicName::get(topic())->isPersistent()) {
if (config_.getAckGroupingTimeMs() > 0) {
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled(),
Expand Down Expand Up @@ -249,7 +249,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
SharedBuffer cmd = Commands::newSubscribe(
*topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
topic(), subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
config_.getKeySharedPolicy(), config_.getPriorityLevel());
Expand Down Expand Up @@ -552,7 +552,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::

Message m(messageId, brokerEntryMetadata, metadata, payload);
m.impl_->cnx_ = cnx.get();
m.impl_->setTopicName(topic_);
m.impl_->setTopicName(getTopicPtr());
m.impl_->setRedeliveryCount(msg.redelivery_count());

if (metadata.has_schema_version()) {
Expand Down Expand Up @@ -1243,7 +1243,7 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
return;
}

LOG_INFO(getName() << "Closing consumer for topic " << topic_);
LOG_INFO(getName() << "Closing consumer for topic " << topic());
state_ = Closing;
incomingMessages_.close();

Expand Down Expand Up @@ -1764,7 +1764,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa
return;
}
if (result != ResultOk) {
LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
<< self->consumerName_ << "} Failed to acknowledge the message {"
<< originMessageId
<< "} of the original topic but send to the DLQ successfully : "
Expand All @@ -1777,7 +1777,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa
}
});
} else {
LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
<< self->consumerName_ << "} Failed to send DLQ message to {"
<< self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id "
<< "{" << originMessageId << "} : " << res);
Expand Down
6 changes: 3 additions & 3 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ DECLARE_LOG_OBJECT()
namespace pulsar {

HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
: client_(client),
topic_(std::make_shared<std::string>(topic)),
: topic_(std::make_shared<std::string>(topic)),
client_(client),
executor_(client->getIOExecutorProvider()->get()),
mutex_(),
creationTimestamp_(TimeUtils::now()),
Expand Down Expand Up @@ -88,7 +88,7 @@ void HandlerBase::grabCnx() {
return;
}
auto self = shared_from_this();
client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
if (result == ResultOk) {
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
connectionOpened(cnx).addListener([this, self](Result result, bool) {
Expand Down
6 changes: 5 additions & 1 deletion lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,18 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {

virtual const std::string& getName() const = 0;

const std::string& topic() const { return *topic_; }
const std::shared_ptr<std::string>& getTopicPtr() const { return topic_; }

private:
const std::shared_ptr<std::string> topic_;

void handleDisconnection(Result result, const ClientConnectionPtr& cnx);

void handleTimeout(const boost::system::error_code& ec);

protected:
ClientImplWeakPtr client_;
const std::shared_ptr<std::string> topic_;
ExecutorServicePtr executor_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
Expand Down
10 changes: 5 additions & 5 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
interceptors_(interceptors) {
std::stringstream consumerStrStream;
consumerStrStream << "[Muti Topics Consumer: "
<< "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
<< "TopicName - " << topic() << " - Subscription - " << subscriptionName << "]";
consumerStr_ = consumerStrStream.str();

if (conf.getUnAckedMessagesTimeoutMs() != 0) {
Expand Down Expand Up @@ -312,7 +312,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
}

void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
LOG_INFO("[ Topics Consumer " << topic() << "," << subscriptionName_ << "] Unsubscribing");

auto callback = [this, originalCallback](Result result) {
if (result == ResultOk) {
Expand Down Expand Up @@ -483,7 +483,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
*numberTopicPartitions_ = 0;
if (consumers.empty()) {
LOG_DEBUG("TopicsConsumer have no consumers to close "
<< " topic" << topic_ << " subscription - " << subscriptionName_);
<< " topic" << topic() << " subscription - " << subscriptionName_);
callback(ResultAlreadyClosed);
return;
}
Expand Down Expand Up @@ -518,7 +518,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
<< " message:" << msg.getDataAsString());
msg.impl_->setTopicName(consumer.impl_->topic_);
msg.impl_->setTopicName(consumer.impl_->getTopicPtr());

Lock lock(pendingReceiveMutex_);
if (!pendingReceives_.empty()) {
Expand Down Expand Up @@ -744,7 +744,7 @@ Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCrea
}
const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { return subscriptionName_; }

const std::string& MultiTopicsConsumerImpl::getTopic() const { return *topic_; }
const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic(); }

const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }

Expand Down
14 changes: 7 additions & 7 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
partition_(partition),
producerName_(conf_.getProducerName()),
userProvidedProducerName_(false),
producerStr_("[" + *topic_ + ", " + producerName_ + "] "),
producerStr_("[" + topic() + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
batchTimer_(executor_->createDeadlineTimer()),
Expand All @@ -67,7 +67,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
memoryLimitController_(client->getMemoryLimitController()),
chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()),
interceptors_(interceptors) {
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic()
<< " id: " << producerId_);

int64_t initialSequenceId = conf.getInitialSequenceId();
Expand All @@ -93,7 +93,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,

if (conf_.isEncryptionEnabled()) {
std::ostringstream logCtxStream;
logCtxStream << "[" << topic_ << ", " << producerName_ << ", " << producerId_ << "]";
logCtxStream << "[" << topic() << ", " << producerName_ << ", " << producerId_ << "]";
std::string logCtx = logCtxStream.str();
msgCrypto_ = std::make_shared<MessageCrypto>(logCtx, true);
msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
Expand Down Expand Up @@ -123,7 +123,7 @@ ProducerImpl::~ProducerImpl() {
}
}

const std::string& ProducerImpl::getTopic() const { return *topic_; }
const std::string& ProducerImpl::getTopic() const { return topic(); }

const std::string& ProducerImpl::getProducerName() const { return producerName_; }

Expand All @@ -148,7 +148,7 @@ Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& c
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();

SharedBuffer cmd = Commands::newProducer(*topic_, producerId_, producerName_, requestId,
SharedBuffer cmd = Commands::newProducer(topic(), producerId_, producerName_, requestId,
conf_.getProperties(), conf_.getSchema(), epoch_,
userProvidedProducerName_, conf_.isEncryptionEnabled(),
static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()),
Expand Down Expand Up @@ -218,7 +218,7 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result
cnx->registerProducer(producerId_, shared_from_this());
producerName_ = responseData.producerName;
schemaVersion_ = responseData.schemaVersion;
producerStr_ = "[" + *topic_ + ", " + producerName_ + "] ";
producerStr_ = "[" + topic() + ", " + producerName_ + "] ";
topicEpoch = responseData.topicEpoch;

if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
Expand Down Expand Up @@ -788,7 +788,7 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) {

return;
}
LOG_INFO(getName() << "Closing producer for topic " << topic_);
LOG_INFO(getName() << "Closing producer for topic " << topic());
state_ = Closing;

ClientConnectionPtr cnx = getCnx().lock();
Expand Down