Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,18 @@
M(ZooKeeperBytesSent, "Number of bytes send over network while communicating with ZooKeeper.", ValueType::Bytes) \
M(ZooKeeperBytesReceived, "Number of bytes received over network while communicating with ZooKeeper.", ValueType::Bytes) \
\
M(ExportPartitionZooKeeperRequests, "Total number of ZooKeeper requests made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGet, "Number of 'get' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGetChildren, "Number of 'getChildren' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGetChildrenWatch, "Number of 'getChildrenWatch' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperGetWatch, "Number of 'getWatch' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperCreate, "Number of 'create' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperSet, "Number of 'set' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperRemove, "Number of 'remove' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperRemoveRecursive, "Number of 'removeRecursive' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperMulti, "Number of 'multi' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperExists, "Number of 'exists' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
\
M(DistributedConnectionTries, "Total count of distributed connection attempts.", ValueType::Number) \
M(DistributedConnectionUsable, "Total count of successful distributed connections to a usable server (with required table, but maybe stale).", ValueType::Number) \
M(DistributedConnectionFailTry, "Total count when distributed connection fails with retry.", ValueType::Number) \
Expand Down
8 changes: 8 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6899,6 +6899,14 @@ Possible values:
- `` (empty value) - use session timezone

Default value is `UTC`.
)", 0) \
DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"(
Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends in the pending list.
On the other hand, there is a chance once the task executes that part has already been locked by another replica and the task will simply early exit.
)", 0) \
DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, true, R"(
Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information.
Querying ZooKeeper is expensive, and only available if the ZooKeeper feature flag MULTI_READ is enabled.
)", 0) \
\
/* ####################################################### */ \
Expand Down
4 changes: 3 additions & 1 deletion src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."},
{"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8",
{
Expand Down
17 changes: 8 additions & 9 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ struct ExportReplicatedMergeTreePartitionManifest
bool parallel_formatting;
bool parquet_parallel_encoding;
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
bool lock_inside_the_task; /// todo temporary

std::string toJsonString() const
{
Expand All @@ -131,6 +132,7 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("create_time", create_time);
json.set("max_retries", max_retries);
json.set("ttl_seconds", ttl_seconds);
json.set("lock_inside_the_task", lock_inside_the_task);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
Expand Down Expand Up @@ -160,18 +162,15 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.max_threads = json->getValue<size_t>("max_threads");
manifest.parallel_formatting = json->getValue<bool>("parallel_formatting");
manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");

if (json->has("file_already_exists_policy"))
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));
/// todo what to do if it's not a valid value?
if (file_already_exists_policy)
{
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));
if (file_already_exists_policy)
{
manifest.file_already_exists_policy = file_already_exists_policy.value();
}

/// what to do if it's not a valid value?
manifest.file_already_exists_policy = file_already_exists_policy.value();
}

manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");
Comment on lines 171 to +172

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Parse old export manifests without lock_inside_the_task

ExportReplicatedMergeTreePartitionManifest::fromJsonString now unconditionally reads lock_inside_the_task from metadata.json. Existing export manifests written by prior versions do not contain this field, so any attempt to inspect existing exports (scheduler startup, status polling, or system.replicated_partition_exports) will throw during JSON parsing and abort the query. A backward-compatible default is needed to avoid breaking upgrades until all historical export nodes are rewritten.

Useful? React with 👍 / 👎.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not being used in production by anyone yet, so it is safe to do.


return manifest;
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
/// This is used to prevent the parts from being deleted before finishing the export operation
/// It does not mean this replica will export all the parts
/// There is also a chance this replica does not contain a given part and it is totally ok.
std::vector<DataPartPtr> part_references;
mutable std::vector<DataPartPtr> part_references;

std::string getCompositeKey() const
{
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/MergeTree/BackgroundJobsAssignee.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ bool BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, b
return schedule_res;
}

/// Returns the value reported by getAvailableSlots() on the moves executor
/// obtained from this assignee's context.
std::size_t BackgroundJobsAssignee::getAvailableMoveExecutors() const
{
return getContext()->getMovesExecutor()->getAvailableSlots();
}

String BackgroundJobsAssignee::toString(Type type)
{
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/BackgroundJobsAssignee.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class BackgroundJobsAssignee : public WithContext
bool scheduleMoveTask(ExecutableTaskPtr move_task);
bool scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);

std::size_t getAvailableMoveExecutors() const;

/// Just call finish
~BackgroundJobsAssignee();

Expand Down
68 changes: 68 additions & 0 deletions src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#include <Storages/MergeTree/ExportPartFromPartitionExportTask.h>
#include <Common/ProfileEvents.h>

namespace ProfileEvents
{
extern const Event ExportPartitionZooKeeperRequests;
extern const Event ExportPartitionZooKeeperGetChildren;
extern const Event ExportPartitionZooKeeperCreate;
}
namespace DB
{

/// Stores the owning storage, the export key and a copy of the part manifest, and
/// eagerly creates the wrapped ExportPartTask from the same storage and manifest.
ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask(
    StorageReplicatedMergeTree & storage_,
    const std::string & key_,
    const MergeTreePartExportManifest & manifest_)
    : storage(storage_)
    , key(key_)
    , manifest(manifest_)
    /// Built from the constructor arguments so it does not depend on member declaration order.
    , export_part_task(std::make_shared<ExportPartTask>(storage_, manifest_))
{
}

bool ExportPartFromPartitionExportTask::executeStep()
{
const auto zk = storage.getZooKeeper();
const auto part_name = manifest.data_part->name;

LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock part: {}", part_name);

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
{
LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name);
export_part_task->executeStep();
return false;
}

LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to lock part {}, skipping", part_name);
return false;
}

/// Forwards the cancellation request to the wrapped ExportPartTask.
void ExportPartFromPartitionExportTask::cancel() noexcept
{
export_part_task->cancel();
}

/// Forwards the completion callback to the wrapped ExportPartTask.
void ExportPartFromPartitionExportTask::onCompleted()
{
export_part_task->onCompleted();
}

/// Returns the storage ID reported by the wrapped ExportPartTask.
StorageID ExportPartFromPartitionExportTask::getStorageID() const
{
return export_part_task->getStorageID();
}

/// Returns the priority reported by the wrapped ExportPartTask.
Priority ExportPartFromPartitionExportTask::getPriority() const
{
return export_part_task->getPriority();
}

/// Returns the query ID reported by the wrapped ExportPartTask.
String ExportPartFromPartitionExportTask::getQueryId() const
{
return export_part_task->getQueryId();
}
}
36 changes: 36 additions & 0 deletions src/Storages/MergeTree/ExportPartFromPartitionExportTask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/MergeTreePartExportManifest.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ExportPartTask.h>

namespace DB
{

/// Decorator around ExportPartTask that takes the per-part export lock inside the task itself.
///
/// executeStep() first tries to create an ephemeral lock node for the part in ZooKeeper and
/// runs the wrapped ExportPartTask only when that node was created. cancel(), onCompleted(),
/// getStorageID(), getPriority() and getQueryId() delegate to the wrapped task.
class ExportPartFromPartitionExportTask : public IExecutableTask
{
public:
/// storage_  - replicated storage that owns the part; used for ZooKeeper access and logging.
/// key_      - export key; used as a path component of the lock node.
/// manifest_ - manifest of the part to export; copied into this task.
explicit ExportPartFromPartitionExportTask(
StorageReplicatedMergeTree & storage_,
const std::string & key_,
const MergeTreePartExportManifest & manifest_);
/// Attempts to lock the part and, on success, runs the wrapped task. Always returns false.
bool executeStep() override;
void onCompleted() override;
StorageID getStorageID() const override;
Priority getPriority() const override;
String getQueryId() const override;

void cancel() noexcept override;

private:
/// Held by reference: NOTE(review) the storage presumably has to outlive this task — confirm.
StorageReplicatedMergeTree & storage;
/// Export key; part of the ZooKeeper lock path.
std::string key;
/// Own copy of the part export manifest.
MergeTreePartExportManifest manifest;
/// The wrapped task that performs the actual export.
std::shared_ptr<ExportPartTask> export_part_task;
};

}
33 changes: 28 additions & 5 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <Common/Exception.h>
#include <Common/ProfileEventsScope.h>
#include <Storages/MergeTree/ExportList.h>
#include <Formats/FormatFactory.h>

namespace ProfileEvents
{
Expand All @@ -38,15 +39,25 @@ namespace Setting
extern const SettingsUInt64 min_bytes_to_use_direct_io;
}

ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_, ContextPtr context_)
ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_)
: storage(storage_),
manifest(manifest_),
local_context(context_)
manifest(manifest_)
{
}

/// Read-only access to the manifest stored in this task.
/// The returned reference points at a member of the task.
const MergeTreePartExportManifest & ExportPartTask::getManifest() const
{
return manifest;
}

bool ExportPartTask::executeStep()
{
auto local_context = Context::createCopy(storage.getContext());
local_context->makeQueryContextForExportPart();
local_context->setCurrentQueryId(manifest.transaction_id);
local_context->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART);
local_context->setSettings(manifest.settings);

const auto & metadata_snapshot = manifest.metadata_snapshot;

Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
Expand Down Expand Up @@ -91,7 +102,7 @@ bool ExportPartTask::executeStep()
block_with_partition_values,
(*exports_list_entry)->destination_file_path,
manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite,
manifest.format_settings,
getFormatSettings(local_context),
local_context);
}
catch (const Exception & e)
Expand Down Expand Up @@ -126,10 +137,21 @@ bool ExportPartTask::executeStep()
}
}

tryLogCurrentException(__PRETTY_FUNCTION__);
LOG_WARNING(getLogger("ExportPartTask"), "Export part {} failed: {}", manifest.data_part->name, e.message());

ProfileEvents::increment(ProfileEvents::PartsExportFailures);

storage.writePartLog(
PartLogElement::Type::EXPORT_PART,
ExecutionStatus::fromCurrentException("", true),
static_cast<UInt64>((*exports_list_entry)->elapsed * 1000000000),
manifest.data_part->name,
manifest.data_part,
{manifest.data_part},
nullptr,
nullptr,
exports_list_entry.get());

std::lock_guard inner_lock(storage.export_manifests_mutex);
storage.export_manifests.erase(manifest);

Expand Down Expand Up @@ -259,6 +281,7 @@ bool ExportPartTask::executeStep()

/// Requests cancellation of the export: logs the call, sets the cancel_requested flag
/// and cancels the pipeline.
void ExportPartTask::cancel() noexcept
{
LOG_INFO(getLogger("ExportPartTask"), "Export part {} task cancel() method called", manifest.data_part->name);
/// Set the flag before cancelling the pipeline.
cancel_requested.store(true);
pipeline.cancel();
}
Expand Down
5 changes: 2 additions & 3 deletions src/Storages/MergeTree/ExportPartTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,19 @@ class ExportPartTask : public IExecutableTask
public:
explicit ExportPartTask(
MergeTreeData & storage_,
const MergeTreePartExportManifest & manifest_,
ContextPtr context_);
const MergeTreePartExportManifest & manifest_);
bool executeStep() override;
void onCompleted() override;
StorageID getStorageID() const override;
Priority getPriority() const override;
String getQueryId() const override;
const MergeTreePartExportManifest & getManifest() const;

void cancel() noexcept override;

private:
MergeTreeData & storage;
MergeTreePartExportManifest manifest;
ContextPtr local_context;
QueryPipeline pipeline;
std::atomic<bool> cancel_requested = false;

Expand Down
Loading
Loading