Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ DEFINE_mDouble(standby_compaction_version_ratio, "0.8");

DEFINE_mBool(enable_cache_read_from_peer, "true");

// Rate limit for warmup download in bytes per second, default 100MB/s
// 0 means no limit
DEFINE_mInt64(file_cache_warmup_download_rate_limit_bytes_per_second, "104857600");

// Cache the expiration time of the peer address.
// This can be configured to be less than the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration.
// If the value is -1, use the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration as the expiration time.
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ DECLARE_mDouble(standby_compaction_version_ratio);

DECLARE_mBool(enable_cache_read_from_peer);

// Rate limit for warmup download in bytes per second, default 100MB/s
// 0 means no limit
DECLARE_mInt64(file_cache_warmup_download_rate_limit_bytes_per_second);

DECLARE_mInt64(cache_read_from_peer_expired_seconds);

#include "common/compile_check_end.h"
Expand Down
11 changes: 11 additions & 0 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
#include <vector>

#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "cpp/s3_rate_limiter.h"
#include "cpp/sync_point.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
Expand Down Expand Up @@ -425,6 +427,15 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
size_t size = empty_end - empty_start + 1;
std::unique_ptr<char[]> buffer(new char[size]);

// Apply rate limiting for warmup download tasks (node level)
// Rate limiting is applied before remote read to limit both S3 read and local cache write
if (io_ctx->is_warmup) {
auto* rate_limiter = ExecEnv::GetInstance()->warmup_download_rate_limiter();
if (rate_limiter != nullptr) {
rate_limiter->add(size);
}
}

// Determine read type and execute remote read
RETURN_IF_ERROR(
_execute_remote_read(empty_blocks, empty_start, size, buffer, stats, io_ctx));
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct RuntimeFilterTimerQueue;
} // namespace pipeline
class WorkloadGroupMgr;
struct WriteCooldownMetaExecutors;
class S3RateLimiterHolder;
namespace io {
class FileCacheFactory;
class HdfsMgr;
Expand Down Expand Up @@ -308,6 +309,7 @@ class ExecEnv {
io::HdfsMgr* hdfs_mgr() { return _hdfs_mgr; }
io::PackedFileManager* packed_file_manager() { return _packed_file_manager; }
IndexPolicyMgr* index_policy_mgr() { return _index_policy_mgr; }
S3RateLimiterHolder* warmup_download_rate_limiter() { return _warmup_download_rate_limiter; }

#ifdef BE_TEST
void set_tmp_file_dir(std::unique_ptr<segment_v2::TmpFileDirs> tmp_file_dirs) {
Expand Down Expand Up @@ -566,6 +568,7 @@ class ExecEnv {
kerberos::KerberosTicketMgr* _kerberos_ticket_mgr = nullptr;
io::HdfsMgr* _hdfs_mgr = nullptr;
io::PackedFileManager* _packed_file_manager = nullptr;
S3RateLimiterHolder* _warmup_download_rate_limiter = nullptr;
};

template <>
Expand Down
19 changes: 19 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "common/kerberos/kerberos_ticket_mgr.h"
#include "common/logging.h"
#include "common/status.h"
#include "cpp/s3_rate_limiter.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/fs_file_cache_storage.h"
Expand Down Expand Up @@ -140,6 +141,11 @@ namespace doris {
class PBackendService_Stub;
class PFunctionService_Stub;

// Warmup download rate limiter metrics
bvar::Adder<int64_t> warmup_download_rate_limit_ns("warmup_download_rate_limit_ns");
bvar::Adder<int64_t> warmup_download_rate_limit_exceed_req_num(
"warmup_download_rate_limit_exceed_req_num");

static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
bool init_system_metrics = config::enable_system_metrics;
std::set<std::string> disk_devices;
Expand Down Expand Up @@ -417,6 +423,18 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,

_index_policy_mgr = new IndexPolicyMgr();

// Initialize warmup download rate limiter for cloud mode
if (config::is_cloud_mode() &&
config::file_cache_warmup_download_rate_limit_bytes_per_second > 0) {
int64_t rate_limit = config::file_cache_warmup_download_rate_limit_bytes_per_second;
// max_burst is the same as rate_limit (1 second burst)
// limit is 0 which means no total limit
_warmup_download_rate_limiter = new S3RateLimiterHolder(
rate_limit, rate_limit, 0,
metric_func_factory(warmup_download_rate_limit_ns,
warmup_download_rate_limit_exceed_req_num));
}

RETURN_IF_ERROR(_spill_stream_mgr->init());
RETURN_IF_ERROR(_runtime_query_statistics_mgr->start_report_thread());
_dict_factory = new doris::vectorized::DictionaryFactory();
Expand Down Expand Up @@ -922,6 +940,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_heap_profiler);

SAFE_DELETE(_index_policy_mgr);
SAFE_DELETE(_warmup_download_rate_limiter);

_s_tracking_memory = false;

Expand Down
Loading