Skip to content

Commit 1b226d3

Browse files
Gabe PescoGabe Pesco
authored andcommitted
Adding schema ownership
Signed-off-by: Gabe Pesco <PescoG@medinsight.milliman.com>
1 parent 41d9906 commit 1b226d3

12 files changed

Lines changed: 349 additions & 1 deletion

File tree

sqlmesh/core/config/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
load_configs as load_configs,
3333
)
3434
from sqlmesh.core.config.migration import MigrationConfig as MigrationConfig
35+
from sqlmesh.core.config.ownership import OwnershipConfig as OwnershipConfig
3536
from sqlmesh.core.config.model import ModelDefaultsConfig as ModelDefaultsConfig
3637
from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig
3738
from sqlmesh.core.config.linter import LinterConfig as LinterConfig

sqlmesh/core/config/ownership.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from __future__ import annotations
2+
3+
import re
4+
import typing as t
5+
6+
from pydantic.functional_validators import BeforeValidator
7+
8+
from sqlmesh.core.config.base import BaseConfig
9+
from sqlmesh.core.config.common import compile_regex_mapping
10+
11+
if t.TYPE_CHECKING:
12+
OwnershipMapping = t.Dict[re.Pattern, str]
13+
else:
14+
OwnershipMapping = t.Annotated[
15+
t.Dict[re.Pattern, str], BeforeValidator(compile_regex_mapping)
16+
]
17+
18+
19+
class OwnershipConfig(BaseConfig):
20+
"""Configuration for object ownership rules applied at creation time.
21+
22+
Maps environment name regex patterns to owner principals. The first
23+
matching pattern wins. Ownership is applied immediately when schemas and
24+
views are created, so even a partially-completed run leaves objects in a
25+
manageable state.
26+
27+
Example::
28+
29+
ownership:
30+
environment_owner_mapping:
31+
"^prod$": "svc_prod_spn"
32+
".*": "group:shared-developers"
33+
"""
34+
35+
environment_owner_mapping: OwnershipMapping = {}
36+
37+
def resolve_owner(self, environment_name: str) -> t.Optional[str]:
38+
"""Return the configured owner for the given environment name, or None."""
39+
for pattern, owner in self.environment_owner_mapping.items():
40+
if pattern.fullmatch(environment_name):
41+
return owner
42+
return None

sqlmesh/core/config/root.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from sqlmesh.core.config.format import FormatConfig
3131
from sqlmesh.core.config.gateway import GatewayConfig
3232
from sqlmesh.core.config.janitor import JanitorConfig
33+
from sqlmesh.core.config.ownership import OwnershipConfig
3334
from sqlmesh.core.config.migration import MigrationConfig
3435
from sqlmesh.core.config.model import ModelDefaultsConfig
3536
from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig
@@ -118,6 +119,7 @@ class Config(BaseConfig):
118119
gateway_managed_virtual_layer: Whether the models' views in the virtual layer are created by the model-specific gateway rather than the default gateway.
119120
infer_python_dependencies: Whether to statically analyze Python code to automatically infer Python package requirements.
120121
environment_catalog_mapping: A mapping from regular expressions to catalog names. The catalog name is used to determine the target catalog for a given environment.
122+
ownership: Ownership rules applied at schema/view creation time. Maps environment name patterns to owner principals so objects are correctly owned even after a partial run.
121123
default_target_environment: The name of the environment that will be the default target for the `sqlmesh plan` and `sqlmesh run` commands.
122124
log_limit: The default number of logs to keep.
123125
format: The formatting options for SQL code.
@@ -175,6 +177,7 @@ class Config(BaseConfig):
175177
janitor: JanitorConfig = JanitorConfig()
176178
cache_dir: t.Optional[str] = None
177179
dbt: t.Optional[DbtConfig] = None
180+
ownership: OwnershipConfig = Field(default_factory=OwnershipConfig)
178181

179182
_FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
180183
"gateways": UpdateStrategy.NESTED_UPDATE,
@@ -194,6 +197,7 @@ class Config(BaseConfig):
194197
"after_all": UpdateStrategy.EXTEND,
195198
"linter": UpdateStrategy.NESTED_UPDATE,
196199
"dbt": UpdateStrategy.NESTED_UPDATE,
200+
"ownership": UpdateStrategy.NESTED_UPDATE,
197201
}
198202

199203
_connection_config_validator = connection_config_validator

sqlmesh/core/config/scheduler.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,18 @@ class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig)
128128
type_: t.Literal["builtin"] = Field(alias="type", default="builtin")
129129

130130
def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
131+
from sqlmesh.core.config.ownership import OwnershipConfig
132+
133+
ownership_config = getattr(context.config, "ownership", None)
134+
if isinstance(ownership_config, OwnershipConfig) and not ownership_config.environment_owner_mapping:
135+
ownership_config = None
131136
return BuiltInPlanEvaluator(
132137
state_sync=context.state_sync,
133138
snapshot_evaluator=context.snapshot_evaluator,
134139
create_scheduler=context.create_scheduler,
135140
default_catalog=context.default_catalog,
136141
console=context.console,
142+
ownership_config=ownership_config,
137143
)
138144

139145
def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:

sqlmesh/core/engine_adapter/base.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1418,6 +1418,20 @@ def _create_schema(
14181418
raise
14191419
logger.warning("Failed to create %s '%s': %s", kind.lower(), schema_name, e)
14201420

1421+
def alter_schema_owner(self, schema_name: SchemaName, owner: str) -> None:
1422+
"""Set the owner of a schema.
1423+
1424+
No-op by default. Override in dialect-specific adapters that support ownership control
1425+
(e.g. Spark/Databricks Unity Catalog: ALTER SCHEMA ... OWNER TO ...).
1426+
"""
1427+
1428+
def alter_view_owner(self, view_name: TableName, owner: str) -> None:
1429+
"""Set the owner of a view.
1430+
1431+
No-op by default. Override in dialect-specific adapters that support ownership control
1432+
(e.g. Spark/Databricks Unity Catalog: ALTER VIEW ... OWNER TO ...).
1433+
"""
1434+
14211435
def drop_schema(
14221436
self,
14231437
schema_name: SchemaName,

sqlmesh/core/engine_adapter/spark.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,20 @@ def _build_create_comment_column_exp(
553553

554554
return f"ALTER TABLE {table_sql} ALTER COLUMN {column_sql} COMMENT {comment_sql}"
555555

556+
def alter_schema_owner(self, schema_name: SchemaName, owner: str) -> None:
557+
schema_sql = exp.to_table(schema_name, dialect=self.dialect).sql(
558+
dialect=self.dialect, identify=True
559+
)
560+
owner_sql = exp.to_identifier(owner, quoted=True).sql(dialect=self.dialect)
561+
self.execute(f"ALTER SCHEMA {schema_sql} OWNER TO {owner_sql}")
562+
563+
def alter_view_owner(self, view_name: TableName, owner: str) -> None:
564+
view_sql = exp.to_table(view_name, dialect=self.dialect).sql(
565+
dialect=self.dialect, identify=True
566+
)
567+
owner_sql = exp.to_identifier(owner, quoted=True).sql(dialect=self.dialect)
568+
self.execute(f"ALTER VIEW {view_sql} OWNER TO {owner_sql}")
569+
556570
@classmethod
557571
def _wap_branch_name(cls, wap_id: str) -> str:
558572
return f"{cls.WAP_PREFIX}{wap_id}"

sqlmesh/core/plan/evaluator.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from sqlmesh.core.plan.common import identify_restatement_intervals_across_snapshot_versions
4141
from sqlmesh.utils import CorrelationId
4242
from sqlmesh.utils.concurrency import NodeExecutionFailedError
43+
from sqlmesh.core.config.ownership import OwnershipConfig
4344
from sqlmesh.utils.errors import PlanError, ConflictingPlanError, SQLMeshError
4445
from sqlmesh.utils.date import now, to_timestamp
4546

@@ -74,12 +75,14 @@ def __init__(
7475
create_scheduler: t.Callable[[t.Iterable[Snapshot], SnapshotEvaluator], Scheduler],
7576
default_catalog: t.Optional[str],
7677
console: t.Optional[Console] = None,
78+
ownership_config: t.Optional[OwnershipConfig] = None,
7779
):
7880
self.state_sync = state_sync
7981
self.snapshot_evaluator = snapshot_evaluator
8082
self.create_scheduler = create_scheduler
8183
self.default_catalog = default_catalog
8284
self.console = console or get_console()
85+
self.ownership_config = ownership_config
8386
self._circuit_breaker: t.Optional[t.Callable[[], bool]] = None
8487

8588
def evaluate(
@@ -434,6 +437,9 @@ def _promote_snapshots(
434437
deployability_index: t.Optional[DeployabilityIndex] = None,
435438
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
436439
) -> None:
440+
owner: t.Optional[str] = None
441+
if self.ownership_config:
442+
owner = self.ownership_config.resolve_owner(environment_naming_info.name)
437443
self.snapshot_evaluator.promote(
438444
target_snapshots,
439445
start=plan.start,
@@ -449,6 +455,7 @@ def _promote_snapshots(
449455
environment_naming_info=environment_naming_info,
450456
deployability_index=deployability_index,
451457
on_complete=on_complete,
458+
owner=owner,
452459
)
453460

454461
def _demote_snapshots(

sqlmesh/core/snapshot/evaluator.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ def promote(
275275
snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None,
276276
table_mapping: t.Optional[t.Dict[str, str]] = None,
277277
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
278+
owner: t.Optional[str] = None,
278279
) -> None:
279280
"""Promotes the given collection of snapshots in the target environment by replacing a corresponding
280281
view with a physical table associated with the given snapshot.
@@ -306,7 +307,7 @@ def promote(
306307
gateway_table_pairs = [
307308
(gateway, table) for gateway, tables in tables_by_gateway.items() for table in tables
308309
]
309-
self._create_schemas(gateway_table_pairs=gateway_table_pairs)
310+
self._create_schemas(gateway_table_pairs=gateway_table_pairs, owner=owner)
310311

311312
# Fetch the view data objects for the promoted snapshots to get them cached
312313
self._get_virtual_data_objects(target_snapshots, environment_naming_info)
@@ -325,6 +326,7 @@ def promote(
325326
environment_naming_info=environment_naming_info,
326327
deployability_index=deployability_index, # type: ignore
327328
on_complete=on_complete,
329+
owner=owner,
328330
),
329331
self.ddl_concurrent_tasks,
330332
)
@@ -1257,6 +1259,7 @@ def _promote_snapshot(
12571259
execution_time: t.Optional[TimeLike] = None,
12581260
snapshots: t.Optional[t.Dict[SnapshotId, Snapshot]] = None,
12591261
table_mapping: t.Optional[t.Dict[str, str]] = None,
1262+
owner: t.Optional[str] = None,
12601263
) -> None:
12611264
if not snapshot.is_model:
12621265
return
@@ -1298,6 +1301,9 @@ def _promote_snapshot(
12981301
render_kwargs["snapshots"] = snapshot_by_name
12991302
adapter.execute(snapshot.model.render_on_virtual_update(**render_kwargs))
13001303

1304+
if owner:
1305+
adapter.alter_view_owner(view_name, owner)
1306+
13011307
if on_complete is not None:
13021308
on_complete(snapshot)
13031309

@@ -1449,6 +1455,7 @@ def _create_catalogs(
14491455
def _create_schemas(
14501456
self,
14511457
gateway_table_pairs: t.Iterable[t.Tuple[t.Optional[str], t.Union[exp.Table, str]]],
1458+
owner: t.Optional[str] = None,
14521459
) -> None:
14531460
table_exprs = [(gateway, exp.to_table(t)) for gateway, t in gateway_table_pairs]
14541461
unique_schemas = {
@@ -1464,6 +1471,8 @@ def _create_schema(
14641471
logger.info("Creating schema '%s'", schema)
14651472
adapter = self.get_adapter(gateway)
14661473
adapter.create_schema(schema)
1474+
if owner:
1475+
adapter.alter_schema_owner(schema, owner)
14671476

14681477
with self.concurrent_context():
14691478
concurrent_apply_to_values(

tests/core/engine_adapter/test_spark.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,6 +1092,52 @@ def test_table_format(adapter: SparkEngineAdapter, mocker: MockerFixture):
10921092
]
10931093

10941094

1095+
def test_alter_schema_owner(make_mocked_engine_adapter: t.Callable):
1096+
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
1097+
adapter.alter_schema_owner("catalog.my_schema", "svc_prod_spn")
1098+
assert to_sql_calls(adapter) == [
1099+
"ALTER SCHEMA `catalog`.`my_schema` OWNER TO `svc_prod_spn`"
1100+
]
1101+
1102+
1103+
def test_alter_schema_owner_three_part_name(make_mocked_engine_adapter: t.Callable):
1104+
# Schema references are typically 2-part (catalog.schema), but verify quoting is correct.
1105+
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
1106+
adapter.alter_schema_owner("my_schema", "svc_prod_spn")
1107+
assert to_sql_calls(adapter) == ["ALTER SCHEMA `my_schema` OWNER TO `svc_prod_spn`"]
1108+
1109+
1110+
def test_alter_view_owner(make_mocked_engine_adapter: t.Callable):
1111+
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
1112+
adapter.alter_view_owner("catalog.my_schema.my_view", "svc_prod_spn")
1113+
assert to_sql_calls(adapter) == [
1114+
"ALTER VIEW `catalog`.`my_schema`.`my_view` OWNER TO `svc_prod_spn`"
1115+
]
1116+
1117+
1118+
def test_alter_view_owner_special_chars_in_principal(make_mocked_engine_adapter: t.Callable):
1119+
# Databricks Unity Catalog principals can contain colons and @ signs.
1120+
# Verify they are safely backtick-quoted and not interpreted as SQL syntax.
1121+
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
1122+
adapter.alter_view_owner("catalog.sushi__dev.orders", "group:devs@company.com")
1123+
assert to_sql_calls(adapter) == [
1124+
"ALTER VIEW `catalog`.`sushi__dev`.`orders` OWNER TO `group:devs@company.com`"
1125+
]
1126+
1127+
1128+
def test_alter_schema_owner_base_noop(make_mocked_engine_adapter: t.Callable):
1129+
# The base EngineAdapter.alter_schema_owner is a no-op: adapters that don't
1130+
# support ownership control silently skip it without emitting any SQL.
1131+
from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
1132+
1133+
adapter = make_mocked_engine_adapter(DuckDBEngineAdapter)
1134+
adapter.alter_schema_owner("my_schema", "some_owner")
1135+
adapter.alter_view_owner("my_schema.my_view", "some_owner")
1136+
# No ALTER SQL should have been emitted
1137+
alter_calls = [s for s in to_sql_calls(adapter) if "OWNER" in s.upper()]
1138+
assert alter_calls == []
1139+
1140+
10951141
def test_get_data_object_wap_branch(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
10961142
adapter = make_mocked_engine_adapter(SparkEngineAdapter, patch_get_data_objects=False)
10971143
mocker.patch.object(adapter, "_get_data_objects", return_value=[])

tests/core/integration/test_config.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
GatewayConfig,
1616
ModelDefaultsConfig,
1717
DuckDBConnectionConfig,
18+
OwnershipConfig,
1819
TableNamingConvention,
1920
AutoCategorizationMode,
2021
)
@@ -578,3 +579,47 @@ def test_auto_categorization(sushi_context: Context):
578579
sushi_context.get_snapshot("sushi.waiter_as_customer_by_day", raise_if_missing=True).version
579580
== version
580581
)
582+
583+
584+
def test_ownership_config_plan_applies_without_error(
585+
tmp_path: Path, monkeypatch: MonkeyPatch
586+
) -> None:
587+
"""OwnershipConfig flows through the full plan/apply lifecycle without errors.
588+
589+
DuckDB's alter_schema_owner/alter_view_owner are no-ops, so we cannot verify
590+
that ownership was actually changed — but we confirm the config plumbing
591+
doesn't break schema creation, view promotion, or dev environment application.
592+
"""
593+
monkeypatch.chdir(tmp_path)
594+
595+
config = Config(
596+
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
597+
default_connection=DuckDBConnectionConfig(),
598+
ownership=OwnershipConfig(
599+
environment_owner_mapping={
600+
"^prod$": "svc_prod_owner",
601+
".*": "group:shared-developers",
602+
}
603+
),
604+
)
605+
606+
models_dir = tmp_path / "models"
607+
models_dir.mkdir()
608+
(models_dir / "model.sql").write_text(
609+
"""
610+
MODEL (name example_schema.test_model, kind FULL);
611+
SELECT '1' AS a
612+
"""
613+
)
614+
615+
ctx = Context(config=config, paths=tmp_path)
616+
617+
# Prod plan/apply — exercises virtual layer schema and view creation with ownership config
618+
ctx.plan(auto_apply=True)
619+
assert ctx.engine_adapter.table_exists("example_schema.test_model")
620+
621+
# Dev plan/apply — exercises env-suffixed schema and view creation with ownership config
622+
ctx.plan(environment="dev", include_unmodified=True, auto_apply=True)
623+
metadata = DuckDBMetadata.from_context(ctx)
624+
dev_schemas = {s for s in metadata.schemas if "__dev" in s}
625+
assert len(dev_schemas) > 0

0 commit comments

Comments
 (0)