Skip to content

Commit 671f762

Browse files
Gabe PescoGabe Pesco
authored andcommitted
feat: extend ownership control to physical layer tables
Add `physical_owner` field to `OwnershipConfig` so that SQLMesh__* physical tables get ownership applied at creation time, not just virtual-layer views and schemas. * `OwnershipConfig.physical_owner` - optional plain string, no env-pattern matching needed * `EngineAdapterBase.alter_table_owner` - no-op default * `SparkEngineAdapter.alter_table_owner` - ALTER TABLE ... OWNER TO ... with backtick-quoting * `SnapshotEvaluator.create_snapshot` - calls alter_table_owner after _execute_create (skips ViewKind) * `SnapshotEvaluator.create` / `create_physical_schemas` - accept and thread owner param * `BuiltInPlanEvaluator` - resolves physical_owner in both PhysicalLayerSchemaCreation and PhysicalLayerUpdate stages Signed-off-by: Gabe Pesco <gabe.pesco@milliman.com> Signed-off-by: Gabe Pesco <PescoG@medinsight.milliman.com>
1 parent 1b226d3 commit 671f762

8 files changed

Lines changed: 162 additions & 5 deletions

File tree

sqlmesh/core/config/ownership.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ class OwnershipConfig(BaseConfig):
3030
environment_owner_mapping:
3131
"^prod$": "svc_prod_spn"
3232
".*": "group:shared-developers"
33+
physical_owner: "group:shared-developers"
3334
"""
3435

3536
environment_owner_mapping: OwnershipMapping = {}
37+
physical_owner: t.Optional[str] = None
3638

3739
def resolve_owner(self, environment_name: str) -> t.Optional[str]:
3840
"""Return the configured owner for the given environment name, or None."""

sqlmesh/core/engine_adapter/base.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1432,6 +1432,13 @@ def alter_view_owner(self, view_name: TableName, owner: str) -> None:
14321432
(e.g. Spark/Databricks Unity Catalog: ALTER VIEW ... OWNER TO ...).
14331433
"""
14341434

1435+
def alter_table_owner(self, table_name: TableName, owner: str) -> None:
1436+
"""Set the owner of a table.
1437+
1438+
No-op by default. Override in dialect-specific adapters that support ownership control
1439+
(e.g. Spark/Databricks Unity Catalog: ALTER TABLE ... OWNER TO ...).
1440+
"""
1441+
14351442
def drop_schema(
14361443
self,
14371444
schema_name: SchemaName,

sqlmesh/core/engine_adapter/spark.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,13 @@ def alter_view_owner(self, view_name: TableName, owner: str) -> None:
567567
owner_sql = exp.to_identifier(owner, quoted=True).sql(dialect=self.dialect)
568568
self.execute(f"ALTER VIEW {view_sql} OWNER TO {owner_sql}")
569569

570+
def alter_table_owner(self, table_name: TableName, owner: str) -> None:
571+
table_sql = exp.to_table(table_name, dialect=self.dialect).sql(
572+
dialect=self.dialect, identify=True
573+
)
574+
owner_sql = exp.to_identifier(owner, quoted=True).sql(dialect=self.dialect)
575+
self.execute(f"ALTER TABLE {table_sql} OWNER TO {owner_sql}")
576+
570577
@classmethod
571578
def _wap_branch_name(cls, wap_id: str) -> str:
572579
return f"{cls.WAP_PREFIX}{wap_id}"

sqlmesh/core/plan/evaluator.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ def visit_physical_layer_update_stage(
175175
self.console.log_success(skip_message)
176176
return
177177

178+
physical_owner = self.ownership_config.physical_owner if self.ownership_config else None
178179
completion_status = None
179180
progress_stopped = False
180181
try:
@@ -188,6 +189,7 @@ def visit_physical_layer_update_stage(
188189
x, plan.environment, self.default_catalog
189190
),
190191
on_complete=self.console.update_creation_progress,
192+
owner=physical_owner,
191193
)
192194
if completion_status.is_nothing_to_do:
193195
self.console.log_success(skip_message)
@@ -212,9 +214,10 @@ def visit_physical_layer_update_stage(
212214
def visit_physical_layer_schema_creation_stage(
213215
self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan
214216
) -> None:
217+
physical_owner = self.ownership_config.physical_owner if self.ownership_config else None
215218
try:
216219
self.snapshot_evaluator.create_physical_schemas(
217-
stage.snapshots, stage.deployability_index
220+
stage.snapshots, stage.deployability_index, owner=physical_owner
218221
)
219222
except Exception as ex:
220223
raise PlanError("Plan application failed.") from ex

sqlmesh/core/snapshot/evaluator.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ def create(
368368
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
369369
allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
370370
allow_additive_snapshots: t.Optional[t.Set[str]] = None,
371+
owner: t.Optional[str] = None,
371372
) -> CompletionStatus:
372373
"""Creates a physical snapshot schema and table for the given collection of snapshots.
373374
@@ -379,6 +380,7 @@ def create(
379380
on_complete: A callback to call on each successfully created snapshot.
380381
allow_destructive_snapshots: Set of snapshots that are allowed to have destructive schema changes.
381382
allow_additive_snapshots: Set of snapshots that are allowed to have additive schema changes.
383+
owner: Optional principal to set as table owner after creation.
382384
383385
Returns:
384386
CompletionStatus: The status of the creation operation (success, failure, nothing to do).
@@ -398,17 +400,22 @@ def create(
398400
on_complete=on_complete,
399401
allow_destructive_snapshots=allow_destructive_snapshots or set(),
400402
allow_additive_snapshots=allow_additive_snapshots or set(),
403+
owner=owner,
401404
)
402405
return CompletionStatus.SUCCESS
403406

404407
def create_physical_schemas(
405-
self, snapshots: t.Iterable[Snapshot], deployability_index: DeployabilityIndex
408+
self,
409+
snapshots: t.Iterable[Snapshot],
410+
deployability_index: DeployabilityIndex,
411+
owner: t.Optional[str] = None,
406412
) -> None:
407413
"""Creates the physical schemas for the given snapshots.
408414
409415
Args:
410416
snapshots: Snapshots to create physical schemas for.
411417
deployability_index: Determines snapshots that are deployable in the context of this creation.
418+
owner: Optional principal to set as schema owner after creation.
412419
"""
413420
tables_by_gateway: t.Dict[t.Optional[str], t.List[str]] = defaultdict(list)
414421
for snapshot in snapshots:
@@ -420,7 +427,7 @@ def create_physical_schemas(
420427
gateway_table_pairs = [
421428
(gateway, table) for gateway, tables in tables_by_gateway.items() for table in tables
422429
]
423-
self._create_schemas(gateway_table_pairs=gateway_table_pairs)
430+
self._create_schemas(gateway_table_pairs=gateway_table_pairs, owner=owner)
424431

425432
def get_snapshots_to_create(
426433
self, target_snapshots: t.Iterable[Snapshot], deployability_index: DeployabilityIndex
@@ -453,6 +460,7 @@ def _create_snapshots(
453460
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]],
454461
allow_destructive_snapshots: t.Set[str],
455462
allow_additive_snapshots: t.Set[str],
463+
owner: t.Optional[str] = None,
456464
) -> None:
457465
"""Internal method to create tables in parallel."""
458466
with self.concurrent_context():
@@ -465,6 +473,7 @@ def _create_snapshots(
465473
allow_destructive_snapshots=allow_destructive_snapshots,
466474
allow_additive_snapshots=allow_additive_snapshots,
467475
on_complete=on_complete,
476+
owner=owner,
468477
),
469478
self.ddl_concurrent_tasks,
470479
raise_on_error=False,
@@ -870,6 +879,7 @@ def create_snapshot(
870879
allow_destructive_snapshots: t.Set[str],
871880
allow_additive_snapshots: t.Set[str],
872881
on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None,
882+
owner: t.Optional[str] = None,
873883
) -> None:
874884
"""Creates a physical table for the given snapshot.
875885
@@ -880,6 +890,7 @@ def create_snapshot(
880890
on_complete: A callback to call on each successfully created database object.
881891
allow_destructive_snapshots: Snapshots for which destructive schema changes are allowed.
882892
allow_additive_snapshots: Snapshots for which additive schema changes are allowed.
893+
owner: Optional principal to set as table owner after creation.
883894
"""
884895
if not snapshot.is_model:
885896
return
@@ -907,6 +918,9 @@ def create_snapshot(
907918
**create_render_kwargs
908919
)
909920

921+
is_table_deployable = deployability_index.is_deployable(snapshot)
922+
table_name = snapshot.table_name(is_deployable=is_table_deployable)
923+
910924
if self._can_clone(snapshot, deployability_index):
911925
self._clone_snapshot_in_dev(
912926
snapshot=snapshot,
@@ -919,17 +933,19 @@ def create_snapshot(
919933
run_pre_post_statements=True,
920934
)
921935
else:
922-
is_table_deployable = deployability_index.is_deployable(snapshot)
923936
self._execute_create(
924937
snapshot=snapshot,
925-
table_name=snapshot.table_name(is_deployable=is_table_deployable),
938+
table_name=table_name,
926939
is_table_deployable=is_table_deployable,
927940
deployability_index=deployability_index,
928941
create_render_kwargs=create_render_kwargs,
929942
rendered_physical_properties=rendered_physical_properties,
930943
dry_run=True,
931944
)
932945

946+
if owner and not isinstance(snapshot.model.kind, ViewKind):
947+
adapter.alter_table_owner(table_name, owner)
948+
933949
evaluation_strategy.run_post_statements(
934950
snapshot=snapshot, render_kwargs={**create_render_kwargs, "inside_transaction": False}
935951
)

tests/core/engine_adapter/test_spark.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,6 +1125,23 @@ def test_alter_view_owner_special_chars_in_principal(make_mocked_engine_adapter:
11251125
]
11261126

11271127

1128+
def test_alter_table_owner(make_mocked_engine_adapter: t.Callable):
1129+
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
1130+
adapter.alter_table_owner("catalog.sqlmesh__sushi.orders__abc123", "svc_prod_spn")
1131+
assert to_sql_calls(adapter) == [
1132+
"ALTER TABLE `catalog`.`sqlmesh__sushi`.`orders__abc123` OWNER TO `svc_prod_spn`"
1133+
]
1134+
1135+
1136+
def test_alter_table_owner_special_chars_in_principal(make_mocked_engine_adapter: t.Callable):
1137+
# Databricks Unity Catalog principals can contain colons and @ signs.
1138+
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
1139+
adapter.alter_table_owner("catalog.sqlmesh__sushi.orders__abc123", "group:data@company.com")
1140+
assert to_sql_calls(adapter) == [
1141+
"ALTER TABLE `catalog`.`sqlmesh__sushi`.`orders__abc123` OWNER TO `group:data@company.com`"
1142+
]
1143+
1144+
11281145
def test_alter_schema_owner_base_noop(make_mocked_engine_adapter: t.Callable):
11291146
# The base EngineAdapter.alter_schema_owner is a no-op: adapters that don't
11301147
# support ownership control silently skip it without emitting any SQL.
@@ -1133,6 +1150,7 @@ def test_alter_schema_owner_base_noop(make_mocked_engine_adapter: t.Callable):
11331150
adapter = make_mocked_engine_adapter(DuckDBEngineAdapter)
11341151
adapter.alter_schema_owner("my_schema", "some_owner")
11351152
adapter.alter_view_owner("my_schema.my_view", "some_owner")
1153+
adapter.alter_table_owner("my_schema.my_table", "some_owner")
11361154
# No ALTER SQL should have been emitted
11371155
alter_calls = [s for s in to_sql_calls(adapter) if "OWNER" in s.upper()]
11381156
assert alter_calls == []

tests/core/test_config.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,28 @@ def test_config_ownership_defaults_to_empty():
713713
assert attach_config_2.read_only is True
714714

715715

716+
def test_ownership_config_physical_owner():
717+
# physical_owner is a simple optional string — no pattern matching.
718+
config = OwnershipConfig(physical_owner="group:data-platform")
719+
assert config.physical_owner == "group:data-platform"
720+
721+
722+
def test_ownership_config_physical_owner_default_none():
723+
assert OwnershipConfig().physical_owner is None
724+
725+
726+
def test_ownership_config_physical_owner_deserialization():
727+
config = Config(
728+
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
729+
ownership={
730+
"environment_owner_mapping": {"^prod$": "svc_prod"},
731+
"physical_owner": "group:data-platform",
732+
},
733+
)
734+
assert config.ownership.physical_owner == "group:data-platform"
735+
assert config.ownership.resolve_owner("prod") == "svc_prod"
736+
737+
716738
def test_load_model_defaults_audits(tmp_path):
717739
config_path = tmp_path / "config_model_defaults_audits.yaml"
718740
with open(config_path, "w", encoding="utf-8") as fd:

tests/core/test_snapshot_evaluator.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,88 @@ def test_promote_owner_applied_per_view(mocker: MockerFixture, adapter_mock, mak
504504
assert called_owners == {"svc_prod_spn"}
505505

506506

507+
def test_create_with_physical_owner(mocker: MockerFixture, adapter_mock, make_snapshot):
508+
"""alter_table_owner is called for each non-view table when physical owner is set."""
509+
adapter_mock.get_data_objects.return_value = []
510+
evaluator = SnapshotEvaluator(adapter_mock)
511+
512+
model = SqlModel(
513+
name="test_schema.test_model",
514+
kind=IncrementalByTimeRangeKind(time_column="ds"),
515+
storage_format="parquet",
516+
query=parse_one("SELECT a, ds FROM tbl WHERE ds BETWEEN @start_ds AND @end_ds"),
517+
)
518+
snapshot = make_snapshot(model)
519+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
520+
521+
evaluator.create([snapshot], {}, owner="group:data-platform")
522+
523+
adapter_mock.alter_table_owner.assert_called_once()
524+
call_args = adapter_mock.alter_table_owner.call_args
525+
assert call_args.args[1] == "group:data-platform"
526+
527+
528+
def test_create_without_physical_owner_skips_alter(
529+
mocker: MockerFixture, adapter_mock, make_snapshot
530+
):
531+
"""When no physical owner is set, alter_table_owner is never called."""
532+
adapter_mock.get_data_objects.return_value = []
533+
evaluator = SnapshotEvaluator(adapter_mock)
534+
535+
model = SqlModel(
536+
name="test_schema.test_model",
537+
kind=IncrementalByTimeRangeKind(time_column="ds"),
538+
storage_format="parquet",
539+
query=parse_one("SELECT a, ds FROM tbl WHERE ds BETWEEN @start_ds AND @end_ds"),
540+
)
541+
snapshot = make_snapshot(model)
542+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
543+
544+
evaluator.create([snapshot], {})
545+
546+
adapter_mock.alter_table_owner.assert_not_called()
547+
548+
549+
def test_create_view_kind_skips_physical_owner(
550+
mocker: MockerFixture, adapter_mock, make_snapshot
551+
):
552+
"""ViewKind snapshots skip alter_table_owner even when physical_owner is set."""
553+
adapter_mock.get_data_objects.return_value = []
554+
evaluator = SnapshotEvaluator(adapter_mock)
555+
556+
model = SqlModel(
557+
name="test_schema.test_view",
558+
kind=ViewKind(),
559+
query=parse_one("SELECT 1"),
560+
)
561+
snapshot = make_snapshot(model)
562+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
563+
564+
evaluator.create([snapshot], {}, owner="group:data-platform")
565+
566+
adapter_mock.alter_table_owner.assert_not_called()
567+
568+
569+
def test_create_physical_schemas_with_owner(mocker: MockerFixture, adapter_mock, make_snapshot):
570+
"""create_physical_schemas passes owner to _create_schemas so alter_schema_owner is called."""
571+
evaluator = SnapshotEvaluator(adapter_mock)
572+
deployability_index = DeployabilityIndex.all_deployable()
573+
574+
model = SqlModel(
575+
name="test_schema.test_model",
576+
kind=IncrementalByTimeRangeKind(time_column="ds"),
577+
storage_format="parquet",
578+
query=parse_one("SELECT a, ds FROM tbl WHERE ds BETWEEN @start_ds AND @end_ds"),
579+
)
580+
snapshot = make_snapshot(model)
581+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
582+
583+
evaluator.create_physical_schemas([snapshot], deployability_index, owner="svc_prod_spn")
584+
585+
adapter_mock.alter_schema_owner.assert_called_once()
586+
assert adapter_mock.alter_schema_owner.call_args.args[1] == "svc_prod_spn"
587+
588+
507589
def test_cleanup(mocker: MockerFixture, adapter_mock, make_snapshot):
508590
evaluator = SnapshotEvaluator(adapter_mock)
509591

0 commit comments

Comments
 (0)