Skip to content

Commit cc82ab0

Browse files
Add --force-delete flag to the janitor to delete state records even when physical drops fail
Signed-off-by: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com>
1 parent 96bf873 commit cc82ab0

5 files changed

Lines changed: 123 additions & 21 deletions

File tree

sqlmesh/cli/main.py

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -631,16 +631,22 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
631631
is_flag=True,
632632
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
633633
)
634+
@click.option(
635+
"--force-delete",
636+
is_flag=True,
637+
help="Delete expired environment and snapshot state records even when the physical table or view drops fail. "
638+
"Any objects that could not be dropped become orphaned and must be removed manually.",
639+
)
634640
@click.pass_context
635641
@error_handler
636642
@cli_analytics
637-
def janitor(ctx: click.Context, ignore_ttl: bool, **kwargs: t.Any) -> None:
643+
def janitor(ctx: click.Context, ignore_ttl: bool, force_delete: bool, **kwargs: t.Any) -> None:
638644
"""
639645
Run the janitor process on-demand.
640646
641647
The janitor cleans up old environments and expired snapshots.
642648
"""
643-
ctx.obj.run_janitor(ignore_ttl, **kwargs)
649+
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, **kwargs)
644650

645651

646652
@cli.command("destroy")

sqlmesh/core/context.py

Lines changed: 15 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -887,12 +887,12 @@ def _has_environment_changed() -> bool:
887887
return completion_status
888888

889889
@python_api_analytics
890-
def run_janitor(self, ignore_ttl: bool) -> bool:
890+
def run_janitor(self, ignore_ttl: bool, force_delete: bool = False) -> bool:
891891
success = False
892892

893893
if self.console.start_cleanup(ignore_ttl):
894894
try:
895-
self._run_janitor(ignore_ttl)
895+
self._run_janitor(ignore_ttl, force_delete=force_delete)
896896
success = True
897897
finally:
898898
self.console.stop_cleanup(success=success)
@@ -2896,19 +2896,22 @@ def _destroy(self) -> bool:
28962896

28972897
return True
28982898

2899-
def _run_janitor(self, ignore_ttl: bool = False) -> None:
2899+
def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) -> None:
29002900
current_ts = now_timestamp()
29012901
failures: t.List[str] = []
29022902

29032903
# Clean up expired environments by removing their views and schemas
2904-
failures.extend(self._cleanup_environments(current_ts=current_ts))
2904+
failures.extend(
2905+
self._cleanup_environments(current_ts=current_ts, force_delete=force_delete)
2906+
)
29052907

29062908
failures.extend(
29072909
delete_expired_snapshots(
29082910
self.state_sync,
29092911
self.snapshot_evaluator,
29102912
current_ts=current_ts,
29112913
ignore_ttl=ignore_ttl,
2914+
force_delete=force_delete,
29122915
console=self.console,
29132916
batch_size=self.config.janitor.expired_snapshots_batch_size,
29142917
)
@@ -2918,12 +2921,16 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
29182921
if failures:
29192922
failure_string = "\n - ".join(failures)
29202923
summary = f"Janitor completed with failures:\n {failure_string}"
2924+
if force_delete:
2925+
summary += "\nState records have been deleted, but the underlying objects may still exist in the database.\nPlease investigate and clean up manually the above if necessary."
29212926
if self.config.janitor.warn_on_delete_failure:
29222927
self.console.log_warning(summary)
29232928
else:
29242929
raise SQLMeshError(summary)
29252930

2926-
def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> t.List[str]:
2931+
def _cleanup_environments(
2932+
self, current_ts: t.Optional[int] = None, force_delete: bool = False
2933+
) -> t.List[str]:
29272934
current_ts = current_ts or now_timestamp()
29282935
failures: t.List[str] = []
29292936

@@ -2944,8 +2951,9 @@ def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> t.List[st
29442951
)
29452952
)
29462953

2947-
# we want to retry on the next janitor pass if drops failed
2948-
if not failures:
2954+
# we want to retry on the next janitor pass if drops failed, unless
2955+
# force_delete is set in which case we purge state records regardless
2956+
if not failures or force_delete:
29492957
self.state_sync.delete_expired_environments(current_ts=current_ts)
29502958
return failures
29512959

sqlmesh/core/janitor.py

Lines changed: 22 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -125,6 +125,7 @@ def delete_expired_snapshots(
125125
*,
126126
current_ts: int,
127127
ignore_ttl: bool = False,
128+
force_delete: bool = False,
128129
batch_size: t.Optional[int] = None,
129130
console: t.Optional[Console] = None,
130131
) -> t.List[str]:
@@ -138,6 +139,7 @@ def delete_expired_snapshots(
138139
snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
139140
current_ts: Timestamp used to evaluate expiration.
140141
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
142+
force_delete: If True, delete snapshot state records even when physical table cleanup fails.
141143
batch_size: Maximum number of snapshots to fetch per batch.
142144
console: Optional console for reporting progress.
143145
@@ -162,23 +164,32 @@ def delete_expired_snapshots(
162164
len(batch.expired_snapshot_ids),
163165
end_info,
164166
)
167+
cleanup_succeeded = True
165168
try:
166169
snapshot_evaluator.cleanup(
167170
target_snapshots=batch.cleanup_tasks,
168171
on_complete=console.update_cleanup_progress if console else None,
169172
)
170-
state_sync.delete_expired_snapshots(
171-
batch_range=ExpiredBatchRange(
172-
start=RowBoundary.lowest_boundary(),
173-
end=batch.batch_range.end,
174-
),
175-
ignore_ttl=ignore_ttl,
176-
)
177-
logger.info("Cleaned up expired snapshots batch")
178-
num_expired_snapshots += len(batch.expired_snapshot_ids)
179-
except Exception as e:
180-
message = f"Failed to clean up an expired snapshots batch: {e}"
173+
except Exception as failed_drops:
174+
message = f"Failed to clean up: {failed_drops}"
181175
logger.warning(message)
182176
failures.append(message)
177+
cleanup_succeeded = False
178+
179+
if cleanup_succeeded or force_delete:
180+
try:
181+
state_sync.delete_expired_snapshots(
182+
batch_range=ExpiredBatchRange(
183+
start=RowBoundary.lowest_boundary(),
184+
end=batch.batch_range.end,
185+
),
186+
ignore_ttl=ignore_ttl,
187+
)
188+
logger.info("Cleaned up expired snapshots batch")
189+
num_expired_snapshots += len(batch.expired_snapshot_ids)
190+
except Exception as e:
191+
message = f"Failed to delete expired snapshot state records: {e}"
192+
logger.warning(message)
193+
failures.append(message)
183194
logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
184195
return failures

sqlmesh/core/snapshot/evaluator.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -547,7 +547,7 @@ def cleanup(
547547
t.snapshot.snapshot_id: t.dev_table_only for t in filtered_targets
548548
}
549549
with self.concurrent_context():
550-
concurrent_apply_to_snapshots(
550+
errors, _ = concurrent_apply_to_snapshots(
551551
[t.snapshot for t in filtered_targets],
552552
lambda s: self._cleanup_snapshot(
553553
s,
@@ -557,7 +557,11 @@ def cleanup(
557557
),
558558
self.ddl_concurrent_tasks,
559559
reverse_order=True,
560+
raise_on_error=False,
560561
)
562+
if errors:
563+
errored_snapshots = "\n".join(f" {e.node.name}: {e.__cause__}" for e in errors)
564+
raise SQLMeshError(f"\n{errored_snapshots}")
561565

562566
def audit(
563567
self,

tests/core/integration/test_aux_commands.py

Lines changed: 73 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -224,6 +224,79 @@ def test_janitor_warn_on_delete_failure_downgrades_aggregated_error(
224224
assert "Janitor completed with failures" in warn_spy.call_args[0][0]
225225

226226

227+
def test_janitor_force_delete_removes_environment_state_despite_drop_failure(
228+
mocker: MockerFixture, tmp_path: Path
229+
):
230+
models_dir = tmp_path / "models"
231+
models_dir.mkdir()
232+
(models_dir / "model1.sql").write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col")
233+
234+
ctx = Context(
235+
paths=[tmp_path],
236+
config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")),
237+
)
238+
ctx.plan("dev", no_prompts=True, auto_apply=True)
239+
ctx.invalidate_environment("dev")
240+
241+
mocker.patch(
242+
"sqlmesh.core.context.cleanup_expired_views",
243+
return_value=["view drop error"],
244+
)
245+
mocker.patch(
246+
"sqlmesh.core.janitor.iter_expired_snapshot_batches",
247+
return_value=iter([]),
248+
)
249+
250+
# without force_delete the environment is retained for retry
251+
with pytest.raises(SQLMeshError):
252+
ctx._run_janitor(ignore_ttl=True, force_delete=False)
253+
assert ctx.state_sync.get_environment("dev") is not None
254+
255+
# with force_delete the environment state is purged even though drops failed
256+
with pytest.raises(SQLMeshError):
257+
ctx._run_janitor(ignore_ttl=True, force_delete=True)
258+
assert ctx.state_sync.get_environment("dev") is None
259+
260+
261+
def test_janitor_force_delete_removes_snapshot_state_despite_cleanup_failure(
262+
mocker: MockerFixture, tmp_path: Path
263+
):
264+
models_dir = tmp_path / "models"
265+
models_dir.mkdir()
266+
model1_path = models_dir / "model1.sql"
267+
model1_path.write_text("MODEL(name test.model1, kind FULL); SELECT 1 AS col")
268+
269+
# using warn_on_delete_failure so the janitor completes and we can inspect the state after
270+
ctx = Context(
271+
paths=[tmp_path],
272+
config=Config(
273+
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
274+
janitor=JanitorConfig(warn_on_delete_failure=True),
275+
),
276+
)
277+
ctx.plan("dev", no_prompts=True, auto_apply=True)
278+
model1_snapshot = ctx.get_snapshot("test.model1")
279+
280+
# simulating a zombie snapshot
281+
model1_path.unlink()
282+
ctx.load()
283+
ctx.plan("dev", no_prompts=True, auto_apply=True)
284+
ctx.invalidate_environment("dev")
285+
286+
mocker.patch(
287+
"sqlmesh.core.snapshot.evaluator.SnapshotEvaluator.cleanup",
288+
side_effect=Exception("table cleanup error"),
289+
)
290+
291+
# without force_delete the snapshot state is retained for retry
292+
ctx._run_janitor(ignore_ttl=True, force_delete=False)
293+
assert ctx.state_sync.get_snapshots([model1_snapshot.snapshot_id]) # type: ignore
294+
295+
# with force_delete the snapshot state record is purged even though cleanup failed
296+
ctx._run_janitor(ignore_ttl=True, force_delete=True)
297+
assert not ctx.state_sync.get_snapshots([model1_snapshot.snapshot_id]) # type: ignore
298+
299+
227300
@use_terminal_console
228301
def test_destroy(copy_to_temp_path):
229302
# Testing project with two gateways to verify cleanup is performed across engines

0 commit comments

Comments
 (0)