Skip to content

Commit c75f376

Browse files
Gabe Pescoclaude
authored andcommitted
feat: add callable resolver support to OwnershipConfig
- OwnershipConfig gains environment_owner_resolver and physical_owner_resolver callable fields for cases where the owner principal must be resolved at plan execution time (e.g. dynamic SPN identity via adapter.current_user()). - Callable resolvers take precedence over the static mapping/string fields when both are set. - resolve_owner() and resolve_physical_owner() now accept the active EngineAdapter so callables can query the connection. - Add current_user() to the base EngineAdapter (SELECT CURRENT_USER()). - Replace the getattr/isinstance guard in BuiltInSchedulerConfig with OwnershipConfig.is_active. - Update all resolve_owner/resolve_physical_owner call sites in BuiltInPlanEvaluator to thread the adapter through. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent ed2ce65 commit c75f376

6 files changed

Lines changed: 176 additions & 36 deletions

File tree

sqlmesh/core/config/ownership.py

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,70 @@
99
from sqlmesh.core.config.common import compile_regex_mapping
1010

1111
if t.TYPE_CHECKING:
12+
from sqlmesh.core.engine_adapter.base import EngineAdapter
1213
OwnershipMapping = t.Dict[re.Pattern, str]
14+
EnvironmentOwnerResolver = t.Callable[[str, EngineAdapter], t.Optional[str]]
15+
PhysicalOwnerResolver = t.Callable[[EngineAdapter], t.Optional[str]]
1316
else:
1417
OwnershipMapping = t.Annotated[t.Dict[re.Pattern, str], BeforeValidator(compile_regex_mapping)]
18+
EnvironmentOwnerResolver = t.Callable
19+
PhysicalOwnerResolver = t.Callable
1520

1621

1722
class OwnershipConfig(BaseConfig):
1823
"""Configuration for object ownership rules applied at creation time.
1924
20-
Maps environment name regex patterns to owner principals. The first
21-
matching pattern wins. Ownership is applied immediately when schemas and
22-
views are created, so even a partially-completed run leaves objects in a
23-
manageable state.
25+
For static YAML-based config, use ``environment_owner_mapping`` and
26+
``physical_owner``. For programmatic config where the principal must be
27+
resolved at plan-execution time (e.g. via ``adapter.current_user()`` or a
28+
Databricks API call), supply ``environment_owner_resolver`` and/or
29+
``physical_owner_resolver`` instead — callables take precedence over the
30+
static fields.
2431
25-
Example::
32+
Example (YAML)::
2633
2734
ownership:
2835
environment_owner_mapping:
2936
"^prod$": "svc_prod_spn"
3037
".*": "group:shared-developers"
3138
physical_owner: "group:shared-developers"
39+
40+
Example (Python)::
41+
42+
OwnershipConfig(
43+
environment_owner_resolver=lambda env, adapter: (
44+
adapter.current_user() if env == "prod" else "group:shared-developers"
45+
),
46+
physical_owner="group:shared-developers",
47+
)
3248
"""
3349

3450
environment_owner_mapping: OwnershipMapping = {}
51+
environment_owner_resolver: t.Optional[EnvironmentOwnerResolver] = None
3552
physical_owner: t.Optional[str] = None
53+
physical_owner_resolver: t.Optional[PhysicalOwnerResolver] = None
3654

37-
def resolve_owner(self, environment_name: str) -> t.Optional[str]:
38-
"""Return the configured owner for the given environment name, or None."""
55+
@property
56+
def is_active(self) -> bool:
57+
"""True when any ownership rule is configured."""
58+
return bool(
59+
self.environment_owner_resolver is not None
60+
or self.environment_owner_mapping
61+
or self.physical_owner is not None
62+
or self.physical_owner_resolver is not None
63+
)
64+
65+
def resolve_owner(self, environment_name: str, adapter: "EngineAdapter") -> t.Optional[str]:
66+
"""Return the configured owner for the given environment, or None."""
67+
if self.environment_owner_resolver is not None:
68+
return self.environment_owner_resolver(environment_name, adapter)
3969
for pattern, owner in self.environment_owner_mapping.items():
4070
if pattern.fullmatch(environment_name):
4171
return owner
4272
return None
73+
74+
def resolve_physical_owner(self, adapter: "EngineAdapter") -> t.Optional[str]:
75+
"""Return the configured physical-layer owner, or None."""
76+
if self.physical_owner_resolver is not None:
77+
return self.physical_owner_resolver(adapter)
78+
return self.physical_owner

sqlmesh/core/config/scheduler.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,8 @@ class BuiltInSchedulerConfig(_EngineAdapterStateSyncSchedulerConfig, BaseConfig)
128128
type_: t.Literal["builtin"] = Field(alias="type", default="builtin")
129129

130130
def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:
131-
from sqlmesh.core.config.ownership import OwnershipConfig
132-
133-
ownership_config = getattr(context.config, "ownership", None)
134-
if (
135-
isinstance(ownership_config, OwnershipConfig)
136-
and not ownership_config.environment_owner_mapping
137-
):
138-
ownership_config = None
131+
ownership = context.config.ownership
132+
ownership_config = ownership if ownership.is_active else None
139133
return BuiltInPlanEvaluator(
140134
state_sync=context.state_sync,
141135
snapshot_evaluator=context.snapshot_evaluator,

sqlmesh/core/engine_adapter/base.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1418,6 +1418,17 @@ def _create_schema(
14181418
raise
14191419
logger.warning("Failed to create %s '%s': %s", kind.lower(), schema_name, e)
14201420

1421+
def current_user(self) -> str:
1422+
"""Return the identity of the currently-connected principal.
1423+
1424+
Uses SQL ``CURRENT_USER()`` which is supported by Spark/Databricks and
1425+
DuckDB. Override in adapters where a different mechanism is required.
1426+
"""
1427+
row = self.fetchone("SELECT CURRENT_USER()")
1428+
if not row:
1429+
raise SQLMeshError("Could not determine current user: CURRENT_USER() returned no rows")
1430+
return row[0]
1431+
14211432
def alter_schema_owner(self, schema_name: SchemaName, owner: str) -> None:
14221433
"""Set the owner of a schema.
14231434

sqlmesh/core/plan/evaluator.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,11 @@ def visit_physical_layer_update_stage(
175175
self.console.log_success(skip_message)
176176
return
177177

178-
physical_owner = self.ownership_config.physical_owner if self.ownership_config else None
178+
physical_owner = (
179+
self.ownership_config.resolve_physical_owner(self.snapshot_evaluator.adapter)
180+
if self.ownership_config
181+
else None
182+
)
179183
completion_status = None
180184
progress_stopped = False
181185
try:
@@ -214,7 +218,11 @@ def visit_physical_layer_update_stage(
214218
def visit_physical_layer_schema_creation_stage(
215219
self, stage: stages.PhysicalLayerSchemaCreationStage, plan: EvaluatablePlan
216220
) -> None:
217-
physical_owner = self.ownership_config.physical_owner if self.ownership_config else None
221+
physical_owner = (
222+
self.ownership_config.resolve_physical_owner(self.snapshot_evaluator.adapter)
223+
if self.ownership_config
224+
else None
225+
)
218226
try:
219227
self.snapshot_evaluator.create_physical_schemas(
220228
stage.snapshots, stage.deployability_index, owner=physical_owner
@@ -442,7 +450,9 @@ def _promote_snapshots(
442450
) -> None:
443451
owner: t.Optional[str] = None
444452
if self.ownership_config:
445-
owner = self.ownership_config.resolve_owner(environment_naming_info.name)
453+
owner = self.ownership_config.resolve_owner(
454+
environment_naming_info.name, self.snapshot_evaluator.adapter
455+
)
446456
self.snapshot_evaluator.promote(
447457
target_snapshots,
448458
start=plan.start,

tests/core/engine_adapter/test_spark.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,6 +1154,24 @@ def test_alter_schema_owner_base_noop(make_mocked_engine_adapter: t.Callable):
11541154
assert alter_calls == []
11551155

11561156

1157+
def test_current_user(make_mocked_engine_adapter: t.Callable):
1158+
adapter = make_mocked_engine_adapter(SparkEngineAdapter)
1159+
adapter.cursor.fetchone.return_value = ("spn-abc-123",)
1160+
result = adapter.current_user()
1161+
assert result == "spn-abc-123"
1162+
sql_calls = to_sql_calls(adapter)
1163+
assert any("CURRENT_USER" in s.upper() for s in sql_calls)
1164+
1165+
1166+
def test_current_user_base_noop(make_mocked_engine_adapter: t.Callable):
1167+
from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
1168+
1169+
adapter = make_mocked_engine_adapter(DuckDBEngineAdapter)
1170+
adapter.cursor.fetchone.return_value = ("duckdb-user",)
1171+
result = adapter.current_user()
1172+
assert result == "duckdb-user"
1173+
1174+
11571175
def test_get_data_object_wap_branch(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
11581176
adapter = make_mocked_engine_adapter(SparkEngineAdapter, patch_get_data_objects=False)
11591177
mocker.patch.object(adapter, "_get_data_objects", return_value=[])

tests/core/test_config.py

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -614,52 +614,58 @@ def test_load_duckdb_attach_config(tmp_path):
614614

615615

616616
def test_ownership_config_resolve_owner():
617+
mock_adapter = mock.MagicMock()
617618
config = OwnershipConfig(
618619
environment_owner_mapping={
619620
"^prod$": "svc_prod_spn",
620621
".*": "group:shared-developers",
621622
}
622623
)
623-
assert config.resolve_owner("prod") == "svc_prod_spn"
624-
assert config.resolve_owner("dev_alice") == "group:shared-developers"
625-
assert config.resolve_owner("staging") == "group:shared-developers"
624+
assert config.resolve_owner("prod", mock_adapter) == "svc_prod_spn"
625+
assert config.resolve_owner("dev_alice", mock_adapter) == "group:shared-developers"
626+
assert config.resolve_owner("staging", mock_adapter) == "group:shared-developers"
626627
# "production" does not match ^prod$ so falls through to .*
627-
assert config.resolve_owner("production") == "group:shared-developers"
628+
assert config.resolve_owner("production", mock_adapter) == "group:shared-developers"
628629

629630

630631
def test_ownership_config_empty_returns_none():
631-
assert OwnershipConfig().resolve_owner("prod") is None
632-
assert OwnershipConfig().resolve_owner("dev_env") is None
632+
mock_adapter = mock.MagicMock()
633+
assert OwnershipConfig().resolve_owner("prod", mock_adapter) is None
634+
assert OwnershipConfig().resolve_owner("dev_env", mock_adapter) is None
633635

634636

635637
def test_ownership_config_first_match_wins():
636638
# The catch-all .* comes before a more specific pattern — it always wins.
637639
# This documents the ordering contract: users must put specific patterns first.
640+
mock_adapter = mock.MagicMock()
638641
config = OwnershipConfig(
639642
environment_owner_mapping={
640643
".*": "catch_all_owner",
641644
"^prod$": "prod_owner",
642645
}
643646
)
644-
assert config.resolve_owner("prod") == "catch_all_owner"
647+
assert config.resolve_owner("prod", mock_adapter) == "catch_all_owner"
645648

646649

647650
def test_ownership_config_case_sensitive():
648651
# Patterns are compiled without re.IGNORECASE, so matching is case-sensitive.
652+
mock_adapter = mock.MagicMock()
649653
config = OwnershipConfig(environment_owner_mapping={"^prod$": "svc_prod"})
650-
assert config.resolve_owner("prod") == "svc_prod"
651-
assert config.resolve_owner("PROD") is None
652-
assert config.resolve_owner("Prod") is None
654+
assert config.resolve_owner("prod", mock_adapter) == "svc_prod"
655+
assert config.resolve_owner("PROD", mock_adapter) is None
656+
assert config.resolve_owner("Prod", mock_adapter) is None
653657

654658

655659
def test_ownership_config_no_match_returns_none():
660+
mock_adapter = mock.MagicMock()
656661
config = OwnershipConfig(environment_owner_mapping={"^prod$": "svc_prod"})
657-
assert config.resolve_owner("staging") is None
658-
assert config.resolve_owner("dev_bob") is None
662+
assert config.resolve_owner("staging", mock_adapter) is None
663+
assert config.resolve_owner("dev_bob", mock_adapter) is None
659664

660665

661666
def test_ownership_config_deserialization_from_dict():
662667
# Simulates YAML/dict-based config loading (as produced by load_config_from_yaml).
668+
mock_adapter = mock.MagicMock()
663669
config = Config(
664670
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
665671
ownership={
@@ -669,15 +675,16 @@ def test_ownership_config_deserialization_from_dict():
669675
}
670676
},
671677
)
672-
assert config.ownership.resolve_owner("prod") == "svc_prod_spn"
673-
assert config.ownership.resolve_owner("dev") == "group:shared-developers"
678+
assert config.ownership.resolve_owner("prod", mock_adapter) == "svc_prod_spn"
679+
assert config.ownership.resolve_owner("dev", mock_adapter) == "group:shared-developers"
674680

675681

676682
def test_ownership_config_nested_update():
677683
# Config.ownership uses UpdateStrategy.NESTED_UPDATE.
678684
# When two Configs are merged, the second one's environment_owner_mapping
679685
# replaces the first's (REPLACE semantics within OwnershipConfig since
680686
# environment_owner_mapping has no explicit strategy).
687+
mock_adapter = mock.MagicMock()
681688
c1 = Config(
682689
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
683690
ownership=OwnershipConfig(environment_owner_mapping={"^prod$": "spn_prod"}),
@@ -688,15 +695,16 @@ def test_ownership_config_nested_update():
688695
)
689696
merged = c1.update_with(c2)
690697
# c2's mapping fully replaces c1's — the ^prod$ pattern is gone
691-
assert merged.ownership.resolve_owner("prod") == "grp_devs"
692-
assert merged.ownership.resolve_owner("dev_alice") == "grp_devs"
698+
assert merged.ownership.resolve_owner("prod", mock_adapter) == "grp_devs"
699+
assert merged.ownership.resolve_owner("dev_alice", mock_adapter) == "grp_devs"
693700

694701

695702
def test_config_ownership_defaults_to_empty():
696703
# Configs without an explicit ownership block have a no-op OwnershipConfig.
704+
mock_adapter = mock.MagicMock()
697705
config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
698706
assert config.ownership.environment_owner_mapping == {}
699-
assert config.ownership.resolve_owner("prod") is None
707+
assert config.ownership.resolve_owner("prod", mock_adapter) is None
700708

701709

702710
def test_ownership_config_physical_owner():
@@ -718,7 +726,70 @@ def test_ownership_config_physical_owner_deserialization():
718726
},
719727
)
720728
assert config.ownership.physical_owner == "group:data-platform"
721-
assert config.ownership.resolve_owner("prod") == "svc_prod"
729+
assert config.ownership.resolve_owner("prod", mock.MagicMock()) == "svc_prod"
730+
731+
732+
def test_ownership_config_resolve_owner_callable():
733+
# A callable resolver takes precedence over environment_owner_mapping and
734+
# receives (env_name, adapter) so it can call adapter.current_user() etc.
735+
mock_adapter = mock.MagicMock()
736+
mock_adapter.current_user.return_value = "spn-dynamic-uuid"
737+
738+
config = OwnershipConfig(
739+
environment_owner_mapping={".*": "group:fallback"},
740+
environment_owner_resolver=lambda env, adapter: (
741+
adapter.current_user() if env == "prod" else "group:shared-developers"
742+
),
743+
)
744+
745+
assert config.resolve_owner("prod", mock_adapter) == "spn-dynamic-uuid"
746+
assert config.resolve_owner("dev_alice", mock_adapter) == "group:shared-developers"
747+
mock_adapter.current_user.assert_called_once()
748+
749+
750+
def test_ownership_config_resolver_overrides_mapping():
751+
# Resolver always wins when set, even if the mapping would also match.
752+
mock_adapter = mock.MagicMock()
753+
config = OwnershipConfig(
754+
environment_owner_mapping={"^prod$": "static-owner"},
755+
environment_owner_resolver=lambda env, adapter: "dynamic-owner",
756+
)
757+
assert config.resolve_owner("prod", mock_adapter) == "dynamic-owner"
758+
759+
760+
def test_ownership_config_resolve_physical_owner_callable():
761+
mock_adapter = mock.MagicMock()
762+
mock_adapter.current_user.return_value = "spn-uuid-123"
763+
764+
config = OwnershipConfig(
765+
physical_owner_resolver=lambda adapter: adapter.current_user(),
766+
)
767+
assert config.resolve_physical_owner(mock_adapter) == "spn-uuid-123"
768+
mock_adapter.current_user.assert_called_once()
769+
770+
771+
def test_ownership_config_resolve_physical_owner_static():
772+
mock_adapter = mock.MagicMock()
773+
config = OwnershipConfig(physical_owner="group:data-platform")
774+
assert config.resolve_physical_owner(mock_adapter) == "group:data-platform"
775+
mock_adapter.current_user.assert_not_called()
776+
777+
778+
def test_ownership_config_physical_owner_resolver_overrides_static():
779+
mock_adapter = mock.MagicMock()
780+
config = OwnershipConfig(
781+
physical_owner="static-owner",
782+
physical_owner_resolver=lambda adapter: "dynamic-owner",
783+
)
784+
assert config.resolve_physical_owner(mock_adapter) == "dynamic-owner"
785+
786+
787+
def test_ownership_config_is_active():
788+
assert not OwnershipConfig().is_active
789+
assert OwnershipConfig(environment_owner_mapping={".*": "grp"}).is_active
790+
assert OwnershipConfig(environment_owner_resolver=lambda e, a: None).is_active
791+
assert OwnershipConfig(physical_owner="grp").is_active
792+
assert OwnershipConfig(physical_owner_resolver=lambda a: "grp").is_active
722793

723794

724795
def test_load_model_defaults_audits(tmp_path):

0 commit comments

Comments
 (0)