Skip to content

Commit 96bf873

Browse files
Fix: Make the janitor best effort
Signed-off-by: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com>
1 parent 41d9906 commit 96bf873

4 files changed

Lines changed: 183 additions & 55 deletions

File tree

sqlmesh/core/context.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2898,22 +2898,34 @@ def _destroy(self) -> bool:
28982898

28992899
def _run_janitor(self, ignore_ttl: bool = False) -> None:
29002900
current_ts = now_timestamp()
2901+
failures: t.List[str] = []
29012902

29022903
# Clean up expired environments by removing their views and schemas
2903-
self._cleanup_environments(current_ts=current_ts)
2904-
2905-
delete_expired_snapshots(
2906-
self.state_sync,
2907-
self.snapshot_evaluator,
2908-
current_ts=current_ts,
2909-
ignore_ttl=ignore_ttl,
2910-
console=self.console,
2911-
batch_size=self.config.janitor.expired_snapshots_batch_size,
2904+
failures.extend(self._cleanup_environments(current_ts=current_ts))
2905+
2906+
failures.extend(
2907+
delete_expired_snapshots(
2908+
self.state_sync,
2909+
self.snapshot_evaluator,
2910+
current_ts=current_ts,
2911+
ignore_ttl=ignore_ttl,
2912+
console=self.console,
2913+
batch_size=self.config.janitor.expired_snapshots_batch_size,
2914+
)
29122915
)
29132916
self.state_sync.compact_intervals()
29142917

2915-
def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
2918+
if failures:
2919+
failure_string = "\n - ".join(failures)
2920+
summary = f"Janitor completed with failures:\n {failure_string}"
2921+
if self.config.janitor.warn_on_delete_failure:
2922+
self.console.log_warning(summary)
2923+
else:
2924+
raise SQLMeshError(summary)
2925+
2926+
def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> t.List[str]:
29162927
current_ts = current_ts or now_timestamp()
2928+
failures: t.List[str] = []
29172929

29182930
expired_environments_summaries = self.state_sync.get_expired_environments(
29192931
current_ts=current_ts
@@ -2923,15 +2935,19 @@ def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
29232935
expired_env = self.state_reader.get_environment(expired_env_summary.name)
29242936

29252937
if expired_env:
2926-
cleanup_expired_views(
2927-
default_adapter=self.engine_adapter,
2928-
engine_adapters=self.engine_adapters,
2929-
environments=[expired_env],
2930-
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
2931-
console=self.console,
2938+
failures.extend(
2939+
cleanup_expired_views(
2940+
default_adapter=self.engine_adapter,
2941+
engine_adapters=self.engine_adapters,
2942+
environments=[expired_env],
2943+
console=self.console,
2944+
)
29322945
)
29332946

2934-
self.state_sync.delete_expired_environments(current_ts=current_ts)
2947+
# we want to retry on the next janitor pass if drops failed
2948+
if not failures:
2949+
self.state_sync.delete_expired_environments(current_ts=current_ts)
2950+
return failures
29352951

29362952
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
29372953
connection_name = connection_name.capitalize()

sqlmesh/core/janitor.py

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@
1616
RowBoundary,
1717
ExpiredBatchRange,
1818
)
19-
from sqlmesh.utils.errors import SQLMeshError
2019

2120

2221
def cleanup_expired_views(
2322
default_adapter: EngineAdapter,
2423
engine_adapters: t.Dict[str, EngineAdapter],
2524
environments: t.List[Environment],
26-
warn_on_delete_failure: bool = False,
2725
console: t.Optional[Console] = None,
28-
) -> None:
26+
) -> t.List[str]:
27+
failures: t.List[str] = []
28+
2929
expired_schema_or_catalog_environments = [
3030
environment
3131
for environment in environments
@@ -85,10 +85,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
8585
console.update_cleanup_progress(expired_view)
8686
except Exception as e:
8787
message = f"Failed to drop the expired environment view '{expired_view}': {e}"
88-
if warn_on_delete_failure:
89-
logger.warning(message)
90-
else:
91-
raise SQLMeshError(message) from e
88+
logger.warning(message)
89+
failures.append(message)
9290

9391
# Drop the schemas for the expired environments
9492
for engine_adapter, schema in schemas_to_drop:
@@ -102,10 +100,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
102100
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
103101
except Exception as e:
104102
message = f"Failed to drop the expired environment schema '{schema}': {e}"
105-
if warn_on_delete_failure:
106-
logger.warning(message)
107-
else:
108-
raise SQLMeshError(message) from e
103+
logger.warning(message)
104+
failures.append(message)
109105

110106
# Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
111107
# catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
@@ -117,10 +113,10 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
117113
console.update_cleanup_progress(catalog)
118114
except Exception as e:
119115
message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
120-
if warn_on_delete_failure:
121-
logger.warning(message)
122-
else:
123-
raise SQLMeshError(message) from e
116+
logger.warning(message)
117+
failures.append(message)
118+
119+
return failures
124120

125121

126122
def delete_expired_snapshots(
@@ -131,7 +127,7 @@ def delete_expired_snapshots(
131127
ignore_ttl: bool = False,
132128
batch_size: t.Optional[int] = None,
133129
console: t.Optional[Console] = None,
134-
) -> None:
130+
) -> t.List[str]:
135131
"""Delete all expired snapshots in batches.
136132
137133
This helper function encapsulates the logic for deleting expired snapshots in batches,
@@ -146,8 +142,9 @@ def delete_expired_snapshots(
146142
console: Optional console for reporting progress.
147143
148144
Returns:
149-
The total number of deleted expired snapshots.
145+
List of failure messages so callers can surface them at the end of the janitor run.
150146
"""
147+
failures: t.List[str] = []
151148
num_expired_snapshots = 0
152149
for batch in iter_expired_snapshot_batches(
153150
state_reader=state_sync,
@@ -165,17 +162,23 @@ def delete_expired_snapshots(
165162
len(batch.expired_snapshot_ids),
166163
end_info,
167164
)
168-
snapshot_evaluator.cleanup(
169-
target_snapshots=batch.cleanup_tasks,
170-
on_complete=console.update_cleanup_progress if console else None,
171-
)
172-
state_sync.delete_expired_snapshots(
173-
batch_range=ExpiredBatchRange(
174-
start=RowBoundary.lowest_boundary(),
175-
end=batch.batch_range.end,
176-
),
177-
ignore_ttl=ignore_ttl,
178-
)
179-
logger.info("Cleaned up expired snapshots batch")
180-
num_expired_snapshots += len(batch.expired_snapshot_ids)
165+
try:
166+
snapshot_evaluator.cleanup(
167+
target_snapshots=batch.cleanup_tasks,
168+
on_complete=console.update_cleanup_progress if console else None,
169+
)
170+
state_sync.delete_expired_snapshots(
171+
batch_range=ExpiredBatchRange(
172+
start=RowBoundary.lowest_boundary(),
173+
end=batch.batch_range.end,
174+
),
175+
ignore_ttl=ignore_ttl,
176+
)
177+
logger.info("Cleaned up expired snapshots batch")
178+
num_expired_snapshots += len(batch.expired_snapshot_ids)
179+
except Exception as e:
180+
message = f"Failed to clean up an expired snapshots batch: {e}"
181+
logger.warning(message)
182+
failures.append(message)
181183
logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
184+
return failures

tests/core/integration/test_aux_commands.py

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
ModelDefaultsConfig,
1616
DuckDBConnectionConfig,
1717
)
18+
from sqlmesh.core.config.janitor import JanitorConfig
1819
from sqlmesh.core.context import Context
1920
from sqlmesh.core.model import (
2021
SqlModel,
@@ -146,7 +147,8 @@ def setup_scenario():
146147
# Case 2: Assume that the view cleanup yields an error, the enviroment
147148
# record should still exist
148149
mocker.patch(
149-
"sqlmesh.core.context.cleanup_expired_views", side_effect=Exception("view cleanup error")
150+
"sqlmesh.core.context.cleanup_expired_views",
151+
return_value=["view cleanup error"],
150152
)
151153
ctx, model1_snapshot = setup_scenario()
152154

@@ -157,13 +159,71 @@ def setup_scenario():
157159
assert ctx.state_sync.get_environment("dev")
158160

159161
# - Run the janitor again, this time it should succeed
160-
mocker.patch("sqlmesh.core.context.cleanup_expired_views")
162+
mocker.patch("sqlmesh.core.context.cleanup_expired_views", return_value=[])
161163
ctx._run_janitor(ignore_ttl=True)
162164

163165
# - Check that the environment record does not exist in the state sync anymore
164166
assert not ctx.state_sync.get_environment("dev")
165167

166168

169+
def test_janitor_aggregates_failures_into_single_error(mocker: MockerFixture, tmp_path: Path):
170+
models_dir = tmp_path / "models"
171+
models_dir.mkdir()
172+
(models_dir / "model1.sql").write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col")
173+
174+
ctx = Context(
175+
paths=[tmp_path],
176+
config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")),
177+
)
178+
ctx.plan("dev", no_prompts=True, auto_apply=True)
179+
ctx.invalidate_environment("dev")
180+
181+
mocker.patch(
182+
"sqlmesh.core.context.cleanup_expired_views",
183+
return_value=["view drop error A", "view drop error B"],
184+
)
185+
mocker.patch(
186+
"sqlmesh.core.janitor.iter_expired_snapshot_batches",
187+
return_value=iter([]),
188+
)
189+
190+
with pytest.raises(SQLMeshError, match="Janitor completed with failures"):
191+
ctx._run_janitor(ignore_ttl=True)
192+
193+
194+
def test_janitor_warn_on_delete_failure_downgrades_aggregated_error(
195+
mocker: MockerFixture, tmp_path: Path
196+
):
197+
models_dir = tmp_path / "models"
198+
models_dir.mkdir()
199+
(models_dir / "model1.sql").write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col")
200+
201+
ctx = Context(
202+
paths=[tmp_path],
203+
config=Config(
204+
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
205+
janitor=JanitorConfig(warn_on_delete_failure=True),
206+
),
207+
)
208+
ctx.plan("dev", no_prompts=True, auto_apply=True)
209+
ctx.invalidate_environment("dev")
210+
211+
mocker.patch(
212+
"sqlmesh.core.context.cleanup_expired_views",
213+
return_value=["view drop error"],
214+
)
215+
mocker.patch(
216+
"sqlmesh.core.janitor.iter_expired_snapshot_batches",
217+
return_value=iter([]),
218+
)
219+
220+
warn_spy = mocker.patch.object(ctx.console, "log_warning")
221+
222+
ctx._run_janitor(ignore_ttl=True)
223+
assert warn_spy.called
224+
assert "Janitor completed with failures" in warn_spy.call_args[0][0]
225+
226+
167227
@use_terminal_console
168228
def test_destroy(copy_to_temp_path):
169229
# Testing project with two gateways to verify cleanup is performed across engines

tests/core/test_janitor.py

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
)
2323
from sqlmesh.core.janitor import cleanup_expired_views, delete_expired_snapshots
2424
from sqlmesh.utils.date import now_timestamp
25-
from sqlmesh.utils.errors import SQLMeshError
2625

2726
pytestmark = pytest.mark.slow
2827

@@ -101,7 +100,7 @@ def test_cleanup_expired_views(mocker: MockerFixture, make_snapshot: t.Callable)
101100
@pytest.mark.parametrize(
102101
"suffix_target", [EnvironmentSuffixTarget.SCHEMA, EnvironmentSuffixTarget.TABLE]
103102
)
104-
def test_cleanup_expired_environment_schema_warn_on_delete_failure(
103+
def test_cleanup_expired_views_collects_failures(
105104
mocker: MockerFixture, make_snapshot: t.Callable, suffix_target: EnvironmentSuffixTarget
106105
):
107106
adapter = mocker.MagicMock()
@@ -124,17 +123,67 @@ def test_cleanup_expired_environment_schema_warn_on_delete_failure(
124123
catalog_name_override="catalog_override",
125124
)
126125

127-
with pytest.raises(SQLMeshError, match="Failed to drop the expired environment .*"):
128-
cleanup_expired_views(adapter, {}, [schema_environment], warn_on_delete_failure=False)
129-
130-
cleanup_expired_views(adapter, {}, [schema_environment], warn_on_delete_failure=True)
126+
# Janitor is now best-effort: failures are returned, not raised.
127+
failures = cleanup_expired_views(adapter, {}, [schema_environment])
128+
assert len(failures) == 1
129+
assert "Failed to drop the expired environment" in failures[0]
131130

132131
if suffix_target == EnvironmentSuffixTarget.SCHEMA:
133132
assert adapter.drop_schema.called
134133
else:
135134
assert adapter.drop_view.called
136135

137136

137+
def test_cleanup_expired_views_continues_past_failures(
138+
mocker: MockerFixture, make_snapshot: t.Callable
139+
):
140+
adapter = mocker.MagicMock()
141+
adapter.dialect = None
142+
143+
snapshot_failing = make_snapshot(
144+
SqlModel(name="catalog.schema.failing", query=parse_one("select 1"))
145+
)
146+
snapshot_failing.categorize_as(SnapshotChangeCategory.BREAKING)
147+
snapshot_succeeding = make_snapshot(
148+
SqlModel(name="catalog.schema.succeeding", query=parse_one("select 1"))
149+
)
150+
snapshot_succeeding.categorize_as(SnapshotChangeCategory.BREAKING)
151+
152+
failing_env = Environment(
153+
name="failing_env",
154+
suffix_target=EnvironmentSuffixTarget.TABLE,
155+
snapshots=[snapshot_failing.table_info],
156+
start_at="2022-01-01",
157+
end_at="2022-01-01",
158+
plan_id="p",
159+
previous_plan_id="p",
160+
)
161+
succeeding_env = Environment(
162+
name="succeeding_env",
163+
suffix_target=EnvironmentSuffixTarget.TABLE,
164+
snapshots=[snapshot_succeeding.table_info],
165+
start_at="2022-01-01",
166+
end_at="2022-01-01",
167+
plan_id="p",
168+
previous_plan_id="p",
169+
)
170+
171+
def drop_view_side_effect(view, ignore_if_not_exists=True):
172+
if "failing" in str(view):
173+
raise Exception("boom")
174+
175+
adapter.drop_view.side_effect = drop_view_side_effect
176+
177+
failures = cleanup_expired_views(adapter, {}, [failing_env, succeeding_env])
178+
179+
# Both drops were attempted
180+
assert adapter.drop_view.call_count == 2
181+
182+
# Only the failing one is reported
183+
assert len(failures) == 1
184+
assert "failing" in failures[0]
185+
186+
138187
def test_delete_expired_snapshots_common_function_batching(
139188
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable, mocker: MockerFixture
140189
):

0 commit comments

Comments
 (0)