Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,8 @@
M(1003, SSH_EXCEPTION) \
M(1004, STARTUP_SCRIPTS_ERROR) \
M(1005, PENDING_MUTATIONS_NOT_ALLOWED) \
M(1006, EXPORT_PARTITION_ALREADY_EXPORTED) \
M(1007, PARTITION_EXPORT_FAILED) \
/* See END */

#ifdef APPLY_FOR_EXTERNAL_ERROR_CODES
Expand All @@ -675,7 +677,7 @@ namespace ErrorCodes
APPLY_FOR_ERROR_CODES(M)
#undef M

constexpr ErrorCode END = 1005;
constexpr ErrorCode END = 1007;
ErrorPairHolder values[END + 1]{};

struct ErrorCodesNames
Expand Down
8 changes: 8 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7408,6 +7408,14 @@ On the other hand, there is a chance once the task executes that part has alread
DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, false, R"(
Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information.
Querying ZooKeeper is expensive, and only available if the ZooKeeper feature flag MULTI_READ is enabled.
)", 0) \
DECLARE(ExportPartitionAllOnError, export_merge_tree_partition_all_on_error, ExportPartitionAllOnError::throw_first, R"(
Failure handling for `ALTER TABLE ... EXPORT PARTITION ALL ...`.
Possible values:
- `throw_first` (default) - stop at the first failed partition; partitions already scheduled remain scheduled.
- `collect` - try every partition and throw a single aggregated exception at the end if any failed; partitions that succeeded remain scheduled.
- `skip_conflicts` - silently skip partitions that are already exported / being exported (errors with code EXPORT_PARTITION_ALREADY_EXPORTED); fail-fast on every other error.
Has no effect on `EXPORT PARTITION <id>` (single-partition export).
)", 0) \
DECLARE(Timezone, iceberg_partition_timezone, "", R"(
Time zone by which partitioning of Iceberg tables was performed.
Expand Down
3 changes: 2 additions & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class WriteBuffer;
M(CLASS_NAME, ObjectStorageGranularityLevel) \
M(CLASS_NAME, DecorrelationJoinKind) \
M(CLASS_NAME, JoinOrderAlgorithm) \
M(CLASS_NAME, DeduplicateInsertSelectMode)
M(CLASS_NAME, DeduplicateInsertSelectMode) \
M(CLASS_NAME, ExportPartitionAllOnError)


COMMON_SETTINGS_SUPPORTED_TYPES(Settings, DECLARE_SETTING_TRAIT)
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
{"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."},
{"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."},
{"export_merge_tree_partition_all_on_error", "throw_first", "throw_first", "New setting."},
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
{"object_storage_cluster", "", "", "Antalya: New setting"},
Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,4 +449,6 @@ IMPLEMENT_SETTING_ENUM(DeduplicateInsertSelectMode, ErrorCodes::BAD_ARGUMENTS,

IMPLEMENT_SETTING_AUTO_ENUM(MergeTreePartExportFileAlreadyExistsPolicy, ErrorCodes::BAD_ARGUMENTS);

IMPLEMENT_SETTING_AUTO_ENUM(ExportPartitionAllOnError, ErrorCodes::BAD_ARGUMENTS);

}
9 changes: 9 additions & 0 deletions src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -582,4 +582,13 @@ enum class MergeTreePartExportFileAlreadyExistsPolicy : uint8_t

DECLARE_SETTING_ENUM(MergeTreePartExportFileAlreadyExistsPolicy)

/// Failure-handling policy for `ALTER TABLE ... EXPORT PARTITION ALL`,
/// selected via the `export_merge_tree_partition_all_on_error` setting.
/// Has no effect on single-partition `EXPORT PARTITION <id>`.
enum class ExportPartitionAllOnError : uint8_t
{
    /// Stop at the first failed partition; partitions already scheduled remain scheduled.
    throw_first,
    /// Try every partition; throw one aggregated exception at the end if any failed.
    collect,
    /// Silently skip partitions that raise EXPORT_PARTITION_ALREADY_EXPORTED;
    /// fail fast on every other error.
    skip_conflicts,
};

DECLARE_SETTING_ENUM(ExportPartitionAllOnError)

}
7 changes: 5 additions & 2 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6196,8 +6196,11 @@ void MergeTreeData::checkAlterPartitionIsPossible(
const auto * partition_ast = command.partition->as<ASTPartition>();
if (partition_ast && partition_ast->all)
{
if (command.type != PartitionCommand::DROP_PARTITION && command.type != PartitionCommand::ATTACH_PARTITION && !(command.type == PartitionCommand::REPLACE_PARTITION && !command.replace))
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently");
if (command.type != PartitionCommand::DROP_PARTITION
&& command.type != PartitionCommand::ATTACH_PARTITION
&& command.type != PartitionCommand::EXPORT_PARTITION
&& !(command.type == PartitionCommand::REPLACE_PARTITION && !command.replace))
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH/EXPORT PARTITION ALL currently");
}
else
{
Expand Down
83 changes: 81 additions & 2 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ namespace Setting
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
extern const SettingsBool export_merge_tree_partition_lock_inside_the_task;
extern const SettingsExportPartitionAllOnError export_merge_tree_partition_all_on_error;
extern const SettingsString export_merge_tree_part_filename_pattern;
extern const SettingsBool write_full_path_in_iceberg_metadata;
extern const SettingsBool allow_experimental_insert_into_iceberg;
Expand Down Expand Up @@ -335,6 +336,8 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int INVALID_SETTING_VALUE;
extern const int PENDING_MUTATIONS_NOT_ALLOWED;
extern const int EXPORT_PARTITION_ALREADY_EXPORTED;
extern const int PARTITION_EXPORT_FAILED;
}

namespace ServerSetting
Expand Down Expand Up @@ -8096,6 +8099,82 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
"If you are exporting to an Apache Iceberg table, you also need to enable the setting `allow_experimental_insert_into_iceberg` on all replicas. The same goes for `allow_experimental_export_merge_tree_part`");
}

    /// EXPORT PARTITION ALL: expand into one sub-call per active partition id.
    /// Failure handling is controlled by `export_merge_tree_partition_all_on_error`.
    if (const auto * partition_ast = command.partition->as<ASTPartition>(); partition_ast && partition_ast->all)
    {
        auto partition_id_set = getAllPartitionIds();
        if (partition_id_set.empty())
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "Table {} has no active partitions to export",
                getStorageID().getNameForLogs());

        /// Sort for deterministic ordering (so failure messages and tests are stable).
        std::vector<String> partition_ids(partition_id_set.begin(), partition_id_set.end());
        std::sort(partition_ids.begin(), partition_ids.end());

        const auto & on_error_setting = query_context->getSettingsRef()[Setting::export_merge_tree_partition_all_on_error];
        const ExportPartitionAllOnError on_error = on_error_setting.value;

        LOG_INFO(log, "EXPORT PARTITION ALL: scheduling export for {} partitions, on_error={}",
            partition_ids.size(), on_error_setting.toString());

        std::vector<std::pair<String, String>> failures; /// (partition_id, message)
        size_t skipped_conflicts = 0;

        for (const auto & partition_id : partition_ids)
        {
            /// Recurse with a synthetic single-partition AST. The synthetic partition
            /// carries an explicit id (never `all`), so recursion is exactly one level deep.
            PartitionCommand sub = command;
            auto synthetic = make_intrusive<ASTPartition>();
            synthetic->setPartitionID(make_intrusive<ASTLiteral>(partition_id));
            sub.partition = synthetic;

            try
            {
                exportPartitionToTable(sub, query_context);
            }
            catch (const Exception & e)
            {
                switch (on_error)
                {
                    /// Fail fast: partitions scheduled in earlier iterations stay scheduled.
                    case ExportPartitionAllOnError::throw_first:
                        throw;
                    /// Conflicts (already exported / concurrent export) are expected and skipped;
                    /// any other error still fails fast.
                    case ExportPartitionAllOnError::skip_conflicts:
                        if (e.code() == ErrorCodes::EXPORT_PARTITION_ALREADY_EXPORTED)
                        {
                            ++skipped_conflicts;
                            LOG_INFO(log,
                                "EXPORT PARTITION ALL: skipping partition {} (already exported / concurrent): {}",
                                partition_id, e.message());
                            break;
                        }
                        throw;
                    /// Record the failure and keep going; a single aggregated
                    /// exception is thrown after the loop if anything failed.
                    case ExportPartitionAllOnError::collect:
                        LOG_WARNING(log, "EXPORT PARTITION ALL: partition {} failed: {}",
                            partition_id, e.message());
                        failures.emplace_back(partition_id, e.message());
                        break;
                }
            }
        }

        /// Only reachable in `collect` mode: report every failed partition at once.
        if (!failures.empty())
        {
            String aggregated = fmt::format(
                "EXPORT PARTITION ALL: {}/{} partitions failed to schedule. Per-partition errors:",
                failures.size(), partition_ids.size());
            for (const auto & [pid, msg] : failures)
                aggregated += fmt::format("\n    {}: {}", pid, msg);
            throw Exception(ErrorCodes::PARTITION_EXPORT_FAILED, "{}", aggregated);
        }

        if (skipped_conflicts > 0)
            LOG_INFO(log, "EXPORT PARTITION ALL: skipped {} partitions due to existing exports",
                skipped_conflicts);

        /// Partition ids were expanded above; do not fall through to the single-partition path.
        return;
    }

const auto dest_database = query_context->resolveDatabase(command.to_database);
const auto dest_table = command.to_table;
const auto dest_storage_id = StorageID(dest_database, dest_table);
Expand Down Expand Up @@ -8175,7 +8254,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &

if (!has_expired && !query_context->getSettingsRef()[Setting::export_merge_tree_partition_force_export])
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Export with key {} already exported or it is being exported, and it has not expired. Set `export_merge_tree_partition_force_export` to overwrite it.", export_key);
throw Exception(ErrorCodes::EXPORT_PARTITION_ALREADY_EXPORTED, "Export with key {} already exported or it is being exported, and it has not expired. Set `export_merge_tree_partition_force_export` to overwrite it.", export_key);
}

LOG_INFO(log, "Overwriting export with key {}", export_key);
Expand Down Expand Up @@ -8369,7 +8448,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
{
/// Lost the race on the root export node. Current code already
/// validated (exists / expired / force) — so this is *always* a race.
throw Exception(ErrorCodes::BAD_ARGUMENTS,
throw Exception(ErrorCodes::EXPORT_PARTITION_ALREADY_EXPORTED,
"Export with key {} was created concurrently by another replica. Retry if needed",
export_key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,32 @@ def test_export_two_partitions_to_iceberg(cluster):
assert count_2021 == 1, f"Expected 1 row for year=2021, got {count_2021}"


def test_export_partition_all_to_iceberg(cluster):
    """Exercise `ALTER TABLE ... EXPORT PARTITION ALL TO TABLE ...` against Iceberg.

    A single statement schedules an export for every active partition, and the
    Iceberg-specific destination compatibility checks run once per sub-call.
    """
    replica = cluster.instances["replica1"]

    suffix = unique_suffix()
    source_table = f"mt_{suffix}"
    destination_table = f"iceberg_{suffix}"

    setup_tables(cluster, source_table, destination_table, nodes=["replica1"])

    replica.query(
        f"ALTER TABLE {source_table} EXPORT PARTITION ALL TO TABLE {destination_table}"
    )

    for year in ("2020", "2021"):
        wait_for_export_status(replica, source_table, destination_table, year, "COMPLETED")

    count_2020 = int(
        replica.query(f"SELECT count() FROM {destination_table} WHERE year = 2020").strip()
    )
    count_2021 = int(
        replica.query(f"SELECT count() FROM {destination_table} WHERE year = 2021").strip()
    )

    assert count_2020 == 3, f"Expected 3 rows for year=2020, got {count_2020}"
    assert count_2021 == 1, f"Expected 1 row for year=2021, got {count_2021}"


def test_failure_is_logged_in_system_table(cluster):
"""
When S3 is unreachable the export must be marked FAILED in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1505,3 +1505,100 @@ def test_export_partition_resumes_after_stop_moves_during_export(cluster):

row_count = int(node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip())
assert row_count == 3, f"Expected 3 rows in S3 after export completed, got {row_count}"


def test_export_partition_all(cluster):
    """Happy path for `ALTER TABLE ... EXPORT PARTITION ALL TO TABLE ...`.

    One ALTER schedules an export task per active partition; afterwards every
    partition must have landed in the destination S3 table.
    """
    replica = cluster.instances["replica1"]

    suffix = str(uuid.uuid4()).replace("-", "_")
    source_table = f"export_all_mt_{suffix}"
    destination_table = f"export_all_s3_{suffix}"

    replica.query(
        f"CREATE TABLE {source_table} (id UInt64, year UInt16)"
        f" ENGINE = ReplicatedMergeTree('/clickhouse/tables/{source_table}', 'replica1')"
        f" PARTITION BY year ORDER BY tuple()"
    )
    replica.query(f"INSERT INTO {source_table} VALUES (1, 2020), (2, 2021), (3, 2022)")
    create_s3_table(replica, destination_table)

    replica.query(
        f"ALTER TABLE {source_table} EXPORT PARTITION ALL TO TABLE {destination_table}"
    )

    for year in ("2020", "2021", "2022"):
        wait_for_export_status(
            replica, source_table, destination_table, year, "COMPLETED", timeout=60
        )

    row_count = int(replica.query(f"SELECT count() FROM {destination_table}").strip())
    assert row_count == 3, f"Expected 3 rows in S3 after EXPORT PARTITION ALL, got {row_count}"


def test_export_partition_all_failure_modes(cluster):
    """Cover the three values of `export_merge_tree_partition_all_on_error`.

    Pre-export every partition of a source table, then re-run EXPORT PARTITION ALL
    under each failure mode and check the documented behavior.
    """
    replica = cluster.instances["replica1"]

    suffix = str(uuid.uuid4()).replace("-", "_")
    source_table = f"export_all_modes_mt_{suffix}"
    destination_table = f"export_all_modes_s3_{suffix}"
    empty_table = f"export_all_empty_mt_{suffix}"

    replica.query(
        f"CREATE TABLE {source_table} (id UInt64, year UInt16)"
        f" ENGINE = ReplicatedMergeTree('/clickhouse/tables/{source_table}', 'replica1')"
        f" PARTITION BY year ORDER BY tuple()"
    )
    replica.query(f"INSERT INTO {source_table} VALUES (1, 2020), (2, 2021), (3, 2022)")
    create_s3_table(replica, destination_table)

    # First run: schedule everything and wait until each partition is COMPLETED.
    replica.query(
        f"ALTER TABLE {source_table} EXPORT PARTITION ALL TO TABLE {destination_table}"
    )
    for year in ("2020", "2021", "2022"):
        wait_for_export_status(
            replica, source_table, destination_table, year, "COMPLETED", timeout=60
        )

    # A table without active partitions is rejected with BAD_ARGUMENTS.
    replica.query(
        f"CREATE TABLE {empty_table} (id UInt64, year UInt16)"
        f" ENGINE = ReplicatedMergeTree('/clickhouse/tables/{empty_table}', 'replica1')"
        f" PARTITION BY year ORDER BY tuple()"
    )
    failure = replica.query_and_get_error(
        f"ALTER TABLE {empty_table} EXPORT PARTITION ALL TO TABLE {destination_table}"
    )
    assert "no active partitions to export" in failure, (
        f"Expected 'no active partitions' error, got: {failure}"
    )

    # throw_first (the default): the re-run stops at the first conflicting partition.
    failure = replica.query_and_get_error(
        f"ALTER TABLE {source_table} EXPORT PARTITION ALL TO TABLE {destination_table}"
        f" SETTINGS export_merge_tree_partition_all_on_error = 'throw_first'"
    )
    assert "EXPORT_PARTITION_ALREADY_EXPORTED" in failure, (
        f"Expected EXPORT_PARTITION_ALREADY_EXPORTED in error, got: {failure}"
    )

    # collect: one aggregated PARTITION_EXPORT_FAILED mentioning every partition.
    failure = replica.query_and_get_error(
        f"ALTER TABLE {source_table} EXPORT PARTITION ALL TO TABLE {destination_table}"
        f" SETTINGS export_merge_tree_partition_all_on_error = 'collect'"
    )
    assert "PARTITION_EXPORT_FAILED" in failure, (
        f"Expected PARTITION_EXPORT_FAILED in error, got: {failure}"
    )
    for year in ("2020", "2021", "2022"):
        assert year in failure, (
            f"Expected aggregated error to mention partition {year}, got: {failure}"
        )

    # skip_conflicts: every partition conflicts, all are skipped, the statement succeeds.
    replica.query(
        f"ALTER TABLE {source_table} EXPORT PARTITION ALL TO TABLE {destination_table}"
        f" SETTINGS export_merge_tree_partition_all_on_error = 'skip_conflicts'"
    )
Loading