Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7315,6 +7315,15 @@ Allows creation of [QBit](../../sql-reference/data-types/qbit.md) data type.
)", BETA, allow_experimental_qbit_type) \
DECLARE(UInt64, archive_adaptive_buffer_max_size_bytes, 8 * DBMS_DEFAULT_BUFFER_SIZE, R"(
Limits the maximum size of the adaptive buffer used when writing to archive files (for example, tar archives)", 0) \
DECLARE(Timezone, iceberg_partition_timezone, "", R"(
Time zone that was used when the Iceberg table was partitioned.
Possible values:

- Any valid timezone, e.g. `Europe/Berlin`, `UTC` or `Zulu`
- `` (empty value) - use server or session timezone

Default value is empty.
)", 0) \
\
/* ####################################################### */ \
/* ########### START OF EXPERIMENTAL FEATURES ############ */ \
Expand Down
4 changes: 4 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
/// Note: please check if the key already exists to prevent duplicate entries.
addSettingsChanges(settings_changes_history, "26.1.1.20001",
{
{"iceberg_partition_timezone", "", "", "New setting."},
});
addSettingsChanges(settings_changes_history, "26.1",
{
{"parallel_replicas_filter_pushdown", false, false, "New setting"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace DB
namespace Setting
{
extern const SettingsUInt64 iceberg_insert_max_partitions;
extern const SettingsTimezone iceberg_partition_timezone;
}

namespace ErrorCodes
Expand Down Expand Up @@ -53,7 +54,7 @@ ChunkPartitioner::ChunkPartitioner(

auto & factory = FunctionFactory::instance();

auto transform_and_argument = Iceberg::parseTransformAndArgument(transform_name);
auto transform_and_argument = Iceberg::parseTransformAndArgument(transform_name, context->getSettingsRef()[Setting::iceberg_partition_timezone]);
if (!transform_and_argument)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown transform {}", transform_name);

Expand All @@ -67,6 +68,7 @@ ChunkPartitioner::ChunkPartitioner(
result_data_types.push_back(function->getReturnType(columns_for_function));
functions.push_back(function);
function_params.push_back(transform_and_argument->argument);
function_time_zones.push_back(transform_and_argument->time_zone);
columns_to_apply.push_back(column_name);
}
}
Expand Down Expand Up @@ -103,6 +105,14 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
arguments.push_back(ColumnWithTypeAndName(const_column->clone(), type, "#"));
}
arguments.push_back(name_to_column[columns_to_apply[transform_ind]]);
if (function_time_zones[transform_ind].has_value())
{
auto type = std::make_shared<DataTypeString>();
auto column_value = ColumnString::create();
column_value->insert(*function_time_zones[transform_ind]);
auto const_column = ColumnConst::create(std::move(column_value), chunk.getNumRows());
arguments.push_back(ColumnWithTypeAndName(const_column->clone(), type, "PartitioningTimezone"));
}
auto result
= functions[transform_ind]->build(arguments)->execute(arguments, std::make_shared<DataTypeString>(), chunk.getNumRows(), false);
for (size_t i = 0; i < chunk.getNumRows(); ++i)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class ChunkPartitioner

std::vector<FunctionOverloadResolverPtr> functions;
std::vector<std::optional<size_t>> function_params;
std::vector<std::optional<String>> function_time_zones;
std::vector<String> columns_to_apply;
std::vector<DataTypePtr> result_data_types;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <compare>
#include <optional>

#include <Interpreters/Context.h>
#include <Interpreters/IcebergMetadataLog.h>

#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
Expand All @@ -14,6 +15,7 @@
#include <Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>

#include <Core/Settings.h>
#include <Core/TypeId.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Poco/JSON/Parser.h>
Expand All @@ -34,6 +36,11 @@ namespace DB::ErrorCodes
extern const int BAD_ARGUMENTS;
}

namespace DB::Setting
{
extern const SettingsTimezone iceberg_partition_timezone;
}

namespace DB::Iceberg
{

Expand Down Expand Up @@ -219,7 +226,7 @@ ManifestFileContent::ManifestFileContent(
auto transform_name = partition_specification_field->getValue<String>(f_partition_transform);
auto partition_name = partition_specification_field->getValue<String>(f_partition_name);
common_partition_specification.emplace_back(source_id, transform_name, partition_name);
auto partition_ast = getASTFromTransform(transform_name, numeric_column_name);
auto partition_ast = getASTFromTransform(transform_name, numeric_column_name, context->getSettingsRef()[Setting::iceberg_partition_timezone]);
/// Unsupported partition key expression
if (partition_ast == nullptr)
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ using namespace DB;
namespace DB::Iceberg
{

DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name)
DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name, const String & time_zone)
{
auto transform_and_argument = parseTransformAndArgument(transform_name_src);
auto transform_and_argument = parseTransformAndArgument(transform_name_src, time_zone);
if (!transform_and_argument)
{
LOG_WARNING(&Poco::Logger::get("Iceberg Partition Pruning"), "Cannot parse iceberg transform name: {}.", transform_name_src);
Expand All @@ -47,6 +47,13 @@ DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String &
return makeASTFunction(
transform_and_argument->transform_name, make_intrusive<ASTLiteral>(*transform_and_argument->argument), make_intrusive<ASTIdentifier>(column_name));
}
if (transform_and_argument->time_zone)
{
return makeASTFunction(
transform_and_argument->transform_name,
make_intrusive<ASTIdentifier>(column_name),
make_intrusive<ASTLiteral>(*transform_and_argument->time_zone));
}
return makeASTFunction(transform_and_argument->transform_name, make_intrusive<ASTIdentifier>(column_name));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace DB::Iceberg
struct ManifestFileEntry;
class ManifestFileContent;

DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name);
DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name, const String & time_zone);

/// Prune specific data files based on manifest content
class ManifestFilesPruner
Expand Down
31 changes: 20 additions & 11 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ namespace ProfileEvents
namespace DB::Setting
{
extern const SettingsUInt64 output_format_compression_level;
extern const SettingsTimezone iceberg_partition_timezone;
}

/// Hard to imagine a hint file larger than 10 MB
Expand Down Expand Up @@ -240,27 +241,31 @@ bool writeMetadataFileAndVersionHint(
}


std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src)
std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src, const String & time_zone)
{
std::string transform_name = Poco::toLower(transform_name_src);

std::optional<String> time_zone_opt;
if (!time_zone.empty())
time_zone_opt = time_zone;

if (transform_name == "year" || transform_name == "years")
return TransformAndArgument{"toYearNumSinceEpoch", std::nullopt};
return TransformAndArgument{"toYearNumSinceEpoch", std::nullopt, time_zone_opt};

if (transform_name == "month" || transform_name == "months")
return TransformAndArgument{"toMonthNumSinceEpoch", std::nullopt};
return TransformAndArgument{"toMonthNumSinceEpoch", std::nullopt, time_zone_opt};

if (transform_name == "day" || transform_name == "date" || transform_name == "days" || transform_name == "dates")
return TransformAndArgument{"toRelativeDayNum", std::nullopt};
return TransformAndArgument{"toRelativeDayNum", std::nullopt, time_zone_opt};

if (transform_name == "hour" || transform_name == "hours")
return TransformAndArgument{"toRelativeHourNum", std::nullopt};
return TransformAndArgument{"toRelativeHourNum", std::nullopt, time_zone_opt};

if (transform_name == "identity")
return TransformAndArgument{"identity", std::nullopt};
return TransformAndArgument{"identity", std::nullopt, std::nullopt};

if (transform_name == "void")
return TransformAndArgument{"tuple", std::nullopt};
return TransformAndArgument{"tuple", std::nullopt, std::nullopt};

if (transform_name.starts_with("truncate") || transform_name.starts_with("bucket"))
{
Expand All @@ -284,11 +289,11 @@ std::optional<TransformAndArgument> parseTransformAndArgument(const String & tra

if (transform_name.starts_with("truncate"))
{
return TransformAndArgument{"icebergTruncate", argument};
return TransformAndArgument{"icebergTruncate", argument, std::nullopt};
}
else if (transform_name.starts_with("bucket"))
{
return TransformAndArgument{"icebergBucket", argument};
return TransformAndArgument{"icebergBucket", argument, std::nullopt};
}
}
return std::nullopt;
Expand Down Expand Up @@ -1138,7 +1143,8 @@ KeyDescription getSortingKeyDescriptionFromMetadata(Poco::JSON::Object::Ptr meta
auto column_name = source_id_to_column_name[source_id];
int direction = field->getValue<String>(f_direction) == "asc" ? 1 : -1;
auto iceberg_transform_name = field->getValue<String>(f_transform);
auto clickhouse_transform_name = parseTransformAndArgument(iceberg_transform_name);
auto clickhouse_transform_name = parseTransformAndArgument(iceberg_transform_name,
local_context->getSettingsRef()[Setting::iceberg_partition_timezone]);
String full_argument;
if (clickhouse_transform_name->transform_name != "identity")
{
Expand All @@ -1147,7 +1153,10 @@ KeyDescription getSortingKeyDescriptionFromMetadata(Poco::JSON::Object::Ptr meta
{
full_argument += std::to_string(*clickhouse_transform_name->argument) + ", ";
}
full_argument += column_name + ")";
full_argument += column_name;
if (clickhouse_transform_name->time_zone)
full_argument += ", " + *clickhouse_transform_name->time_zone;
full_argument += ")";
}
else
{
Expand Down
7 changes: 6 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,14 @@ struct TransformAndArgument
{
String transform_name;
std::optional<size_t> argument;
/// When an Iceberg table is partitioned by time, the partitioning may have been performed in a time zone
/// other than the server's (UTC in most cases). That time zone can be set via the `iceberg_partition_timezone`
/// setting and is stored in this member. When the Iceberg partition transform is converted to a ClickHouse
/// function in `parseTransformAndArgument`, `time_zone` is passed as an extra argument to functions like
/// `toRelativeDayNum`, `toYearNumSinceEpoch`, etc.
std::optional<String> time_zone;
};

std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src);
std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src, const String & time_zone);

Poco::JSON::Object::Ptr getMetadataJSONObject(
const String & metadata_file_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ services:
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8080:8080
- 10002:10000
- 10003:10001
- ${SPARK_ICEBERG_EXTERNAL_PORT:-8080}:8080
- ${SPARK_ICEBERG_EXTERNAL_PORT_2:-10002}:10000
- ${SPARK_ICEBERG_EXTERNAL_PORT_3:-10003}:10001
stop_grace_period: 5s
cpus: 3
rest:
image: tabulario/iceberg-rest:1.6.0
ports:
- 8182:8181
- ${ICEBERG_REST_EXTERNAL_PORT:-8182}:8181
environment:
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=ClickHouse_Minio_P@ssw0rd
Expand Down
9 changes: 9 additions & 0 deletions tests/integration/helpers/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,9 @@ def __init__(
self.minio_secret_key = minio_secret_key

self.spark_session = None
self.spark_iceberg_external_port = 8080
self.spark_iceberg_external_port_2 = 10002
self.spark_iceberg_external_port_3 = 10003
self.with_iceberg_catalog = False
self.with_glue_catalog = False
self.with_hms_catalog = False
Expand Down Expand Up @@ -885,6 +888,8 @@ def __init__(
self._letsencrypt_pebble_api_port = 14000
self._letsencrypt_pebble_management_port = 15000

self.iceberg_rest_external_port = 8182

self.docker_client: docker.DockerClient = None
self.is_up = False
self.env = os.environ.copy()
Expand Down Expand Up @@ -1702,6 +1707,10 @@ def setup_hms_catalog_cmd(self, instance, env_variables, docker_compose_yml_dir)
def setup_iceberg_catalog_cmd(
self, instance, env_variables, docker_compose_yml_dir, extra_parameters=None
):
env_variables["ICEBERG_REST_EXTERNAL_PORT"] = str(self.iceberg_rest_external_port)
env_variables["SPARK_ICEBERG_EXTERNAL_PORT"] = str(self.spark_iceberg_external_port)
env_variables["SPARK_ICEBERG_EXTERNAL_PORT_2"] = str(self.spark_iceberg_external_port_2)
env_variables["SPARK_ICEBERG_EXTERNAL_PORT_3"] = str(self.spark_iceberg_external_port_3)
self.with_iceberg_catalog = True
file_name = "docker_compose_iceberg_rest_catalog.yml"
if extra_parameters is not None and extra_parameters["docker_compose_file_name"] != "":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<clickhouse>
    <profiles>
        <default>
            <!-- Pin the Iceberg partition time zone to UTC for the default profile,
                 overriding the server/session time zone that is otherwise used when
                 the setting is empty. -->
            <iceberg_partition_timezone>UTC</iceberg_partition_timezone>
        </default>
    </profiles>
</clickhouse>
3 changes: 3 additions & 0 deletions tests/integration/test_database_iceberg/configs/timezone.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<clickhouse>
    <!-- Set a non-UTC server time zone; presumably so tests can verify that
         `iceberg_partition_timezone` takes precedence over the server time zone
         during Iceberg partition pruning — confirm against the test that loads
         this config. -->
    <timezone>Asia/Istanbul</timezone>
</clickhouse>
Loading
Loading