Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class TableFunctionsWithClusterAlternativesVisitor : public InDepthQueryTreeVisi
bool needChildVisit(const QueryTreeNodePtr &, const QueryTreeNodePtr &) { return true; }
bool shouldReplaceWithClusterAlternatives() const
{
return !has_subquery && !has_join && (table_count + table_function_count) == 1;
return !has_subquery && !has_join && ((table_count + table_function_count) == 1 || (table_function_count == 0));
}

private:
Expand Down
19 changes: 19 additions & 0 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ namespace Setting
extern const SettingsBool allow_experimental_database_glue_catalog;
extern const SettingsBool allow_experimental_database_hms_catalog;
extern const SettingsBool use_hive_partitioning;
extern const SettingsBool parallel_replicas_for_cluster_engines;
extern const SettingsString cluster_for_parallel_replicas;

}
namespace DataLakeStorageSetting
{
Expand Down Expand Up @@ -434,6 +437,21 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con

auto cluster_name = settings[DatabaseDataLakeSetting::object_storage_cluster].value;

const auto & query_settings = context_->getSettingsRef();

const auto parallel_replicas_cluster_name = query_settings[Setting::cluster_for_parallel_replicas].toString();
if (!parallel_replicas_cluster_name.empty())
cluster_name = parallel_replicas_cluster_name;

const auto can_use_parallel_replicas = !parallel_replicas_cluster_name.empty()
&& query_settings[Setting::parallel_replicas_for_cluster_engines]
&& context_->canUseTaskBasedParallelReplicas()
&& !context_->isDistributed();

bool can_use_distributed_iterator =
context_->getClientInfo().collaborate_with_initiator &&
can_use_parallel_replicas;

return std::make_shared<StorageObjectStorageCluster>(
cluster_name,
configuration,
Expand All @@ -449,6 +467,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
getCatalog(),
/* if_not_exists */ true,
/* is_datalake_query */ true,
can_use_distributed_iterator,
/* is_table_function */ true,
/* lazy_init */ true);
}
Expand Down
12 changes: 4 additions & 8 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <Common/Exception.h>
#include <Common/StringUtils.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSetQuery.h>
#include <Interpreters/Context.h>

Expand Down Expand Up @@ -92,6 +93,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
std::shared_ptr<DataLake::ICatalog> catalog,
bool if_not_exists,
bool is_datalake_query,
bool distributed_processing,
bool is_table_function,
bool lazy_init)
: IStorageCluster(
Expand Down Expand Up @@ -196,7 +198,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
catalog,
if_not_exists,
is_datalake_query,
/* distributed_processing */false,
distributed_processing,
partition_by,
/* is_table_function */false,
/* lazy_init */lazy_init,
Expand Down Expand Up @@ -369,13 +371,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(

auto * table_function = extractTableFunctionFromSelectQuery(query);
if (!table_function)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected SELECT query from table function {}, got '{}'",
configuration->getEngineName(), query->formatForErrorMessage());
}

return;
auto * expression_list = table_function->arguments->as<ASTExpressionList>();
if (!expression_list)
{
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class StorageObjectStorageCluster : public IStorageCluster
std::shared_ptr<DataLake::ICatalog> catalog,
bool if_not_exists,
bool is_datalake_query,
bool distributed_processing,
bool is_table_function,
bool lazy_init);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
configuration->getCatalog(context, args.query.attach),
args.query.if_not_exists,
/* is_datalake_query */ false,
/* distributed_processing */ false,
/* is_table_function */ false,
/* lazy_init */ false);
}
Expand Down
9 changes: 5 additions & 4 deletions src/TableFunctions/TableFunctionObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ StoragePtr TableFunctionObjectStorage<Definition, Configuration, is_data_lake>::

const auto is_secondary_query = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;

bool can_use_distributed_iterator =
client_info.collaborate_with_initiator &&
context->hasClusterFunctionReadTaskCallback();

if (can_use_parallel_replicas && !is_secondary_query && !is_insert_query)
{
storage = std::make_shared<StorageObjectStorageCluster>(
Expand All @@ -207,17 +211,14 @@ StoragePtr TableFunctionObjectStorage<Definition, Configuration, is_data_lake>::
configuration->getCatalog(context, /* attach */ false),
/* if_not_exists */ false,
/* is_datalake_query */ false,
/* distributed_processing */ can_use_distributed_iterator,
/* is_table_function */ true,
/* lazy_init */ false);

storage->startup();
return storage;
}

bool can_use_distributed_iterator =
client_info.collaborate_with_initiator &&
context->hasClusterFunctionReadTaskCallback();

storage = std::make_shared<StorageObjectStorage>(
configuration,
getObjectStorage(context, !is_insert_query),
Expand Down
1 change: 1 addition & 0 deletions src/TableFunctions/TableFunctionObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ StoragePtr TableFunctionObjectStorageCluster<Definition, Configuration, is_data_
/* catalog*/ nullptr,
/* if_not_exists */ false,
/* is_datalake_query */ false,
/* distributed_processing */ false,
/* is_table_function */ true,
/* lazy_init */ true);
}
Expand Down
16 changes: 15 additions & 1 deletion tests/integration/test_database_iceberg/configs/cluster.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,21 @@
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</cluster_simple>
</remote_servers>
</clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</query_log>
</clickhouse>
51 changes: 51 additions & 0 deletions tests/integration/test_database_iceberg/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ def started_cluster():
user_configs=[],
stay_alive=True,
with_iceberg_catalog=True,
)

cluster.add_instance(
"node2",
main_configs=["configs/backups.xml", "configs/cluster.xml"],
user_configs=[],
stay_alive=True,
with_iceberg_catalog=True,
with_zookeeper=True,
)

Expand Down Expand Up @@ -821,3 +829,46 @@ def test_gcs(started_cluster):
"""
)
assert "Google cloud storage converts to S3" in str(err.value)


def test_cluster_select(started_cluster):
    """Verify that a SELECT on an Iceberg catalog table uses the cluster
    (s3Cluster) execution path when parallel replicas are enabled.

    Checks that:
      * the query returns the inserted row on the initiator,
      * the setting `parallel_replicas_for_cluster_engines` is recorded in
        system.query_log for the initiating query,
      * exactly one secondary (non-initial) s3Cluster query reached each
        replica,
      * a follower node can run the same query with the same settings.
    """
    node1 = started_cluster.instances["node1"]
    node2 = started_cluster.instances["node2"]

    test_ref = f"test_list_tables_{uuid.uuid4()}"
    table_name = f"{test_ref}_table"
    root_namespace = f"{test_ref}_namespace"

    # Catalog handle is created for its side effects (session/bucket setup);
    # the returned object itself is not used below.
    catalog = load_catalog_impl(started_cluster)
    create_clickhouse_iceberg_database(started_cluster, node1, CATALOG_NAME)
    create_clickhouse_iceberg_database(started_cluster, node2, CATALOG_NAME)
    create_clickhouse_iceberg_table(started_cluster, node1, root_namespace, table_name, "(x String)")
    node1.query(
        f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_name}` VALUES ('pablo');",
        settings={
            "allow_experimental_insert_into_iceberg": 1,
            "write_full_path_in_iceberg_metadata": 1,
        },
    )

    query_id = uuid.uuid4().hex
    assert node1.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` SETTINGS parallel_replicas_for_cluster_engines=1, enable_parallel_replicas=2, cluster_for_parallel_replicas='cluster_simple'", query_id=query_id) == 'pablo\n'

    node1.query("SYSTEM FLUSH LOGS system.query_log")
    node2.query("SYSTEM FLUSH LOGS system.query_log")

    assert node1.query(f"SELECT Settings['parallel_replicas_for_cluster_engines'] AS parallel_replicas_for_cluster_engines FROM system.query_log WHERE query_id = '{query_id}' LIMIT 1;") == '1\n'

    for replica in [node1, node2]:
        # Filter out empty strings: "".strip().split("\n") == [''] has length 1,
        # so without the filter the assert below would pass even when NO
        # secondary query ever reached the replica (false positive).
        cluster_secondary_queries = [
            row
            for row in replica.query(
                f"""
            SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
            WHERE
                type = 'QueryStart' AND
                positionCaseInsensitive(query, 's3Cluster') != 0 AND
                position(query, 'system.query_log') = 0 AND
                NOT is_initial_query AND
                initial_query_id = '{query_id}'
            """
            )
            .strip()
            .split("\n")
            if row
        ]
        # Exactly one secondary query per replica; dump the rows on failure.
        assert len(cluster_secondary_queries) == 1, cluster_secondary_queries

    # Same query from the follower node; the duplicate
    # 'parallel_replicas_for_cluster_engines' key present in the original
    # settings dict has been removed (dict literals silently keep only the
    # last occurrence).
    assert node2.query(
        f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`",
        settings={
            "parallel_replicas_for_cluster_engines": 1,
            "enable_parallel_replicas": 2,
            "cluster_for_parallel_replicas": "cluster_simple",
        },
    ) == 'pablo\n'
Loading