Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
M(IcebergMetadataFilesCacheMisses, "Number of times iceberg metadata files have not been found in the iceberg metadata cache and had to be read from (remote) disk.", ValueType::Number) \
M(IcebergMetadataFilesCacheWeightLost, "Approximate number of bytes evicted from the iceberg metadata cache.", ValueType::Number) \
M(IcebergMetadataReadWaitTimeMicroseconds, "Total time data readers spend waiting for iceberg metadata files to be read and parsed, summed across all reader threads.", ValueType::Microseconds) \
M(IcebergManifestFilesParallelFetchMicroseconds, "Total time iceberg iterators spend blocked waiting for parallel manifest file fetches to finish, accumulated over every prefetched manifest file that is consumed.", ValueType::Microseconds) \
M(IcebergIteratorInitializationMicroseconds, "Total time spent on synchronous initialization of iceberg data iterators.", ValueType::Microseconds) \
M(IcebergMetadataUpdateMicroseconds, "Total time spent on synchronous initialization of iceberg data iterators.", ValueType::Microseconds) \
M(IcebergMetadataReturnedObjectInfos, "Total number of returned object infos from iceberg iterator.", ValueType::Number) \
Expand Down
10 changes: 9 additions & 1 deletion src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4910,7 +4910,15 @@ Possible values:
- 0 - Disabled
- 1 - Enabled
)", 0) \
\
DECLARE(UInt64, iceberg_metadata_files_parallel_loading_threads, 8, R"(
Controls parallel fetching of Iceberg manifest files during query planning.

When greater than 1, manifest files listed in the manifest list are fetched concurrently from object storage on the shared IO thread pool, reducing cold-cache query latency for snapshots that reference many manifests. Each task calls `getManifestFile` which is backed by `IcebergMetadataFilesCache` with singleflight protection, so concurrent requests for the same key do not cause duplicate S3 fetches.

All matching manifest files are scheduled at once, so the effective concurrency is bounded by the IO thread pool rather than by this value; values greater than 1 only enable the parallel path.

Setting this to 0 or 1 disables parallel fetching and restores fully sequential behavior, which is useful for debugging or reproducing exact serial timing.
)", 0) \
DECLARE(Bool, use_query_cache, false, R"(
If turned on, `SELECT` queries may utilize the [query cache](../query-cache.md). Parameters [enable_reads_from_query_cache](#enable_reads_from_query_cache)
and [enable_writes_to_query_cache](#enable_writes_to_query_cache) control in more detail how the cache is used.
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
});
addSettingsChanges(settings_changes_history, "25.8",
{
{"iceberg_metadata_files_parallel_loading_threads", 1, 8, "New setting: number of threads to fetch Iceberg manifest files in parallel during query planning."},
{"output_format_json_quote_64bit_integers", true, false, "Disable quoting of the 64 bit integers in JSON by default"},
{"show_data_lake_catalogs_in_system_tables", true, true, "New setting"},
{"optimize_rewrite_regexp_functions", false, true, "A new setting"},
Expand Down
164 changes: 149 additions & 15 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#if USE_AVRO

#include <cstddef>
#include <future>
#include <memory>
#include <optional>
#include <Formats/FormatFilterInfo.h>
Expand All @@ -22,6 +23,7 @@
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/SharedThreadPools.h>
#include <Interpreters/Context.h>
#include <Disks/DiskType.h>

Expand Down Expand Up @@ -62,6 +64,7 @@ extern const Event IcebergMinMaxIndexPrunedFiles;
extern const Event IcebergMetadataReadWaitTimeMicroseconds;
extern const Event IcebergMetadataReturnedObjectInfos;
extern const Event IcebergIteratorNextMicroseconds;
extern const Event IcebergManifestFilesParallelFetchMicroseconds;
};


Expand All @@ -75,6 +78,7 @@ extern const int LOGICAL_ERROR;
namespace Setting
{
extern const SettingsBool use_iceberg_partition_pruning;
extern const SettingsUInt64 iceberg_metadata_files_parallel_loading_threads;
};


Expand Down Expand Up @@ -118,6 +122,83 @@ defineDeletesSpan(ManifestFileEntry data_object_, const std::vector<ManifestFile

}

void SingleThreadIcebergKeysIterator::initParallelPrefetch()
{
if (!data_snapshot)
return;

const size_t n = data_snapshot->manifest_list_entries.size();
if (n == 0)
return;

auto configuration_locked = configuration.lock();

ThreadPool & pool = getIOThreadPool().get();
auto thread_group = CurrentThread::getGroup();

/// Reserve to avoid reallocation: std::future is move-only, and vector reallocation
/// with non-noexcept move + no copy would cause a compile error on some STL versions.
prefetch_entries.reserve(n);

/// Submit one getManifestFile task per matching manifest, in manifest_list order.
/// CacheBase::getOrSet has singleflight protection — concurrent callers for the same
/// key serialize on a per-key token mutex, so we don't cause N duplicate S3 GETs.
for (size_t i = 0; i < n; ++i)
{
const auto & entry = data_snapshot->manifest_list_entries[i];
if (entry.content_type != manifest_file_content_type)
continue;

auto promise = std::make_shared<std::promise<Iceberg::ManifestFilePtr>>();
prefetch_entries.push_back({i, promise->get_future()});

/// Capture all data by value. object_storage, local_context and the shared_ptrs
/// inside persistent_components are all reference-counted, so the lambda keeps
/// them alive even if SingleThreadIcebergKeysIterator is destroyed while a task
/// is still running.
pool.scheduleOrThrowOnError(
[promise_cap = std::move(promise),
object_storage_cap = this->object_storage,
configuration_cap = configuration_locked,
persistent_components_cap = this->persistent_components,
local_context_cap = this->local_context,
log_cap = this->log,
abs_path = entry.manifest_file_absolute_path,
seq_num = entry.added_sequence_number,
snap_id = entry.added_snapshot_id,
secondary_storages_cap = this->secondary_storages,
thread_group_cap = thread_group]() mutable
{
DB::ThreadGroupSwitcher switcher(thread_group_cap, "IcebergMFetch");
try
{
auto result = Iceberg::getManifestFile(
object_storage_cap,
configuration_cap,
persistent_components_cap,
local_context_cap,
log_cap,
abs_path,
seq_num,
snap_id,
*secondary_storages_cap);
promise_cap->set_value(std::move(result));
}
catch (...)
{
promise_cap->set_exception(std::current_exception());
}
});
}

LOG_DEBUG(
log,
"Submitted {} parallel manifest file fetch tasks ({} total entries, thread_count={})",
prefetch_entries.size(),
n,
parallel_loading_threads);
}

std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::IcebergIteratorNextMicroseconds);
Expand All @@ -127,26 +208,60 @@ std::optional<ManifestFileEntry> SingleThreadIcebergKeysIterator::next()
return std::nullopt;
}

while (manifest_file_index < data_snapshot->manifest_list_entries.size())
/// Lazy initialization of parallel prefetch on the first next() call.
if (!prefetch_initialized)
{
prefetch_initialized = true;
if (parallel_loading_threads > 1)
initParallelPrefetch();
}

while (true)
{
if (!current_manifest_file_content)
{
if (persistent_components.format_version > 1 && data_snapshot->manifest_list_entries[manifest_file_index].content_type != manifest_file_content_type)
if (parallel_loading_threads > 1)
{
++manifest_file_index;
continue;
/// Parallel path: consume the next pre-fetched manifest future.
if (prefetch_consume_pos >= prefetch_entries.size())
return std::nullopt;

auto & [manifest_idx, fut] = prefetch_entries[prefetch_consume_pos++];
ManifestFilePtr result;
{
ProfileEventTimeIncrement<Microseconds> wait_watch(ProfileEvents::IcebergManifestFilesParallelFetchMicroseconds);
result = fut.get();
}
current_manifest_file_content = std::move(result);
internal_data_index = 0;
}
else
{
/// Serial path: find the next manifest with matching content type.
while (manifest_file_index < data_snapshot->manifest_list_entries.size())
{
if (persistent_components.format_version > 1 &&
data_snapshot->manifest_list_entries[manifest_file_index].content_type != manifest_file_content_type)
{
++manifest_file_index;
continue;
}
current_manifest_file_content = Iceberg::getManifestFile(
object_storage,
configuration.lock(),
persistent_components,
local_context,
log,
data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_absolute_path,
data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number,
data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id,
*secondary_storages);
internal_data_index = 0;
break;
}
if (!current_manifest_file_content)
return std::nullopt;
}
current_manifest_file_content = Iceberg::getManifestFile(
object_storage,
configuration.lock(),
persistent_components,
local_context,
log,
data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_absolute_path,
data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number,
data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id,
*secondary_storages);
internal_data_index = 0;
}
auto files = files_generator(current_manifest_file_content);
while (internal_data_index < files.size())
Expand Down Expand Up @@ -215,6 +330,19 @@ SingleThreadIcebergKeysIterator::~SingleThreadIcebergKeysIterator()
ProfileEvents::increment(ProfileEvents::IcebergPartitionPrunedFiles, partition_pruned_files);
if (min_max_index_pruned_files > 0)
ProfileEvents::increment(ProfileEvents::IcebergMinMaxIndexPrunedFiles, min_max_index_pruned_files);

/// Wait for any in-flight prefetch tasks that were not yet consumed by next().
/// The lambda captures shared_ptrs by value so the objects remain alive, but we
/// must wait to prevent the thread-pool thread from writing to a promise whose
/// shared state we no longer observe.
for (size_t i = prefetch_consume_pos; i < prefetch_entries.size(); ++i)
{
if (prefetch_entries[i].future.valid())
{
try { prefetch_entries[i].future.wait(); }
catch (...) {} // NOLINT: intentional swallow in destructor
}
}
}

SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator(
Expand Down Expand Up @@ -251,6 +379,8 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator(
, log(getLogger("IcebergIterator"))
, secondary_storages(secondary_storages_)
, manifest_file_content_type(manifest_file_content_type_)
, parallel_loading_threads(
local_context_ ? local_context_->getSettingsRef()[Setting::iceberg_metadata_files_parallel_loading_threads].value : 1)
{
}

Expand Down Expand Up @@ -303,6 +433,10 @@ IcebergIterator::IcebergIterator(
, table_schema_id(table_snapshot_->schema_id)
, secondary_storages(secondary_storages_)
{
/// Drain the delete manifests. With parallel_loading_threads > 1, the first call to
/// deletes_iterator.next() triggers initParallelPrefetch() which fans out all delete
/// manifest fetches to the IO thread pool simultaneously. Subsequent next() calls
/// consume the pre-fetched futures in order, so the serial drain loop below is fast.
auto delete_file = deletes_iterator.next();
while (delete_file.has_value())
{
Expand Down
17 changes: 16 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/ConcurrentBoundedQueue.h>

#include <optional>
#include <future>
#include <base/defines.h>

#include <Core/BackgroundSchedulePool.h>
Expand Down Expand Up @@ -54,6 +55,8 @@ class SingleThreadIcebergKeysIterator
~SingleThreadIcebergKeysIterator();

private:
void initParallelPrefetch();

ObjectStoragePtr object_storage;
std::shared_ptr<const ActionsDAG> filter_dag;
ContextPtr local_context;
Expand All @@ -66,7 +69,7 @@ class SingleThreadIcebergKeysIterator
LoggerPtr log;
std::shared_ptr<SecondaryStorages> secondary_storages;

// By Iceberg design it is difficult to avoid storing position deletes in memory.
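/// Iteration state. `manifest_file_index` drives the serial path (parallel_loading_threads <= 1);
/// `internal_data_index` and `current_manifest_file_content` are used by both the serial and the parallel path.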
size_t manifest_file_index = 0;
size_t internal_data_index = 0;
Iceberg::ManifestFilePtr current_manifest_file_content;
Expand All @@ -77,6 +80,18 @@ class SingleThreadIcebergKeysIterator

size_t min_max_index_pruned_files = 0;
size_t partition_pruned_files = 0;

/// Parallel prefetch state (used when parallel_loading_threads > 1)
UInt64 parallel_loading_threads = 1;

/// One scheduled manifest file fetch. Entries are appended by initParallelPrefetch() in
/// manifest-list order and consumed front to back by next().
struct PrefetchEntry
{
/// Index into `data_snapshot->manifest_list_entries` of the manifest this fetch was scheduled for.
size_t manifest_list_index;
/// Receives the fetched manifest file, or the exception thrown by the fetch task
/// (rethrown from `get()`).
std::future<Iceberg::ManifestFilePtr> future;
};
std::vector<PrefetchEntry> prefetch_entries;
size_t prefetch_consume_pos = 0;
bool prefetch_initialized = false;
};

}
Expand Down
Loading
Loading