Skip to content

Commit c01f7f4

Browse files
authored
Merge branch 'main' into add-databricks-query-tags-session-properties
2 parents 17074af + 58bed01 commit c01f7f4

5 files changed

Lines changed: 116 additions & 4 deletions

File tree

sqlmesh/core/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2495,6 +2495,7 @@ def create_external_models(self, strict: bool = False) -> None:
24952495
gateway=external_models_gateway,
24962496
max_workers=self.concurrent_tasks,
24972497
strict=strict,
2498+
all_models=self._models,
24982499
)
24992500

25002501
@python_api_analytics

sqlmesh/core/schema_loader.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,26 +24,31 @@ def create_external_models_file(
2424
gateway: t.Optional[str] = None,
2525
max_workers: int = 1,
2626
strict: bool = False,
27+
all_models: t.Optional[t.Dict[str, Model]] = None,
2728
) -> None:
2829
"""Create or replace a YAML file with column and types of all columns in all external models.
2930
3031
Args:
3132
path: The path to store the YAML file.
32-
models: FQN to model
33+
models: FQN to model for the current repo/config being processed.
3334
adapter: The engine adapter.
3435
state_reader: The state reader.
3536
dialect: The dialect to serialize the schema as.
3637
gateway: If the model should be associated with a specific gateway; the gateway key
3738
max_workers: The max concurrent workers to fetch columns.
3839
strict: If True, raise an error if the external model is missing in the database.
40+
all_models: FQN to model across all loaded repos. When provided, a dependency is only
41+
classified as external if it is absent from this full set. This prevents cross-repo
42+
internal models from being misclassified as external in multi-repo setups.
3943
"""
44+
known_models: t.Dict[str, Model] = all_models if all_models is not None else models
4045
external_model_fqns = set()
4146

4247
for fqn, model in models.items():
4348
if model.kind.is_external:
4449
external_model_fqns.add(fqn)
4550
for dep in model.depends_on:
46-
if dep not in models:
51+
if dep not in known_models:
4752
external_model_fqns.add(dep)
4853

4954
# Make sure we don't convert internal models into external ones.

sqlmesh/core/snapshot/evaluator.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -528,12 +528,27 @@ def cleanup(
528528
target_snapshots = [
529529
t for t in target_snapshots if t.snapshot.is_model and not t.snapshot.is_symbolic
530530
]
531+
available_gateways = set(self.adapters.keys())
532+
skipped = []
533+
filtered_targets = []
534+
for t in target_snapshots:
535+
gw = t.snapshot.model_gateway
536+
if gw and gw not in available_gateways:
537+
skipped.append((t.snapshot.snapshot_id, gw))
538+
else:
539+
filtered_targets.append(t)
540+
if skipped:
541+
logger.warning(
542+
"Skipping cleanup of %d snapshot(s) with unavailable gateway(s): %s",
543+
len(skipped),
544+
", ".join(f"{sid} (gateway={gw})" for sid, gw in skipped),
545+
)
531546
snapshots_to_dev_table_only = {
532-
t.snapshot.snapshot_id: t.dev_table_only for t in target_snapshots
547+
t.snapshot.snapshot_id: t.dev_table_only for t in filtered_targets
533548
}
534549
with self.concurrent_context():
535550
concurrent_apply_to_snapshots(
536-
[t.snapshot for t in target_snapshots],
551+
[t.snapshot for t in filtered_targets],
537552
lambda s: self._cleanup_snapshot(
538553
s,
539554
snapshots_to_dev_table_only[s.snapshot_id],

tests/core/integration/test_multi_repo.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
)
2222
from sqlmesh.core.console import get_console
2323
from sqlmesh.core.context import Context
24+
from sqlmesh.utils import yaml
2425
from sqlmesh.utils.date import now
2526
from tests.conftest import DuckDBMetadata
2627
from tests.utils.test_helpers import use_terminal_console
@@ -559,3 +560,58 @@ def test_engine_adapters_multi_repo_all_gateways_gathered(copy_to_temp_path):
559560
gathered_gateways = context.engine_adapters.keys()
560561
expected_gateways = {"local", "memory", "extra"}
561562
assert gathered_gateways == expected_gateways
563+
564+
565+
@use_terminal_console
566+
def test_multi_repo_create_external_models(copy_to_temp_path):
567+
"""create_external_models should not classify cross-repo models as external (sqlmesh#5326).
568+
569+
silver.c and silver.e (repo_2) depend on bronze.a (repo_1). When running
570+
create_external_models with both repos loaded, bronze.a must NOT be treated
571+
as an external model because it is an internal model defined in repo_1.
572+
573+
The observable symptom of the bug is a warning: "Unable to get schema for
574+
'bronze.a'" — because SQLMesh tries to query the schema of what it wrongly
575+
thinks is an external table. A correct implementation never attempts this
576+
lookup and therefore emits no such warning.
577+
"""
578+
paths = copy_to_temp_path("examples/multi")
579+
repo_1_path = f"{paths[0]}/repo_1"
580+
repo_2_path = f"{paths[0]}/repo_2"
581+
582+
context = Context(paths=[repo_1_path, repo_2_path], gateway="memory")
583+
context._new_state_sync().reset(default_catalog=context.default_catalog)
584+
585+
with patch.object(context.console, "log_warning") as mock_warning:
586+
context.create_external_models()
587+
588+
warning_messages = [str(call) for call in mock_warning.call_args_list]
589+
schema_lookup_warnings = [
590+
msg
591+
for msg in warning_messages
592+
if "bronze" in msg and "a" in msg and "schema" in msg.lower()
593+
]
594+
assert not schema_lookup_warnings, (
595+
"bronze.a should not be looked up as an external model, but got warnings: "
596+
+ str(schema_lookup_warnings)
597+
)
598+
599+
# repo_2's external_models.yaml must not contain bronze.a
600+
repo_2_external = Path(repo_2_path) / c.EXTERNAL_MODELS_YAML
601+
if repo_2_external.exists():
602+
contents = yaml.load(repo_2_external)
603+
external_names = [e["name"] for e in contents]
604+
assert not any("bronze" in name and "a" in name for name in external_names), (
605+
f"bronze.a should not be in repo_2's external models, but found: {external_names}"
606+
)
607+
608+
# repo_1 has no external dependencies at all
609+
repo_1_external = Path(repo_1_path) / c.EXTERNAL_MODELS_YAML
610+
if repo_1_external.exists():
611+
contents = yaml.load(repo_1_external)
612+
assert len(contents) == 0, f"repo_1 should have no external models, got: {contents}"
613+
614+
# Plan should still resolve all 5 models as internal after create_external_models
615+
context.load()
616+
plan = context.plan_builder().build()
617+
assert len(plan.new_snapshots) == 5

tests/core/test_snapshot_evaluator.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4503,6 +4503,41 @@ def test_multiple_engine_cleanup(snapshot: Snapshot, adapters, make_snapshot):
45034503
)
45044504

45054505

4506+
def test_cleanup_skips_unavailable_gateway(snapshot: Snapshot, adapters, make_snapshot):
4507+
engine_adapters = {"default": adapters[0]}
4508+
evaluator = SnapshotEvaluator(engine_adapters)
4509+
4510+
model_with_missing_gw = load_sql_based_model(
4511+
parse( # type: ignore
4512+
"""
4513+
MODEL (
4514+
name test_schema.test_model,
4515+
kind FULL,
4516+
gateway nonexistent_gateway,
4517+
);
4518+
SELECT a::int FROM tbl;
4519+
"""
4520+
),
4521+
)
4522+
4523+
snapshot_missing_gw = make_snapshot(model_with_missing_gw)
4524+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
4525+
snapshot_missing_gw.categorize_as(SnapshotChangeCategory.BREAKING)
4526+
4527+
evaluator.create([snapshot], {}, DeployabilityIndex.all_deployable())
4528+
4529+
evaluator.cleanup(
4530+
[
4531+
SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True),
4532+
SnapshotTableCleanupTask(snapshot=snapshot_missing_gw.table_info, dev_table_only=True),
4533+
],
4534+
)
4535+
4536+
engine_adapters["default"].drop_table.assert_called_once_with(
4537+
f"sqlmesh__db.db__model__{snapshot.version}__dev", cascade=True
4538+
)
4539+
4540+
45064541
def test_multi_engine_python_model_with_macros(adapters, make_snapshot):
45074542
engine_adapters = {"default": adapters[0], "secondary": adapters[1]}
45084543
evaluator = SnapshotEvaluator(engine_adapters)

0 commit comments

Comments
 (0)