Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,38 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
Result handleResult = ResultOk;

if (result == ResultOk) {
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
{
Lock mutexLock(mutex_);
if (!changeToReadyState()) {
auto client = client_.lock();
if (client) {
LOG_INFO(getName() << "Closing subscribed consumer since it was already closed");
int requestId = client->newRequestId();
auto name = getName();
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
.addListener([name](Result result, const ResponseData&) {
if (result == ResultOk) {
LOG_INFO(name << "Closed consumer successfully after subscribe completed");
} else {
LOG_WARN(name << "Failed to close consumer: " << strResult(result));
}
});
} else {
// This should not happen normally because if client is destroyed, the connection pool
// should also be closed, which means all connections should be closed. Close the
// connection to let broker know this registered consumer is inactive.
LOG_WARN(getName()
<< "Client already closed when subscribe completed, close the connection "
<< cnx->cnxString());
cnx->close(ResultNotConnected);
}
return ResultAlreadyClosed;
}

LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
setCnx(cnx);
incomingMessages_.clear();
possibleSendToDeadLetterTopicMessages_.clear();
state_ = Ready;
backoff_.reset();
if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
// Complicated logic since we don't have a isLocked() function for mutex
Expand Down
5 changes: 5 additions & 0 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
firstRequestIdAfterConnect_.store(requestId, std::memory_order_release);
}

bool changeToReadyState() noexcept {
State expected = Pending;
return state_ == Ready || state_.compare_exchange_strong(expected, Ready);
}

private:
DeadlineTimerPtr timer_;
DeadlineTimerPtr creationTimer_;
Expand Down
22 changes: 22 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1573,4 +1573,26 @@ TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) {
ASSERT_EQ(ResultOk, client.close());
}

TEST(ConsumerTest, testCloseAfterSeek) {
const auto topic = "test-close-after-seek-" + std::to_string(time(nullptr));
const auto subscription = "sub";
Client client(lookupUrl);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
ASSERT_EQ(ResultOk, consumer.seek(TimeUtils::currentTimeMillis()));
consumer.closeAsync(nullptr);

// Test the previous consumer will be closed even after seek is done, at the moment the connection might
// not be established.
ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));

// Test creating a consumer from a different client should also work for this case
Client anotherClient(lookupUrl);
consumer.closeAsync(nullptr);
ASSERT_EQ(ResultOk, anotherClient.subscribe(topic, subscription, consumer));

client.close();
anotherClient.close();
}

} // namespace pulsar