Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,9 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
scheduleReconnection(get_shared_this_ptr());
} else {
// Consumer was not yet created, retry to connect to broker if it's possible
if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) {
LOG_WARN(getName() << "Temporary error in creating consumer : " << strResult(result));
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (result == ResultRetryable) {
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
scheduleReconnection(get_shared_this_ptr());
} else {
LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));
Expand Down
10 changes: 8 additions & 2 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
}
}

bool HandlerBase::isRetriableError(Result result) { return result == ResultRetryable; }

void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
const auto state = handler->state_.load();
if (state == Pending || state == Ready) {
Expand All @@ -164,4 +162,12 @@ void HandlerBase::handleTimeout(const boost::system::error_code& ec, HandlerBase
}
}

Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const {
if (result == ResultRetryable && (TimeUtils::now() - startTimestamp >= operationTimeut_)) {
return ResultTimeout;
} else {
return result;
}
}

} // namespace pulsar
7 changes: 2 additions & 5 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ class HandlerBase {
*/
static void scheduleReconnection(HandlerBasePtr handler);

/*
* Should we retry in error that are transient
*/
bool isRetriableError(Result result);

/**
* Do some cleanup work before changing `connection_` to `cnx`.
*
Expand Down Expand Up @@ -127,6 +122,8 @@ class HandlerBase {
Backoff backoff_;
uint64_t epoch_;

Result convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const;

private:
DeadlineTimerPtr timer_;

Expand Down
3 changes: 2 additions & 1 deletion lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
scheduleReconnection(shared_from_this());
} else {
// Producer was not yet created, retry to connect to broker if it's possible
if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) {
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (result == ResultRetryable) {
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result));
scheduleReconnection(shared_from_this());
} else {
Expand Down
14 changes: 14 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1321,4 +1321,18 @@ TEST(ConsumerTest, testNotSetSubscriptionName) {
client.close();
}

TEST(ConsumerTest, testRetrySubscribe) {
Client client{lookupUrl};
for (int i = 0; i < 10; i++) {
// "Subscription is fenced" error might happen here because the previous seek operation might not be
// done in broker, the consumer should retry until timeout
Consumer consumer;
ASSERT_EQ(client.subscribe("test-close-before-seek-done", "sub", consumer), ResultOk);
consumer.seekAsync(MessageId::earliest(), [](Result) {});
consumer.close();
}
// TODO: Currently it's hard to test the timeout error without configuring the operation timeout in
// milliseconds
}

} // namespace pulsar