Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions docs/en/engines/table-engines/special/hybrid.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
---
description: 'Hybrid unions multiple data sources behind per-segment predicates so queries behave like a single table while data is migrated or tiered.'
slug: /engines/table-engines/special/hybrid
title: 'Hybrid Table Engine'
sidebar_label: 'Hybrid'
sidebar_position: 11
---

# Hybrid table engine

`Hybrid` builds on top of the [Distributed](./distributed.md) table engine. It lets you expose several data sources as one logical table and assign every source its own predicate.
The engine rewrites incoming queries so that each segment receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`,
global JOIN pushdown, and so on) while you duplicate or migrate data across clusters, storage types, or formats.

It keeps the same execution pipeline as `engine=Distributed` but can read from multiple underlying sources simultaneously—similar to `engine=Merge`—while still pushing logic down to each source.

Typical use cases include:

- Zero-downtime migrations where "old" and "new" replicas temporarily overlap.
- Tiered storage, for example fresh data on a local cluster and historical data in S3.
- Gradual roll-outs where only a subset of rows should be served from a new backend.

By giving mutually exclusive predicates to the segments (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.

## Enable the engine

The Hybrid engine is experimental. Enable it per session (or in the user profile) before creating tables:

```sql
SET allow_experimental_hybrid_table = 1;
```

### Automatic Type Alignment

Hybrid segments can evolve independently, so the same logical column may use different physical types. With the experimental `hybrid_table_auto_cast_columns = 1` **(enabled by default and requires `allow_experimental_analyzer = 1`)**, the engine inserts the necessary `CAST` operations into each rewritten query so every shard receives the schema defined by the Hybrid table. You can opt out by setting the flag to `0` if it causes issues.

Segment schemas are cached when you create or attach a Hybrid table. If you alter a segment later (for example change a column type), refresh the Hybrid table (detach/attach or recreate it) so the cached headers stay in sync with the new schema; otherwise the auto-cast feature may miss the change and queries can still fail with header/type errors.

## Engine definition

```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
column1 type1,
column2 type2,
...
)
ENGINE = Hybrid(table_function_1, predicate_1 [, table_function_2, predicate_2 ...])
```

You must pass at least two arguments – the first table function and its predicate. Additional sources are appended as `table_function, predicate` pairs. The first table function is also used for `INSERT` statements.

### Arguments and behaviour

- `table_function_n` must be a valid table function (for example `remote`, `remoteSecure`, `cluster`, `clusterAllReplicas`, `s3Cluster`) or a fully qualified table name (`database.table`). The first argument must be a table function—such as `remote` or `cluster`—because it instantiates the underlying `Distributed` storage.
- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the segment's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical.
- The query planner picks the same processing stage for every segment as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
- `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources.
- Align schemas across the segments. ClickHouse builds a common header and rejects creation if any segment misses a column defined in the Hybrid schema. If the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.

## Example: local cluster plus S3 historical tier

The following commands illustrate a two-segment layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.

```sql
-- Local MergeTree table that keeps current data
CREATE OR REPLACE TABLE btc_blocks_local
(
`hash` FixedString(64),
`version` Int64,
`mediantime` DateTime64(9),
`nonce` Int64,
`bits` FixedString(8),
`difficulty` Float64,
`chainwork` FixedString(64),
`size` Int64,
`weight` Int64,
`coinbase_param` String,
`number` Int64,
`transaction_count` Int64,
`merkle_root` FixedString(64),
`stripped_size` Int64,
`timestamp` DateTime64(9),
`date` Date
)
ENGINE = MergeTree
ORDER BY (timestamp)
PARTITION BY toYYYYMM(date);

-- Hybrid table that unions the local shard with historical data in S3
CREATE OR REPLACE TABLE btc_blocks ENGINE = Hybrid(
remote('localhost:9000', currentDatabase(), 'btc_blocks_local'), date >= '2025-09-01',
s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN), date < '2025-09-01'
) AS btc_blocks_local;

-- Writes target the first (remote) segment
INSERT INTO btc_blocks
SELECT *
FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN)
WHERE date BETWEEN '2025-09-01' AND '2025-09-30';

-- Reads seamlessly combine both predicates
SELECT * FROM btc_blocks WHERE date = '2025-08-01'; -- data from s3
SELECT * FROM btc_blocks WHERE date = '2025-09-05'; -- data from MergeTree (TODO: still analyzes s3)
SELECT * FROM btc_blocks WHERE date IN ('2025-08-31','2025-09-01') -- data from both sources, single copy always


-- Run analytic queries as usual
SELECT
date,
count(),
uniqExact(CAST(hash, 'Nullable(String)')) AS hashes,
sum(CAST(number, 'Nullable(Int64)')) AS blocks_seen
FROM btc_blocks
WHERE date BETWEEN '2025-08-01' AND '2025-09-30'
GROUP BY date
ORDER BY date;
```

Because the predicates are applied inside every segment, queries such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.
150 changes: 150 additions & 0 deletions src/Analyzer/Passes/HybridCastsPass.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#include <Analyzer/Passes/HybridCastsPass.h>

#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/QueryTreePassManager.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Utils.h>
#include <Analyzer/Resolve/IdentifierResolver.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/UnionNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>

#include <Storages/IStorage.h>
#include <Storages/StorageDistributed.h>

#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
#include <Common/Exception.h>

namespace DB
{

namespace Setting
{
extern const SettingsBool hybrid_table_auto_cast_columns;
}

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

namespace
{

/// Collect Hybrid table expressions that require casts to normalize headers across segments.
///
/// Hybrid is currently exposed only as an engine (TableNode). If it ever gets a table function
/// wrapper, this visitor must also look at TableFunctionNode and unwrap to the underlying
/// StorageDistributed so cached casts can be picked up there as well.
class HybridCastTablesCollector : public InDepthQueryTreeVisitor<HybridCastTablesCollector>
{
public:
explicit HybridCastTablesCollector(std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map_)
: cast_map(cast_map_)
{}

static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr &) { return true; }

void visitImpl(QueryTreeNodePtr & node)
{
const auto * table = node->as<TableNode>();
if (!table)
return;

const auto * storage = table->getStorage().get();
if (const auto * distributed = typeid_cast<const StorageDistributed *>(storage))
{
ColumnsDescription to_cast = distributed->getColumnsToCast();
if (!to_cast.empty())
cast_map.emplace(node.get(), std::move(to_cast)); // repeated table_expression can overwrite
}
}

private:
std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map;
};

// Visitor replaces all usages of the column with CAST(column, type) in the query tree.
class HybridCastVisitor : public InDepthQueryTreeVisitor<HybridCastVisitor>
{
public:
HybridCastVisitor(
const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map_,
ContextPtr context_)
: cast_map(cast_map_)
, context(std::move(context_))
{}

bool shouldTraverseTopToBottom() const { return false; }

static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & child)
{
/// Traverse all child nodes so casts also apply inside subqueries and UNION branches.
(void)child;
return true;
}

void visitImpl(QueryTreeNodePtr & node)
{
auto * column_node = node->as<ColumnNode>();
if (!column_node)
return;

auto column_source = column_node->getColumnSourceOrNull();
if (!column_source)
return;

auto it = cast_map.find(column_source.get());
if (it == cast_map.end())
return;

const auto & column_name = column_node->getColumnName();
auto expected_column_opt = it->second.tryGetPhysical(column_name);
if (!expected_column_opt)
return;

auto column_clone = std::static_pointer_cast<ColumnNode>(column_node->clone());

auto cast_node = buildCastFunction(column_clone, expected_column_opt->type, context);
const auto & alias = node->getAlias();
if (!alias.empty())
cast_node->setAlias(alias);
else
cast_node->setAlias(expected_column_opt->name);

node = cast_node;
}

private:
const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map;
ContextPtr context;
};


} // namespace

void HybridCastsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
const auto & settings = context->getSettingsRef();
if (!settings[Setting::hybrid_table_auto_cast_columns])
return;

auto * query = query_tree_node->as<QueryNode>();
if (!query)
return;

std::unordered_map<const IQueryTreeNode *, ColumnsDescription> cast_map;
HybridCastTablesCollector collector(cast_map);
collector.visit(query_tree_node);
if (cast_map.empty())
return;

HybridCastVisitor visitor(cast_map, context);
visitor.visit(query_tree_node);
}

}
32 changes: 32 additions & 0 deletions src/Analyzer/Passes/HybridCastsPass.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include <Analyzer/IQueryTreePass.h>
#include <Interpreters/Context_fwd.h>

namespace DB
{

/// Adds CASTs for Hybrid segments when physical types differ from the Hybrid schema
///
/// It normalizes headers coming from different segments when table structure in some segments
/// differs from the Hybrid table definition. For example column X is UInt32 in the Hybrid table,
/// but Int64 in an additional segment.
///
/// Without these casts ConvertingActions may fail to reconcile mismatched headers when casts are impossible
/// (e.g. AggregateFunction states carry hashed data tied to their argument type and cannot be recast), for example:
/// "Conversion from AggregateFunction(uniq, Decimal(38, 0)) to AggregateFunction(uniq, UInt64) is not supported"
/// (CANNOT_CONVERT_TYPE).
///
/// Per-segment casts are not reliable because WithMergeState strips aliases, so merged pipelines
/// from different segments would return different headers (with or without CAST), leading to errors
/// like "Cannot find column `max(value)` in source stream, there are only columns: [max(_CAST(value, 'UInt64'))]"
/// (THERE_IS_NO_COLUMN).
class HybridCastsPass : public IQueryTreePass
{
public:
String getName() override { return "HybridCastsPass"; }
String getDescription() override { return "Inject casts for Hybrid columns to match schema types"; }
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

}
3 changes: 3 additions & 0 deletions src/Analyzer/QueryTreePassManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include <Analyzer/Passes/SumIfToCountIfPass.h>
#include <Analyzer/Passes/UniqInjectiveFunctionsEliminationPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Passes/HybridCastsPass.h>
#include <Analyzer/Utils.h>

namespace DB
Expand Down Expand Up @@ -325,6 +326,8 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
manager.addPass(std::make_unique<InverseDictionaryLookupPass>());

manager.addPass(std::make_unique<DisableParallelReplicasPass>());

manager.addPass(std::make_unique<HybridCastsPass>());
}

}
11 changes: 11 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2280,6 +2280,11 @@ Show internal aliases (such as __table1) in EXPLAIN PLAN instead of those specif
\
DECLARE(UInt64, query_plan_max_step_description_length, 500, R"(
Maximum length of step description in EXPLAIN PLAN.
)", 0) \
\
DECLARE(Bool, enable_alias_marker, true, R"(
Enable __aliasMarker injection for ALIAS column expressions when using the analyzer.
This stabilizes action node names across planner/analyzer stages without changing query semantics.
)", 0) \
\
DECLARE(UInt64, preferred_block_size_bytes, 1000000, R"(
Expand Down Expand Up @@ -7338,6 +7343,12 @@ Allows creation of tables with the [TimeSeries](../../engines/table-engines/inte
- 0 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is disabled.
- 1 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is enabled.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_hybrid_table, false, R"(
Allows creation of tables with the [Hybrid](../../engines/table-engines/special/hybrid.md) table engine.
)", EXPERIMENTAL) \
DECLARE(Bool, hybrid_table_auto_cast_columns, true, R"(
Automatically cast columns to the schema defined in Hybrid tables when remote segments expose different physical types. Works only with analyzer. Enabled by default, does nothing if (experimental) Hybrid tables are disabled; disable it if it causes issues. Segment schemas are cached when the Hybrid table is created or attached; if a segment schema changes later, detach/attach or recreate the Hybrid table so the cached headers stay in sync.
)", 0) \
DECLARE(Bool, allow_experimental_codecs, false, R"(
If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).
)", EXPERIMENTAL) \
Expand Down
5 changes: 5 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// Note: please check if the key already exists to prevent duplicate entries.
addSettingsChanges(settings_changes_history, "26.1",
{
// altinity: antalya-specific features
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"enable_alias_marker", true, true, "New setting."},
// altinity: antalya-specific features
{"parallel_replicas_filter_pushdown", false, false, "New setting"},
{"use_statistics", true, true, "Enable this optimization by default."},
{"ignore_on_cluster_for_replicated_database", false, false, "Add a new setting to ignore ON CLUSTER clause for DDL queries with a replicated database."},
Expand Down
1 change: 1 addition & 0 deletions src/Databases/enableAllExperimentalSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context)
context->setSetting("allow_experimental_ytsaurus_table_engine", 1);
context->setSetting("allow_experimental_ytsaurus_dictionary_source", 1);
context->setSetting("allow_experimental_time_series_aggregate_functions", 1);
context->setSetting("allow_experimental_hybrid_table", 1);
context->setSetting("allow_experimental_lightweight_update", 1);
context->setSetting("allow_experimental_insert_into_iceberg", 1);
context->setSetting("allow_experimental_iceberg_compaction", 1);
Expand Down
18 changes: 18 additions & 0 deletions src/Functions/identity.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Functions/identity.h>
#include <Functions/FunctionFactory.h>
#include <Common/FunctionDocumentation.h>

namespace DB
{
Expand Down Expand Up @@ -33,4 +34,21 @@ REGISTER_FUNCTION(ActionName)
factory.registerFunction<FunctionActionName>();
}

REGISTER_FUNCTION(AliasMarker)
{
factory.registerFunction<FunctionAliasMarker>(FunctionDocumentation{
.description = R"(
Internal function that marks ALIAS column expressions for the analyzer. Not intended for direct use.
)",
.syntax = {"__aliasMarker(expr, alias_name)"},
.arguments = {
{"expr", "Expression to mark.", {"Any"}},
{"alias_name", "Alias name attached to the expression.", {"String"}},
},
.returned_value = {"Returns expr unchanged.", {"Any"}},
.introduced_in = {25, 8},
.category = FunctionDocumentation::Category::Other,
});
}

}
Loading
Loading