Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
state_ = Closing;

memoryLimitController_.close();
lookupServicePtr_->close();

auto producers = producers_.move();
auto consumers = consumers_.move();
Expand Down
2 changes: 2 additions & 0 deletions lib/LookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class LookupService {
const std::string& version = "") = 0;

virtual ~LookupService() {}

virtual void close() {}
};

typedef std::shared_ptr<LookupService> LookupServicePtr;
Expand Down
123 changes: 26 additions & 97 deletions lib/RetryableLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,17 @@
*/
#pragma once

#include <algorithm>
#include <memory>

#include "Backoff.h"
#include "ExecutorService.h"
#include "LogUtils.h"
#include "LookupDataResult.h"
#include "LookupService.h"
#include "SynchronizedHashMap.h"
#include "NamespaceName.h"
#include "RetryableOperationCache.h"
#include "TopicName.h"

namespace pulsar {

class RetryableLookupService : public LookupService,
public std::enable_shared_from_this<RetryableLookupService> {
class RetryableLookupService : public LookupService {
private:
friend class PulsarFriend;
friend class LookupServiceTest;
struct PassKey {
explicit PassKey() {}
};
Expand All @@ -44,123 +38,58 @@ class RetryableLookupService : public LookupService,
explicit RetryableLookupService(PassKey, Args&&... args)
: RetryableLookupService(std::forward<Args>(args)...) {}

void close() override {
lookupCache_->clear();
partitionLookupCache_->clear();
namespaceLookupCache_->clear();
getSchemaCache_->clear();
}

template <typename... Args>
static std::shared_ptr<RetryableLookupService> create(Args&&... args) {
return std::make_shared<RetryableLookupService>(PassKey{}, std::forward<Args>(args)...);
}

LookupResultFuture getBroker(const TopicName& topicName) override {
return executeAsync<LookupResult>("get-broker-" + topicName.toString(),
[this, topicName] { return lookupService_->getBroker(topicName); });
return lookupCache_->run("get-broker-" + topicName.toString(),
[this, topicName] { return lookupService_->getBroker(topicName); });
}

Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
return executeAsync<LookupDataResultPtr>(
return partitionLookupCache_->run(
"get-partition-metadata-" + topicName->toString(),
[this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); });
}

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override {
return executeAsync<NamespaceTopicsPtr>(
return namespaceLookupCache_->run(
"get-topics-of-namespace-" + nsName->toString(),
[this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); });
}

Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override {
return executeAsync<SchemaInfo>("get-schema" + topicName->toString(), [this, topicName, version] {
return getSchemaCache_->run("get-schema" + topicName->toString(), [this, topicName, version] {
return lookupService_->getSchema(topicName, version);
});
}

template <typename T>
Future<Result, T> executeAsync(const std::string& key, std::function<Future<Result, T>()> f) {
Promise<Result, T> promise;
executeAsyncImpl(key, f, promise, timeout_);
return promise.getFuture();
}

private:
const std::shared_ptr<LookupService> lookupService_;
const TimeDuration timeout_;
Backoff backoff_;
const ExecutorServiceProviderPtr executorProvider_;

SynchronizedHashMap<std::string, DeadlineTimerPtr> backoffTimers_;
RetryableOperationCachePtr<LookupResult> lookupCache_;
RetryableOperationCachePtr<LookupDataResultPtr> partitionLookupCache_;
RetryableOperationCachePtr<NamespaceTopicsPtr> namespaceLookupCache_;
RetryableOperationCachePtr<SchemaInfo> getSchemaCache_;

RetryableLookupService(std::shared_ptr<LookupService> lookupService, int timeoutSeconds,
ExecutorServiceProviderPtr executorProvider)
: lookupService_(lookupService),
timeout_(boost::posix_time::seconds(timeoutSeconds)),
backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
boost::posix_time::milliseconds(0)),
executorProvider_(executorProvider) {}

std::weak_ptr<RetryableLookupService> weak_from_this() noexcept { return shared_from_this(); }

// NOTE: Set the visibility to fix compilation error in GCC 6
template <typename T>
#ifndef _WIN32
__attribute__((visibility("hidden")))
#endif
void
executeAsyncImpl(const std::string& key, std::function<Future<Result, T>()> f, Promise<Result, T> promise,
TimeDuration remainingTime) {
auto weakSelf = weak_from_this();
f().addListener([this, weakSelf, key, f, promise, remainingTime](Result result, const T& value) {
auto self = weakSelf.lock();
if (!self) {
return;
}

if (result == ResultOk) {
backoffTimers_.remove(key);
promise.setValue(value);
} else if (result == ResultRetryable) {
if (remainingTime.total_milliseconds() <= 0) {
backoffTimers_.remove(key);
promise.setFailed(ResultTimeout);
return;
}

DeadlineTimerPtr timerPtr;
try {
timerPtr = executorProvider_->get()->createDeadlineTimer();
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to retry lookup for " << key << ": " << e.what());
promise.setFailed(ResultConnectError);
return;
}
auto it = backoffTimers_.emplace(key, timerPtr);
auto& timer = *(it.first->second);
auto delay = std::min(backoff_.next(), remainingTime);
timer.expires_from_now(delay);

auto nextRemainingTime = remainingTime - delay;
LOG_INFO("Reschedule " << key << " for " << delay.total_milliseconds()
<< " ms, remaining time: " << nextRemainingTime.total_milliseconds()
<< " ms");
timer.async_wait([this, weakSelf, key, f, promise,
nextRemainingTime](const boost::system::error_code& ec) {
auto self = weakSelf.lock();
if (!self || ec) {
if (self && ec != boost::asio::error::operation_aborted) {
LOG_ERROR("The timer for " << key << " failed: " << ec.message());
}
// The lookup service has been destructed or the timer has been cancelled
promise.setFailed(ResultTimeout);
return;
}
executeAsyncImpl(key, f, promise, nextRemainingTime);
});
} else {
backoffTimers_.remove(key);
promise.setFailed(result);
}
});
}

DECLARE_LOG_OBJECT()
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, timeoutSeconds)),
partitionLookupCache_(
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, timeoutSeconds)),
namespaceLookupCache_(
RetryableOperationCache<NamespaceTopicsPtr>::create(executorProvider, timeoutSeconds)),
getSchemaCache_(RetryableOperationCache<SchemaInfo>::create(executorProvider, timeoutSeconds)) {}
};

} // namespace pulsar
134 changes: 134 additions & 0 deletions lib/RetryableOperation.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <pulsar/Result.h>

#include <algorithm>
#include <atomic>
#include <functional>
#include <memory>

#include "Backoff.h"
#include "ExecutorService.h"
#include "Future.h"
#include "LogUtils.h"

namespace pulsar {

/**
 * A named asynchronous operation that is re-invoked with backoff while it keeps
 * failing with ResultRetryable, until it succeeds, fails with another result, or
 * the configured timeout budget is used up.
 *
 * Instances must be created through create() so that they are owned by a
 * std::shared_ptr; the asynchronous callbacks hold only a weak reference and do
 * nothing once the operation has been destroyed.
 *
 * @tparam T the value type produced by the operation on success
 */
template <typename T>
class RetryableOperation : public std::enable_shared_from_this<RetryableOperation<T>> {
    // Tag type that keeps the public forwarding constructor unusable from outside
    // the class while still letting std::make_shared construct the object.
    struct PassKey {
        explicit PassKey() {}
    };

    /**
     * @param name human readable name, only used in log messages
     * @param func factory that starts one attempt and returns its future
     * @param timeoutSeconds total retry budget in seconds
     * @param timer timer used to wait between two attempts
     */
    RetryableOperation(const std::string& name, std::function<Future<Result, T>()>&& func, int timeoutSeconds,
                       DeadlineTimerPtr timer)
        : name_(name),
          func_(std::move(func)),
          timeout_(boost::posix_time::seconds(timeoutSeconds)),
          // NOTE(review): presumably (initial delay, max delay, mandatory stop) - confirm against Backoff.h
          backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
                   boost::posix_time::milliseconds(0)),
          timer_(timer) {}

   public:
    template <typename... Args>
    explicit RetryableOperation(PassKey, Args&&... args) : RetryableOperation(std::forward<Args>(args)...) {}

    /**
     * Create a shared instance; the arguments are forwarded to the private constructor.
     */
    template <typename... Args>
    static std::shared_ptr<RetryableOperation<T>> create(Args&&... args) {
        return std::make_shared<RetryableOperation<T>>(PassKey{}, std::forward<Args>(args)...);
    }

    /**
     * Start the operation on the first call. Later calls do not start another
     * attempt chain; they only return the future of the shared promise.
     *
     * @return the future that is completed when the operation finishes
     */
    Future<Result, T> run() {
        bool expected = false;
        if (!started_.compare_exchange_strong(expected, true)) {
            // Already started by an earlier call: just share the pending result.
            return promise_.getFuture();
        }
        return runImpl(timeout_);
    }

    /**
     * Fail the promise with ResultDisconnected and cancel the retry timer.
     */
    void cancel() {
        promise_.setFailed(ResultDisconnected);
        // Use the error_code overload so that a failed cancellation is ignored.
        boost::system::error_code ec;
        timer_->cancel(ec);
    }

   private:
    const std::string name_;
    std::function<Future<Result, T>()> func_;
    const TimeDuration timeout_;
    Backoff backoff_;
    Promise<Result, T> promise_;
    std::atomic_bool started_{false};
    DeadlineTimerPtr timer_;

    /**
     * Run one attempt and, if it fails with ResultRetryable while there is still
     * time left, schedule the next attempt after a backoff delay.
     *
     * @param remainingTime the retry budget that is left before this attempt
     * @return the future of the shared promise
     */
    Future<Result, T> runImpl(TimeDuration remainingTime) {
        std::weak_ptr<RetryableOperation<T>> weakSelf{this->shared_from_this()};
        func_().addListener([this, weakSelf, remainingTime](Result result, const T& value) {
            // Keep the operation alive for the duration of the callback; bail out
            // if it has already been destroyed.
            auto self = weakSelf.lock();
            if (!self) {
                return;
            }
            if (result == ResultOk) {
                promise_.setValue(value);
                return;
            }
            if (result != ResultRetryable) {
                promise_.setFailed(result);
                return;
            }
            if (remainingTime.total_milliseconds() <= 0) {
                promise_.setFailed(ResultTimeout);
                return;
            }

            // Never wait longer than the budget that is left.
            auto delay = std::min(backoff_.next(), remainingTime);
            timer_->expires_from_now(delay);

            auto nextRemainingTime = remainingTime - delay;
            LOG_INFO("Reschedule " << name_ << " for " << delay.total_milliseconds()
                                   << " ms, remaining time: " << nextRemainingTime.total_milliseconds()
                                   << " ms");
            timer_->async_wait([this, weakSelf, nextRemainingTime](const boost::system::error_code& ec) {
                auto self = weakSelf.lock();
                if (!self) {
                    return;
                }
                if (ec) {
                    if (ec == boost::asio::error::operation_aborted) {
                        LOG_DEBUG("Timer for " << name_ << " is cancelled");
                    } else {
                        LOG_WARN("Timer for " << name_ << " failed: " << ec.message());
                    }
                    // No further attempt will be made after any timer error, so the
                    // promise must be completed here; otherwise the callers holding
                    // the future would wait forever.
                    promise_.setFailed(ResultTimeout);
                    return;
                }
                LOG_DEBUG("Run operation " << name_ << ", remaining time: "
                                           << nextRemainingTime.total_milliseconds() << " ms");
                runImpl(nextRemainingTime);
            });
        });
        return promise_.getFuture();
    }

    DECLARE_LOG_OBJECT()
};

} // namespace pulsar
Loading