Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Backups/BackupEntriesCollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ void BackupEntriesCollector::gatherDatabasesMetadata()

case ASTBackupQuery::ElementType::ALL:
{
for (const auto & [database_name, database] : DatabaseCatalog::instance().getDatabases())
for (const auto & [database_name, database] : DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = true}))
{
if (!element.except_databases.contains(database_name))
{
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6472,7 +6472,7 @@ Query Iceberg table using the snapshot that was current at a specific timestamp.
DECLARE(Int64, iceberg_snapshot_id, 0, R"(
Query Iceberg table using the specific snapshot id.
)", 0) \
DECLARE(Bool, show_data_lake_catalogs_in_system_tables, true, R"(
DECLARE(Bool, show_data_lake_catalogs_in_system_tables, false, R"(
Enables showing data lake catalogs in system tables.
)", 0) \
DECLARE(Bool, delta_lake_enable_expression_visitor_logging, false, R"(
Expand Down
5 changes: 5 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
/// Note: please check if the key already exists to prevent duplicate entries.
addSettingsChanges(settings_changes_history, "25.8.12.10000",
{
{"show_data_lake_catalogs_in_system_tables", false, true, "Disable catalogs in system tables by default"},

});
addSettingsChanges(settings_changes_history, "25.8",
{
{"output_format_json_quote_64bit_integers", true, false, "Disable quoting of the 64 bit integers in JSON by default"},
Expand Down
6 changes: 1 addition & 5 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,14 +532,10 @@ DatabaseTablesIteratorPtr DatabaseDataLake::getTablesIterator(
DatabaseTablesIteratorPtr DatabaseDataLake::getLightweightTablesIterator(
ContextPtr context_,
const FilterByNameFunction & filter_by_table_name,
bool skip_not_loaded,
bool skip_data_lake_catalog) const
bool skip_not_loaded) const
{
Tables tables;

if (skip_data_lake_catalog)
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, getDatabaseName());

auto catalog = getCatalog();
DB::Names iceberg_tables;

Expand Down
4 changes: 2 additions & 2 deletions src/Databases/DataLake/DatabaseDataLake.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class DatabaseDataLake final : public IDatabase, WithContext
bool canContainDistributedTables() const override { return false; }
bool canContainRocksDBTables() const override { return false; }
bool shouldBeEmptyOnDetach() const override { return false; }
bool isDatalakeCatalog() const override { return true; }

bool empty() const override;

Expand All @@ -47,8 +48,7 @@ class DatabaseDataLake final : public IDatabase, WithContext
DatabaseTablesIteratorPtr getLightweightTablesIterator(
ContextPtr context,
const FilterByNameFunction & filter_by_table_name,
bool skip_not_loaded,
bool skip_data_lake_catalog) const override;
bool skip_not_loaded) const override;


void shutdown() override {}
Expand Down
4 changes: 3 additions & 1 deletion src/Databases/IDatabase.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ class IDatabase : public std::enable_shared_from_this<IDatabase>

virtual bool canContainRocksDBTables() const { return true; }

virtual bool isDatalakeCatalog() const { return false; }

/// Load a set of existing tables.
/// You can call only once, right after the object is created.
virtual void loadStoredObjects( /// NOLINT
Expand Down Expand Up @@ -267,7 +269,7 @@ class IDatabase : public std::enable_shared_from_this<IDatabase>

/// Same as above, but may return non-fully initialized StoragePtr objects which are not suitable for reading.
/// Useful for queries like "SHOW TABLES"
virtual DatabaseTablesIteratorPtr getLightweightTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}, bool skip_not_loaded = false, [[maybe_unused]] bool skip_data_lake_catalog = false) const /// NOLINT
virtual DatabaseTablesIteratorPtr getLightweightTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}, bool skip_not_loaded = false) const /// NOLINT
{
return getTablesIterator(context, filter_by_table_name, skip_not_loaded);
}
Expand Down
45 changes: 37 additions & 8 deletions src/Interpreters/DatabaseCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class DatabaseNameHints : public IHints<>
const bool need_to_check_access_for_databases = !access->isGranted(AccessType::SHOW_DATABASES);

Names result;
auto databases_list = database_catalog.getDatabases();
auto databases_list = database_catalog.getDatabases(GetDatabasesOptions{.with_datalake_catalogs = true});
for (const auto & database_name : databases_list | boost::adaptors::map_keys)
{
if (need_to_check_access_for_databases && !access->isGranted(AccessType::SHOW_DATABASES, database_name))
Expand Down Expand Up @@ -341,7 +341,10 @@ void DatabaseCatalog::shutdownImpl(std::function<void()> shutdown_system_logs)
auto it = std::find_if(elem.map.begin(), elem.map.end(), not_empty_mapping);
return it != elem.map.end();
}) == uuid_map.end());

databases.clear();
databases_without_datalake_catalogs.clear();

referential_dependencies.clear();
loading_dependencies.clear();
view_dependencies.clear();
Expand Down Expand Up @@ -576,6 +579,18 @@ void DatabaseCatalog::assertDatabaseExists(const String & database_name) const
}
}

bool DatabaseCatalog::hasDatalakeCatalogs() const
{
std::lock_guard lock{databases_mutex};
return databases.size() != databases_without_datalake_catalogs.size();
}

bool DatabaseCatalog::isDatalakeCatalog(const String & database_name) const
{
std::lock_guard lock{databases_mutex};
return databases.contains(database_name) && !databases_without_datalake_catalogs.contains(database_name);
}

void DatabaseCatalog::assertDatabaseDoesntExist(const String & database_name) const
{
std::lock_guard lock{databases_mutex};
Expand All @@ -594,6 +609,9 @@ void DatabaseCatalog::attachDatabase(const String & database_name, const Databas
std::lock_guard lock{databases_mutex};
assertDatabaseDoesntExistUnlocked(database_name);
databases.emplace(database_name, database);
if (!database->isDatalakeCatalog())
databases_without_datalake_catalogs.emplace(database_name, database);

NOEXCEPT_SCOPE({
UUID db_uuid = database->getUUID();
if (db_uuid != UUIDHelpers::Nil)
Expand All @@ -618,8 +636,9 @@ DatabasePtr DatabaseCatalog::detachDatabase(ContextPtr local_context, const Stri
if (db_uuid != UUIDHelpers::Nil)
removeUUIDMapping(db_uuid);
databases.erase(database_name);

}
if (auto it = databases_without_datalake_catalogs.find(database_name); it != databases_without_datalake_catalogs.end())
databases_without_datalake_catalogs.erase(it);
}
if (!db)
{
Expand Down Expand Up @@ -685,7 +704,13 @@ void DatabaseCatalog::updateDatabaseName(const String & old_name, const String &
databases.erase(it);
databases.emplace(new_name, db);

/// Update dependencies.
auto no_catalogs_it = databases_without_datalake_catalogs.find(old_name);
if (no_catalogs_it != databases_without_datalake_catalogs.end())
{
databases_without_datalake_catalogs.erase(no_catalogs_it);
databases_without_datalake_catalogs.emplace(new_name, db);
}

for (const auto & table_name : tables_in_database)
{
auto removed_ref_deps = referential_dependencies.removeDependencies(StorageID{old_name, table_name}, /* remove_isolated_tables= */ true);
Expand Down Expand Up @@ -806,10 +831,13 @@ bool DatabaseCatalog::isDatabaseExist(const String & database_name) const
return databases.end() != databases.find(database_name);
}

Databases DatabaseCatalog::getDatabases() const
Databases DatabaseCatalog::getDatabases(GetDatabasesOptions options) const
{
std::lock_guard lock{databases_mutex};
return databases;
if (options.with_datalake_catalogs)
return databases;

return databases_without_datalake_catalogs;
}

bool DatabaseCatalog::isTableExist(const DB::StorageID & table_id, ContextPtr context_) const
Expand Down Expand Up @@ -1075,7 +1103,7 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
std::map<String, std::pair<StorageID, DiskPtr>> dropped_metadata;
String path = fs::path("metadata_dropped") / "";

auto db_map = getDatabases();
auto db_map = getDatabases(GetDatabasesOptions{.with_datalake_catalogs = true});
std::set<DiskPtr> metadata_disk_list;
for (const auto & [_, db] : db_map)
{
Expand Down Expand Up @@ -1965,7 +1993,7 @@ void DatabaseCatalog::reloadDisksTask()
disks.swap(disks_to_reload);
}

for (auto & database : getDatabases())
for (auto & database : getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false}))
{
// WARNING: In case of `async_load_databases = true` getTablesIterator() call wait for all table in the database to be loaded.
// WARNING: It means that no database will be able to update configuration until all databases are fully loaded.
Expand Down Expand Up @@ -2120,7 +2148,8 @@ std::pair<String, String> TableNameHints::getExtendedHintForTable(const String &
{
/// load all available databases from the DatabaseCatalog instance
auto & database_catalog = DatabaseCatalog::instance();
auto all_databases = database_catalog.getDatabases();
/// NOTE Skip datalake catalogs to avoid unnecessary access to remote catalogs (can be expensive)
auto all_databases = database_catalog.getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false});

for (const auto & [db_name, db] : all_databases)
{
Expand Down
16 changes: 15 additions & 1 deletion src/Interpreters/DatabaseCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ using TemporaryTablesMapping = std::map<String, TemporaryTableHolderPtr>;

class BackgroundSchedulePoolTaskHolder;

struct GetDatabasesOptions
{
bool with_datalake_catalogs{false};
};

/// For some reason Context is required to get Storage from Database object
class DatabaseCatalog : boost::noncopyable, WithMutableContext
{
Expand Down Expand Up @@ -171,7 +176,13 @@ class DatabaseCatalog : boost::noncopyable, WithMutableContext
DatabasePtr getDatabase(const UUID & uuid) const;
DatabasePtr tryGetDatabase(const UUID & uuid) const;
bool isDatabaseExist(const String & database_name) const;
Databases getDatabases() const;
/// Datalake catalogs are implement at IDatabase level in ClickHouse.
/// In general case Datalake catalog is a some remote service which contains iceberg/delta tables.
/// Sometimes this service charges money for requests. With this flag we explicitly protect ourself
/// to not accidentally query external non-free service for some trivial things like
/// autocompletion hints or system.tables query. We have a setting which allow to show
/// these databases everywhere, but user must explicitly specify it.
Databases getDatabases(GetDatabasesOptions options) const;

/// Same as getDatabase(const String & database_name), but if database_name is empty, current database of local_context is used
DatabasePtr getDatabase(const String & database_name, ContextPtr local_context) const;
Expand Down Expand Up @@ -272,6 +283,8 @@ class DatabaseCatalog : boost::noncopyable, WithMutableContext
bool canPerformReplicatedDDLQueries() const;

void updateMetadataFile(const DatabasePtr & database);
bool hasDatalakeCatalogs() const;
bool isDatalakeCatalog(const String & database_name) const;

private:
// The global instance of database catalog. unique_ptr is to allow
Expand Down Expand Up @@ -319,6 +332,7 @@ class DatabaseCatalog : boost::noncopyable, WithMutableContext
mutable std::mutex databases_mutex;

Databases databases TSA_GUARDED_BY(databases_mutex);
Databases databases_without_datalake_catalogs TSA_GUARDED_BY(databases_mutex);
UUIDToStorageMap uuid_map;

/// Referential dependencies between tables: table "A" depends on table "B"
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterCheckQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, ContextP
static Strings getAllDatabases(const ContextPtr & context)
{
Strings res;
const auto & databases = DatabaseCatalog::instance().getDatabases();
const auto & databases = DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false});
res.reserve(databases.size());
for (const auto & [database_name, _] : databases)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
auto db_num_limit = getContext()->getGlobalContext()->getServerSettings()[ServerSetting::max_database_num_to_throw].value;
if (db_num_limit > 0 && !internal)
{
size_t db_count = DatabaseCatalog::instance().getDatabases().size();
size_t db_count = DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = true}).size();
std::initializer_list<std::string_view> system_databases =
{
DatabaseCatalog::TEMPORARY_DATABASE,
Expand Down
15 changes: 14 additions & 1 deletion src/Interpreters/InterpreterShowTablesQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <Storages/ColumnsDescription.h>
#include <Common/Macros.h>
#include <Common/typeid_cast.h>
#include <Core/Settings.h>


namespace DB
Expand Down Expand Up @@ -229,8 +230,20 @@ BlockIO InterpreterShowTablesQuery::execute()

return res;
}
auto rewritten_query = getRewrittenQuery();
String database = getContext()->resolveDatabase(query.getFrom());
if (DatabaseCatalog::instance().isDatalakeCatalog(database))
{
auto context_copy = Context::createCopy(getContext());
/// HACK To always show them in explicit "SHOW TABLES" queries
context_copy->setSetting("show_data_lake_catalogs_in_system_tables", true);
return executeQuery(rewritten_query, context_copy, QueryFlags{ .internal = true }).second;
}
else
{
return executeQuery(rewritten_query, getContext(), QueryFlags{ .internal = true }).second;
}

return executeQuery(getRewrittenQuery(), getContext(), QueryFlags{ .internal = true }).second;
}

/// (*) Sorting is strictly speaking not necessary but 1. it is convenient for users, 2. SQL currently does not allow to
Expand Down
14 changes: 7 additions & 7 deletions src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ void InterpreterSystemQuery::startStopAction(StorageActionBlockType action_type,
}
else
{
for (auto & elem : DatabaseCatalog::instance().getDatabases())
for (auto & elem : DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false}))
{
startStopActionInDatabase(action_type, start, elem.first, elem.second, getContext(), log);
}
Expand Down Expand Up @@ -1075,7 +1075,7 @@ void InterpreterSystemQuery::restartReplicas(ContextMutablePtr system_context)
bool access_is_granted_globally = access->isGranted(AccessType::SYSTEM_RESTART_REPLICA);
bool show_tables_is_granted_globally = access->isGranted(AccessType::SHOW_TABLES);

for (auto & elem : catalog.getDatabases())
for (auto & elem : catalog.getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false}))
{
if (!elem.second->canContainMergeTreeTables())
continue;
Expand Down Expand Up @@ -1137,7 +1137,7 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
}
else if (query.is_drop_whole_replica)
{
auto databases = DatabaseCatalog::instance().getDatabases();
auto databases = DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false});
auto access = getContext()->getAccess();
bool access_is_granted_globally = access->isGranted(AccessType::SYSTEM_DROP_REPLICA);

Expand Down Expand Up @@ -1178,7 +1178,7 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
String remote_replica_path = fs::path(query.replica_zk_path) / "replicas" / query.replica;

/// This check is actually redundant, but it may prevent from some user mistakes
for (auto & elem : DatabaseCatalog::instance().getDatabases())
for (auto & elem : DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false}))
{
DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
Expand Down Expand Up @@ -1272,7 +1272,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
}
else if (query.is_drop_whole_replica)
{
auto databases = DatabaseCatalog::instance().getDatabases();
auto databases = DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false});
auto access = getContext()->getAccess();
bool access_is_granted_globally = access->isGranted(AccessType::SYSTEM_DROP_REPLICA);

Expand All @@ -1298,7 +1298,7 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA);

/// This check is actually redundant, but it may prevent from some user mistakes
for (auto & elem : DatabaseCatalog::instance().getDatabases())
for (auto & elem : DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false}))
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(elem.second.get()))
check_not_local_replica(replicated, query);

Expand Down Expand Up @@ -1416,7 +1416,7 @@ void InterpreterSystemQuery::loadOrUnloadPrimaryKeysImpl(bool load)
getContext()->checkAccess(load ? AccessType::SYSTEM_LOAD_PRIMARY_KEY : AccessType::SYSTEM_UNLOAD_PRIMARY_KEY);
LOG_TRACE(log, "{} primary keys for all tables", load ? "Loading" : "Unloading");

for (auto & database : DatabaseCatalog::instance().getDatabases())
for (auto & database : DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false}))
{
for (auto it = database.second->getTablesIterator(getContext()); it->isValid(); it->next())
{
Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/ServerAsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ void ServerAsynchronousMetrics::updateImpl(TimePoint update_time, TimePoint curr
}

{
auto databases = DatabaseCatalog::instance().getDatabases();
auto databases = DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false});

size_t max_queue_size = 0;
size_t max_inserts_in_queue = 0;
Expand Down Expand Up @@ -371,7 +371,7 @@ void ServerAsynchronousMetrics::updateMutationAndDetachedPartsStats()
DetachedPartsStats current_values{};
MutationStats current_mutation_stats{};

for (const auto & db : DatabaseCatalog::instance().getDatabases())
for (const auto & db : DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false}))
{
if (!db.second->canContainMergeTreeTables())
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/loadMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMu
// Wait for all table to be loaded and started
waitLoad(TablesLoaderForegroundPoolId, load_metadata);

for (const auto & [name, _] : DatabaseCatalog::instance().getDatabases())
for (const auto & [name, _] : DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false}))
if (name != DatabaseCatalog::SYSTEM_DATABASE)
maybeConvertOrdinaryDatabaseToAtomic(context, name);

Expand Down
2 changes: 1 addition & 1 deletion src/Server/ReplicasStatusHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServe
bool ok = true;
WriteBufferFromOwnString message;

auto databases = DatabaseCatalog::instance().getDatabases();
auto databases = DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false});

/// Iterate through all the replicated tables.
for (const auto & db : databases)
Expand Down
Loading
Loading