Skip to content

Commit c3a7bff

Browse files
authored
Merge branch 'main' into fix/local-only-format
2 parents 4ac598f + 87e179b commit c3a7bff

14 files changed

Lines changed: 527 additions & 61 deletions

File tree

docs/reference/model_configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ The SQLMesh project-level `model_defaults` key supports the following options, d
178178
- kind
179179
- dialect
180180
- cron
181+
- cron_tz
181182
- owner
182183
- start
183184
- table_format

sqlmesh/core/config/model.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
OnAdditiveChange,
1515
)
1616
from sqlmesh.core.model.meta import FunctionCall
17-
from sqlmesh.core.node import IntervalUnit
17+
from sqlmesh.core.node import IntervalUnit, cron_tz_validator
1818
from sqlmesh.utils.date import TimeLike
1919
from sqlmesh.utils.pydantic import field_validator
2020

@@ -27,6 +27,7 @@ class ModelDefaultsConfig(BaseConfig):
2727
dialect: The SQL dialect that the model's query is written in.
2828
cron: A cron string specifying how often the model should be refreshed, leveraging the
2929
[croniter](https://github.com/kiorky/croniter) library.
30+
cron_tz: The timezone for the cron expression, defaults to UTC. [IANA time zones](https://docs.python.org/3/library/zoneinfo.html).
3031
owner: The owner of the model.
3132
start: The earliest date that the model will be backfilled for. If this is None,
3233
then the date is inferred by taking the most recent start date of its ancestors.
@@ -55,6 +56,7 @@ class ModelDefaultsConfig(BaseConfig):
5556
kind: t.Optional[ModelKind] = None
5657
dialect: t.Optional[str] = None
5758
cron: t.Optional[str] = None
59+
cron_tz: t.Any = None
5860
owner: t.Optional[str] = None
5961
start: t.Optional[TimeLike] = None
6062
table_format: t.Optional[str] = None
@@ -78,6 +80,7 @@ class ModelDefaultsConfig(BaseConfig):
7880
_model_kind_validator = model_kind_validator
7981
_on_destructive_change_validator = on_destructive_change_validator
8082
_on_additive_change_validator = on_additive_change_validator
83+
_cron_tz_validator = cron_tz_validator
8184

8285
@field_validator("audits", mode="before")
8386
def _audits_validator(cls, v: t.Any) -> t.Any:

sqlmesh/core/engine_adapter/clickhouse.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -716,8 +716,12 @@ def use_server_nulls_for_unmatched_after_join(
716716
return query
717717

718718
def _build_settings_property(
719-
self, key: str, value: exp.Expr | str | int | float
719+
self, settings: t.Mapping[str, exp.Expr | str | int | float]
720720
) -> exp.SettingsProperty:
721+
# ClickHouse requires every key=value pair to live under a single
722+
# SETTINGS clause (`SETTINGS a = 1, b = 2`). Emitting one
723+
# SettingsProperty per pair produces repeated SETTINGS keywords and a
724+
# syntax error at execution time.
721725
return exp.SettingsProperty(
722726
expressions=[
723727
exp.EQ(
@@ -726,6 +730,7 @@ def _build_settings_property(
726730
if isinstance(value, exp.Expr)
727731
else exp.Literal(this=value, is_string=isinstance(value, str)),
728732
)
733+
for key, value in settings.items()
729734
]
730735
)
731736

@@ -827,9 +832,7 @@ def _build_table_properties_exp(
827832
properties.append(exp.EmptyProperty())
828833

829834
if table_properties_copy:
830-
properties.extend(
831-
[self._build_settings_property(k, v) for k, v in table_properties_copy.items()]
832-
)
835+
properties.append(self._build_settings_property(table_properties_copy))
833836

834837
if table_description:
835838
properties.append(
@@ -858,9 +861,7 @@ def _build_view_properties_exp(
858861
properties.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))
859862

860863
if view_properties_copy:
861-
properties.extend(
862-
[self._build_settings_property(k, v) for k, v in view_properties_copy.items()]
863-
)
864+
properties.append(self._build_settings_property(view_properties_copy))
864865

865866
if table_description:
866867
properties.append(

sqlmesh/core/engine_adapter/redshift.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import typing as t
55

66
from sqlglot import exp
7+
from sqlglot.helper import ensure_list
78

89
from sqlmesh.core.dialect import to_schema
910
from sqlmesh.core.engine_adapter.base import MERGE_SOURCE_ALIAS, MERGE_TARGET_ALIAS
@@ -30,6 +31,7 @@
3031

3132
from sqlmesh.core._typing import SchemaName, TableName
3233
from sqlmesh.core.engine_adapter.base import QueryOrDF, Query
34+
from sqlmesh.core.node import IntervalUnit
3335

3436
logger = logging.getLogger(__name__)
3537

@@ -249,6 +251,63 @@ def create_view(
249251
**create_kwargs,
250252
)
251253

254+
def _build_table_properties_exp(
255+
self,
256+
catalog_name: t.Optional[str] = None,
257+
table_format: t.Optional[str] = None,
258+
storage_format: t.Optional[str] = None,
259+
partitioned_by: t.Optional[t.List[exp.Expr]] = None,
260+
partition_interval_unit: t.Optional[IntervalUnit] = None,
261+
clustered_by: t.Optional[t.List[exp.Expr]] = None,
262+
table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
263+
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
264+
table_description: t.Optional[str] = None,
265+
table_kind: t.Optional[str] = None,
266+
**kwargs: t.Any,
267+
) -> t.Optional[exp.Properties]:
268+
properties: t.List[exp.Expr] = []
269+
270+
if table_description:
271+
properties.append(
272+
exp.SchemaCommentProperty(
273+
this=exp.Literal.string(self._truncate_table_comment(table_description))
274+
)
275+
)
276+
277+
def _to_identifier_if_string(expression: exp.Expr) -> exp.Expr:
278+
if isinstance(expression, exp.Literal) and expression.is_string:
279+
return exp.to_identifier(expression.this)
280+
return expression.copy()
281+
282+
if table_properties:
283+
table_properties = {k.upper(): v for k, v in table_properties.items()}
284+
285+
table_type = self._pop_creatable_type_from_properties(table_properties)
286+
properties.extend(ensure_list(table_type))
287+
288+
diststyle = table_properties.get("DISTSTYLE")
289+
if diststyle:
290+
properties.append(exp.DistStyleProperty(this=exp.var(diststyle.name.upper())))
291+
292+
distkey = table_properties.get("DISTKEY")
293+
if distkey:
294+
properties.append(exp.DistKeyProperty(this=_to_identifier_if_string(distkey)))
295+
296+
sortkey = table_properties.get("SORTKEY")
297+
if sortkey:
298+
sortkey_expressions = sortkey.expressions if sortkey.expressions else [sortkey]
299+
properties.append(
300+
exp.SortKeyProperty(
301+
this=[
302+
_to_identifier_if_string(expression)
303+
for expression in sortkey_expressions
304+
],
305+
compound=False,
306+
)
307+
)
308+
309+
return exp.Properties(expressions=properties) if properties else None
310+
252311
def replace_query(
253312
self,
254313
table_name: TableName,

sqlmesh/core/node.py

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,30 @@ def dbt_fqn(self) -> t.Optional[str]:
260260
}
261261

262262

263+
def _cron_tz_validator(cls: t.Type, v: t.Any) -> t.Optional[zoneinfo.ZoneInfo]:
264+
if not v or v == "UTC":
265+
return None
266+
267+
v = str_or_exp_to_str(v)
268+
269+
try:
270+
return zoneinfo.ZoneInfo(v)
271+
except Exception as e:
272+
available_timezones = zoneinfo.available_timezones()
273+
274+
if available_timezones:
275+
raise ConfigError(f"{e}. {v} must be in {available_timezones}.")
276+
else:
277+
raise ConfigError(
278+
f"{e}. IANA time zone data is not available on your system. `pip install tzdata` to leverage cron time zones or remove this field which will default to UTC."
279+
)
280+
281+
return None
282+
283+
284+
cron_tz_validator = field_validator("cron_tz", mode="before")(_cron_tz_validator)
285+
286+
263287
class _Node(DbtInfoMixin, PydanticModel):
264288
"""
265289
Node is the core abstraction for entity that can be executed within the scheduler.
@@ -302,6 +326,8 @@ class _Node(DbtInfoMixin, PydanticModel):
302326
_croniter: t.Optional[CroniterCache] = None
303327
__inferred_interval_unit: t.Optional[IntervalUnit] = None
304328

329+
_cron_tz_validator = cron_tz_validator
330+
305331
def __str__(self) -> str:
306332
path = f": {self._path.name}" if self._path else ""
307333
return f"{self.__class__.__name__}<{self.name}{path}>"
@@ -328,27 +354,6 @@ def _name_validator(cls, v: t.Any) -> t.Optional[str]:
328354
return v.meta["sql"]
329355
return str(v)
330356

331-
@field_validator("cron_tz", mode="before")
332-
def _cron_tz_validator(cls, v: t.Any) -> t.Optional[zoneinfo.ZoneInfo]:
333-
if not v or v == "UTC":
334-
return None
335-
336-
v = str_or_exp_to_str(v)
337-
338-
try:
339-
return zoneinfo.ZoneInfo(v)
340-
except Exception as e:
341-
available_timezones = zoneinfo.available_timezones()
342-
343-
if available_timezones:
344-
raise ConfigError(f"{e}. {v} must be in {available_timezones}.")
345-
else:
346-
raise ConfigError(
347-
f"{e}. IANA time zone data is not available on your system. `pip install tzdata` to leverage cron time zones or remove this field which will default to UTC."
348-
)
349-
350-
return None
351-
352357
@field_validator("start", "end", mode="before")
353358
@classmethod
354359
def _date_validator(cls, v: t.Any) -> t.Optional[TimeLike]:

sqlmesh/core/test/definition.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,6 @@ def _to_hashable(x: t.Any) -> t.Any:
308308
expected,
309309
actual,
310310
check_dtype=False,
311-
check_datetimelike_compat=True,
312311
check_like=True, # Ignore column order
313312
)
314313
except AssertionError as e:

tests/core/engine_adapter/test_clickhouse.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,13 +316,32 @@ def build_properties_sql(storage_format="", order_by="", primary_key="", propert
316316
== "ENGINE=MergeTree ORDER BY (a, b + 1) PRIMARY KEY (a, b)"
317317
)
318318

319+
# Multiple physical_properties must be combined into a single comma-separated
320+
# SETTINGS clause. ClickHouse rejects repeated SETTINGS keywords with a syntax
321+
# error (see https://github.com/SQLMesh/sqlmesh/issues/5803).
319322
assert (
320323
build_properties_sql(
321324
order_by="ORDER_BY = (a, b + 1),",
322325
primary_key="PRIMARY_KEY = (a, b),",
323326
properties="PROP1 = 1, PROP2 = '2'",
324327
)
325-
== "ENGINE=MergeTree ORDER BY (a, b + 1) PRIMARY KEY (a, b) SETTINGS prop1 = 1 SETTINGS prop2 = '2'"
328+
== "ENGINE=MergeTree ORDER BY (a, b + 1) PRIMARY KEY (a, b) SETTINGS prop1 = 1, prop2 = '2'"
329+
)
330+
331+
# Regression test for #5803: three or more SETTINGS entries also combine.
332+
assert (
333+
build_properties_sql(
334+
order_by="ORDER_BY = (orders_id),",
335+
properties=(
336+
"min_age_to_force_merge_seconds = 3600, "
337+
"min_age_to_force_merge_on_partition_only = 1, "
338+
"index_granularity = 8192"
339+
),
340+
)
341+
== "ENGINE=MergeTree ORDER BY (orders_id) "
342+
"SETTINGS min_age_to_force_merge_seconds = 3600, "
343+
"min_age_to_force_merge_on_partition_only = 1, "
344+
"index_granularity = 8192"
326345
)
327346

328347
assert (
@@ -345,6 +364,20 @@ def build_properties_sql(storage_format="", order_by="", primary_key="", propert
345364
)
346365

347366

367+
def test_view_properties_combine_settings(adapter: ClickhouseEngineAdapter):
368+
# View properties hit the same SettingsProperty code path as table
369+
# properties (#5803): multiple entries must collapse into one SETTINGS
370+
# clause rather than emit repeated SETTINGS keywords.
371+
view_properties_exp = adapter._build_view_properties_exp(
372+
view_properties={
373+
"prop1": exp.Literal.number(1),
374+
"prop2": exp.Literal.string("2"),
375+
}
376+
)
377+
assert view_properties_exp is not None
378+
assert view_properties_exp.sql("clickhouse") == "SETTINGS prop1 = 1, prop2 = '2'"
379+
380+
348381
def test_partitioned_by_expr(make_mocked_engine_adapter: t.Callable):
349382
# user doesn't specify, unknown time column type
350383
model = load_sql_based_model(

0 commit comments

Comments
 (0)