Skip to content

Commit 7ad7a4c

Browse files
authored
Merge branch 'main' into mday-io/feature/janitor-environment-flag
2 parents e3ca817 + c5d008b commit 7ad7a4c

16 files changed

Lines changed: 556 additions & 39 deletions

File tree

docs/integrations/engines/duckdb.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ SQLMesh will place models with the explicit catalog "ephemeral", such as `epheme
7979
type: ducklake
8080
path: 'catalog.ducklake'
8181
data_path: data/ducklake
82+
override_data_path: true
8283
encrypted: True
8384
data_inlining_row_limit: 10
8485
metadata_schema: main
@@ -105,6 +106,7 @@ SQLMesh will place models with the explicit catalog "ephemeral", such as `epheme
105106
type="ducklake",
106107
path="catalog.ducklake",
107108
data_path="data/ducklake",
109+
override_data_path=False,
108110
encrypted=True,
109111
data_inlining_row_limit=10,
110112
metadata_schema="main",
@@ -120,6 +122,7 @@ SQLMesh will place models with the explicit catalog "ephemeral", such as `epheme
120122

121123
- `path`: Path to the DuckLake catalog file
122124
- `data_path`: Path where DuckLake data files are stored
125+
- `override_data_path`: Whether data_override_path option is set
123126
- `encrypted`: Whether to enable encryption for the catalog (default: `False`)
124127
- `data_inlining_row_limit`: Maximum number of rows to inline in the catalog (default: `0`)
125128
- `metadata_schema`: The schema in the catalog server in which to store the DuckLake metadata tables (default: `main`)
@@ -364,6 +367,7 @@ The `filesystems` accepts a list of file systems to register in the DuckDB conne
364367
type: ducklake
365368
path: myducklakecatalog.duckdb
366369
data_path: abfs://MyFabricWorkspace/MyFabricLakehouse.Lakehouse/Files/DuckLake.Files
370+
override_data_path: False
367371
extensions:
368372
- ducklake
369373
filesystems:

sqlmesh/core/config/connection.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ class DuckDBAttachOptions(BaseConfig):
238238

239239
# DuckLake specific options
240240
data_path: t.Optional[str] = None
241+
override_data_path: t.Optional[bool] = False
241242
encrypted: bool = False
242243
data_inlining_row_limit: t.Optional[int] = None
243244
metadata_schema: t.Optional[str] = None
@@ -258,6 +259,8 @@ def to_sql(self, alias: str) -> str:
258259
path = f"ducklake:{path}"
259260
if self.data_path is not None:
260261
options.append(f"DATA_PATH '{self.data_path}'")
262+
if self.override_data_path:
263+
options.append("OVERRIDE_DATA_PATH true")
261264
if self.encrypted:
262265
options.append("ENCRYPTED")
263266
if self.data_inlining_row_limit is not None:
@@ -2097,6 +2100,7 @@ class ClickhouseConnectionConfig(ConnectionConfig):
20972100
https_proxy: t.Optional[str] = None
20982101
server_host_name: t.Optional[str] = None
20992102
tls_mode: t.Optional[str] = None
2103+
secure: bool = False
21002104

21012105
concurrent_tasks: int = 1
21022106
register_comments: bool = True
@@ -2133,6 +2137,7 @@ def _connection_kwargs_keys(self) -> t.Set[str]:
21332137
"https_proxy",
21342138
"server_host_name",
21352139
"tls_mode",
2140+
"secure",
21362141
}
21372142
return kwargs
21382143

sqlmesh/core/dialect.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,7 +774,8 @@ def format_model_expressions(
774774
if rewrite_casts:
775775

776776
def cast_to_colon(node: exp.Expr) -> exp.Expr:
777-
if isinstance(node, exp.Cast) and not any(
777+
# Directly check type instead of isinstance to avoid rewriting subclasses of CAST, e.g. JSONCast
778+
if type(node) is exp.Cast and not any(
778779
# Only convert CAST into :: if it doesn't have additional args set, otherwise this
779780
# conversion could alter the semantics (eg. changing SAFE_CAST in BigQuery to CAST)
780781
arg

sqlmesh/core/engine_adapter/clickhouse.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -716,8 +716,12 @@ def use_server_nulls_for_unmatched_after_join(
716716
return query
717717

718718
def _build_settings_property(
719-
self, key: str, value: exp.Expr | str | int | float
719+
self, settings: t.Mapping[str, exp.Expr | str | int | float]
720720
) -> exp.SettingsProperty:
721+
# ClickHouse requires every key=value pair to live under a single
722+
# SETTINGS clause (`SETTINGS a = 1, b = 2`). Emitting one
723+
# SettingsProperty per pair produces repeated SETTINGS keywords and a
724+
# syntax error at execution time.
721725
return exp.SettingsProperty(
722726
expressions=[
723727
exp.EQ(
@@ -726,6 +730,7 @@ def _build_settings_property(
726730
if isinstance(value, exp.Expr)
727731
else exp.Literal(this=value, is_string=isinstance(value, str)),
728732
)
733+
for key, value in settings.items()
729734
]
730735
)
731736

@@ -827,9 +832,7 @@ def _build_table_properties_exp(
827832
properties.append(exp.EmptyProperty())
828833

829834
if table_properties_copy:
830-
properties.extend(
831-
[self._build_settings_property(k, v) for k, v in table_properties_copy.items()]
832-
)
835+
properties.append(self._build_settings_property(table_properties_copy))
833836

834837
if table_description:
835838
properties.append(
@@ -858,9 +861,7 @@ def _build_view_properties_exp(
858861
properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
859862

860863
if view_properties_copy:
861-
properties.extend(
862-
[self._build_settings_property(k, v) for k, v in view_properties_copy.items()]
863-
)
864+
properties.append(self._build_settings_property(view_properties_copy))
864865

865866
if table_description:
866867
properties.append(

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,3 +411,27 @@ def _build_column_defs(
411411
return super()._build_column_defs(
412412
target_columns_to_types, column_descriptions, is_view, materialized
413413
)
414+
415+
def columns(
416+
self, table_name: TableName, include_pseudo_columns: bool = False
417+
) -> t.Dict[str, exp.DataType]:
418+
table = exp.to_table(table_name)
419+
420+
column_catalog = table.catalog or self.get_current_catalog()
421+
query = (
422+
exp.select("columns.column_name", "columns.full_data_type")
423+
.from_("system.information_schema.columns")
424+
.where(
425+
exp.and_(
426+
exp.column("table_name").eq(table.name),
427+
exp.column("table_schema").eq(table.db),
428+
exp.column("table_catalog").eq(column_catalog),
429+
)
430+
)
431+
.order_by("ordinal_position ASC")
432+
)
433+
434+
self.cursor.execute(query)
435+
result = self.cursor.fetchall()
436+
437+
return {row[0]: exp.DataType.build(row[1], dialect=self.dialect) for row in result}

sqlmesh/core/engine_adapter/redshift.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import typing as t
55

66
from sqlglot import exp
7+
from sqlglot.helper import ensure_list
78

89
from sqlmesh.core.dialect import to_schema
910
from sqlmesh.core.engine_adapter.base import MERGE_SOURCE_ALIAS, MERGE_TARGET_ALIAS
@@ -30,6 +31,7 @@
3031

3132
from sqlmesh.core._typing import SchemaName, TableName
3233
from sqlmesh.core.engine_adapter.base import QueryOrDF, Query
34+
from sqlmesh.core.node import IntervalUnit
3335

3436
logger = logging.getLogger(__name__)
3537

@@ -249,6 +251,63 @@ def create_view(
249251
**create_kwargs,
250252
)
251253

254+
def _build_table_properties_exp(
255+
self,
256+
catalog_name: t.Optional[str] = None,
257+
table_format: t.Optional[str] = None,
258+
storage_format: t.Optional[str] = None,
259+
partitioned_by: t.Optional[t.List[exp.Expr]] = None,
260+
partition_interval_unit: t.Optional[IntervalUnit] = None,
261+
clustered_by: t.Optional[t.List[exp.Expr]] = None,
262+
table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
263+
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
264+
table_description: t.Optional[str] = None,
265+
table_kind: t.Optional[str] = None,
266+
**kwargs: t.Any,
267+
) -> t.Optional[exp.Properties]:
268+
properties: t.List[exp.Expr] = []
269+
270+
if table_description:
271+
properties.append(
272+
exp.SchemaCommentProperty(
273+
this=exp.Literal.string(self._truncate_table_comment(table_description))
274+
)
275+
)
276+
277+
def _to_identifier_if_string(expression: exp.Expr) -> exp.Expr:
278+
if isinstance(expression, exp.Literal) and expression.is_string:
279+
return exp.to_identifier(expression.this)
280+
return expression.copy()
281+
282+
if table_properties:
283+
table_properties = {k.upper(): v for k, v in table_properties.items()}
284+
285+
table_type = self._pop_creatable_type_from_properties(table_properties)
286+
properties.extend(ensure_list(table_type))
287+
288+
diststyle = table_properties.get("DISTSTYLE")
289+
if diststyle:
290+
properties.append(exp.DistStyleProperty(this=exp.var(diststyle.name.upper())))
291+
292+
distkey = table_properties.get("DISTKEY")
293+
if distkey:
294+
properties.append(exp.DistKeyProperty(this=_to_identifier_if_string(distkey)))
295+
296+
sortkey = table_properties.get("SORTKEY")
297+
if sortkey:
298+
sortkey_expressions = sortkey.expressions if sortkey.expressions else [sortkey]
299+
properties.append(
300+
exp.SortKeyProperty(
301+
this=[
302+
_to_identifier_if_string(expression)
303+
for expression in sortkey_expressions
304+
],
305+
compound=False,
306+
)
307+
)
308+
309+
return exp.Properties(expressions=properties) if properties else None
310+
252311
def replace_query(
253312
self,
254313
table_name: TableName,

tests/core/engine_adapter/test_clickhouse.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,13 +316,32 @@ def build_properties_sql(storage_format="", order_by="", primary_key="", propert
316316
== "ENGINE=MergeTree ORDER BY (a, b + 1) PRIMARY KEY (a, b)"
317317
)
318318

319+
# Multiple physical_properties must be combined into a single comma-separated
320+
# SETTINGS clause. ClickHouse rejects repeated SETTINGS keywords with a syntax
321+
# error (see https://github.com/SQLMesh/sqlmesh/issues/5803).
319322
assert (
320323
build_properties_sql(
321324
order_by="ORDER_BY = (a, b + 1),",
322325
primary_key="PRIMARY_KEY = (a, b),",
323326
properties="PROP1 = 1, PROP2 = '2'",
324327
)
325-
== "ENGINE=MergeTree ORDER BY (a, b + 1) PRIMARY KEY (a, b) SETTINGS prop1 = 1 SETTINGS prop2 = '2'"
328+
== "ENGINE=MergeTree ORDER BY (a, b + 1) PRIMARY KEY (a, b) SETTINGS prop1 = 1, prop2 = '2'"
329+
)
330+
331+
# Regression test for #5803: three or more SETTINGS entries also combine.
332+
assert (
333+
build_properties_sql(
334+
order_by="ORDER_BY = (orders_id),",
335+
properties=(
336+
"min_age_to_force_merge_seconds = 3600, "
337+
"min_age_to_force_merge_on_partition_only = 1, "
338+
"index_granularity = 8192"
339+
),
340+
)
341+
== "ENGINE=MergeTree ORDER BY (orders_id) "
342+
"SETTINGS min_age_to_force_merge_seconds = 3600, "
343+
"min_age_to_force_merge_on_partition_only = 1, "
344+
"index_granularity = 8192"
326345
)
327346

328347
assert (
@@ -345,6 +364,20 @@ def build_properties_sql(storage_format="", order_by="", primary_key="", propert
345364
)
346365

347366

367+
def test_view_properties_combine_settings(adapter: ClickhouseEngineAdapter):
368+
# View properties hit the same SettingsProperty code path as table
369+
# properties (#5803): multiple entries must collapse into one SETTINGS
370+
# clause rather than emit repeated SETTINGS keywords.
371+
view_properties_exp = adapter._build_view_properties_exp(
372+
view_properties={
373+
"prop1": exp.Literal.number(1),
374+
"prop2": exp.Literal.string("2"),
375+
}
376+
)
377+
assert view_properties_exp is not None
378+
assert view_properties_exp.sql("clickhouse") == "SETTINGS prop1 = 1, prop2 = '2'"
379+
380+
348381
def test_partitioned_by_expr(make_mocked_engine_adapter: t.Callable):
349382
# user doesn't specify, unknown time column type
350383
model = load_sql_based_model(

tests/core/engine_adapter/test_databricks.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,3 +526,61 @@ def test_drop_data_object_materialized_view_calls_correct_drop(mocker: MockFixtu
526526
drop_view_mock.assert_called_once_with(
527527
mv_data_object.to_table(), ignore_if_not_exists=True, materialized=True
528528
)
529+
530+
531+
def test_columns(mocker: MockFixture, make_mocked_engine_adapter: t.Callable):
532+
adapter = make_mocked_engine_adapter(DatabricksEngineAdapter, default_catalog="test_catalog")
533+
534+
# Override/mock get_current_catalog to return default
535+
current_catalog_mock = mocker.patch.object(
536+
adapter, "get_current_catalog", return_value="test_catalog"
537+
)
538+
# create long struct columns datatype
539+
long_struct_cols = [f"a_{i}:int" for i in range(50)]
540+
adapter.cursor.fetchall.return_value = [
541+
("bigint_col", "bigint"),
542+
("binary_col", "binary"),
543+
("boolean_col", "boolean"),
544+
("date_col", "date"),
545+
("decimal_col", "decimal(38,4)"),
546+
("double_col", "double"),
547+
("float_col", "float"),
548+
("int_col", "int"),
549+
("small_int", "smallint"),
550+
("string_col", "string"),
551+
("timestamp_col", "timestamp"),
552+
("timestamp_ntz_col", "timestamp_ntz"),
553+
("tinyint_col", "tinyint"),
554+
("array_col", "array<int>"),
555+
("simple_struct_col", "struct<a:int,b:string>"),
556+
("long_struct_col", f"struct<{','.join(long_struct_cols)}>"),
557+
]
558+
559+
resp = adapter.columns("test_db.test_table")
560+
assert resp == {
561+
"bigint_col": exp.DataType.build("bigint", dialect=adapter.dialect),
562+
"binary_col": exp.DataType.build("binary", dialect=adapter.dialect),
563+
"boolean_col": exp.DataType.build("boolean", dialect=adapter.dialect),
564+
"date_col": exp.DataType.build("date", dialect=adapter.dialect),
565+
"decimal_col": exp.DataType.build("decimal(38,4)", dialect=adapter.dialect),
566+
"double_col": exp.DataType.build("double", dialect=adapter.dialect),
567+
"float_col": exp.DataType.build("float", dialect=adapter.dialect),
568+
"int_col": exp.DataType.build("int", dialect=adapter.dialect),
569+
"small_int": exp.DataType.build("smallint", dialect=adapter.dialect),
570+
"string_col": exp.DataType.build("string", dialect=adapter.dialect),
571+
"timestamp_col": exp.DataType.build("timestamp", dialect=adapter.dialect),
572+
"timestamp_ntz_col": exp.DataType.build("timestamp_ntz", dialect=adapter.dialect),
573+
"tinyint_col": exp.DataType.build("tinyint", dialect=adapter.dialect),
574+
"array_col": exp.DataType.build("array<int>", dialect=adapter.dialect),
575+
"simple_struct_col": exp.DataType.build("struct<a:int,b:string>", dialect=adapter.dialect),
576+
"long_struct_col": exp.DataType.build(
577+
f"struct<{','.join(long_struct_cols)}>", dialect=adapter.dialect
578+
),
579+
}
580+
581+
adapter.cursor.execute.assert_called_once_with(
582+
parse_one(
583+
"""SELECT columns.column_name, columns.full_data_type FROM system.information_schema.columns WHERE table_name = 'test_table' AND table_schema = 'test_db' AND table_catalog = 'test_catalog' ORDER BY ordinal_position ASC""",
584+
dialect="databricks",
585+
)
586+
)

0 commit comments

Comments
 (0)