Skip to content

Commit c5dbce5

Browse files
Fix: Make the janitor best effort and introduce force-delete option (#5802)
Signed-off-by: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com>
1 parent 41d9906 commit c5dbce5

7 files changed

Lines changed: 292 additions & 62 deletions

File tree

sqlmesh/cli/main.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -631,16 +631,22 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
631631
is_flag=True,
632632
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
633633
)
634+
@click.option(
635+
"--force-delete",
636+
is_flag=True,
637+
help="Delete expired environment and snapshot state records even when the physical table or view drops fail. "
638+
"Any objects that could not be dropped become orphaned and must be removed manually.",
639+
)
634640
@click.pass_context
635641
@error_handler
636642
@cli_analytics
637-
def janitor(ctx: click.Context, ignore_ttl: bool, **kwargs: t.Any) -> None:
643+
def janitor(ctx: click.Context, ignore_ttl: bool, force_delete: bool, **kwargs: t.Any) -> None:
638644
"""
639645
Run the janitor process on-demand.
640646
641647
The janitor cleans up old environments and expired snapshots.
642648
"""
643-
ctx.obj.run_janitor(ignore_ttl, **kwargs)
649+
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, **kwargs)
644650

645651

646652
@cli.command("destroy")

sqlmesh/core/context.py

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -887,12 +887,12 @@ def _has_environment_changed() -> bool:
887887
return completion_status
888888

889889
@python_api_analytics
890-
def run_janitor(self, ignore_ttl: bool) -> bool:
890+
def run_janitor(self, ignore_ttl: bool, force_delete: bool = False) -> bool:
891891
success = False
892892

893893
if self.console.start_cleanup(ignore_ttl):
894894
try:
895-
self._run_janitor(ignore_ttl)
895+
self._run_janitor(ignore_ttl, force_delete=force_delete)
896896
success = True
897897
finally:
898898
self.console.stop_cleanup(success=success)
@@ -2896,24 +2896,43 @@ def _destroy(self) -> bool:
28962896

28972897
return True
28982898

def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) -> None:
    """Run a single best-effort janitor pass and report every failure at the end.

    Expired environments are cleaned up first, then expired snapshots, and
    finally intervals are compacted. Failure messages returned by the two
    cleanup steps are collected rather than aborting the pass, so one failing
    step does not prevent the following steps from running.

    Args:
        ignore_ttl: Forwarded to ``delete_expired_snapshots``.
        force_delete: Forwarded to both cleanup steps. When set, the failure
            summary also tells the user that state records were deleted and
            the underlying objects may need manual cleanup.

    Raises:
        SQLMeshError: If any failure was collected and
            ``config.janitor.warn_on_delete_failure`` is falsy. Otherwise the
            summary is logged as a console warning instead.
    """
    current_ts = now_timestamp()
    failures: t.List[str] = []

    # Clean up expired environments by removing their views and schemas
    failures.extend(
        self._cleanup_environments(current_ts=current_ts, force_delete=force_delete)
    )

    failures.extend(
        delete_expired_snapshots(
            self.state_sync,
            self.snapshot_evaluator,
            current_ts=current_ts,
            ignore_ttl=ignore_ttl,
            force_delete=force_delete,
            console=self.console,
            batch_size=self.config.janitor.expired_snapshots_batch_size,
        )
    )
    # Compaction runs even when drops failed: the pass is best effort and the
    # failures are only surfaced once everything has been attempted.
    self.state_sync.compact_intervals()

    if not failures:
        return

    # Give every failure its own bullet. Joining with "\n - " alone leaves the
    # first entry without a bullet while all the following ones have it.
    failure_lines = "\n".join(f" - {failure}" for failure in failures)
    summary = f"Janitor completed with failures:\n{failure_lines}"
    if force_delete:
        summary += "\nState records have been deleted, but the underlying objects may still exist in the database.\nPlease investigate and clean up manually the above if necessary."

    if self.config.janitor.warn_on_delete_failure:
        self.console.log_warning(summary)
    else:
        raise SQLMeshError(summary)
2930+
2931+
def _cleanup_environments(
2932+
self, current_ts: t.Optional[int] = None, force_delete: bool = False
2933+
) -> t.List[str]:
29162934
current_ts = current_ts or now_timestamp()
2935+
failures: t.List[str] = []
29172936

29182937
expired_environments_summaries = self.state_sync.get_expired_environments(
29192938
current_ts=current_ts
@@ -2923,15 +2942,20 @@ def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
29232942
expired_env = self.state_reader.get_environment(expired_env_summary.name)
29242943

29252944
if expired_env:
2926-
cleanup_expired_views(
2927-
default_adapter=self.engine_adapter,
2928-
engine_adapters=self.engine_adapters,
2929-
environments=[expired_env],
2930-
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
2931-
console=self.console,
2945+
failures.extend(
2946+
cleanup_expired_views(
2947+
default_adapter=self.engine_adapter,
2948+
engine_adapters=self.engine_adapters,
2949+
environments=[expired_env],
2950+
console=self.console,
2951+
)
29322952
)
29332953

2934-
self.state_sync.delete_expired_environments(current_ts=current_ts)
2954+
# we want to retry on the next janitor pass if drops failed, unless
2955+
# force_delete is set in which case we purge state records regardless
2956+
if not failures or force_delete:
2957+
self.state_sync.delete_expired_environments(current_ts=current_ts)
2958+
return failures
29352959

29362960
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
29372961
connection_name = connection_name.capitalize()

sqlmesh/core/janitor.py

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@
1616
RowBoundary,
1717
ExpiredBatchRange,
1818
)
19-
from sqlmesh.utils.errors import SQLMeshError
2019

2120

2221
def cleanup_expired_views(
2322
default_adapter: EngineAdapter,
2423
engine_adapters: t.Dict[str, EngineAdapter],
2524
environments: t.List[Environment],
26-
warn_on_delete_failure: bool = False,
2725
console: t.Optional[Console] = None,
28-
) -> None:
26+
) -> t.List[str]:
27+
failures: t.List[str] = []
28+
2929
expired_schema_or_catalog_environments = [
3030
environment
3131
for environment in environments
@@ -85,10 +85,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
8585
console.update_cleanup_progress(expired_view)
8686
except Exception as e:
8787
message = f"Failed to drop the expired environment view '{expired_view}': {e}"
88-
if warn_on_delete_failure:
89-
logger.warning(message)
90-
else:
91-
raise SQLMeshError(message) from e
88+
logger.warning(message)
89+
failures.append(message)
9290

9391
# Drop the schemas for the expired environments
9492
for engine_adapter, schema in schemas_to_drop:
@@ -102,10 +100,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
102100
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
103101
except Exception as e:
104102
message = f"Failed to drop the expired environment schema '{schema}': {e}"
105-
if warn_on_delete_failure:
106-
logger.warning(message)
107-
else:
108-
raise SQLMeshError(message) from e
103+
logger.warning(message)
104+
failures.append(message)
109105

110106
# Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
111107
# catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
@@ -117,10 +113,10 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
117113
console.update_cleanup_progress(catalog)
118114
except Exception as e:
119115
message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
120-
if warn_on_delete_failure:
121-
logger.warning(message)
122-
else:
123-
raise SQLMeshError(message) from e
116+
logger.warning(message)
117+
failures.append(message)
118+
119+
return failures
124120

125121

126122
def delete_expired_snapshots(
@@ -129,9 +125,10 @@ def delete_expired_snapshots(
129125
*,
130126
current_ts: int,
131127
ignore_ttl: bool = False,
128+
force_delete: bool = False,
132129
batch_size: t.Optional[int] = None,
133130
console: t.Optional[Console] = None,
134-
) -> None:
131+
) -> t.List[str]:
135132
"""Delete all expired snapshots in batches.
136133
137134
This helper function encapsulates the logic for deleting expired snapshots in batches,
@@ -142,12 +139,14 @@ def delete_expired_snapshots(
142139
snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
143140
current_ts: Timestamp used to evaluate expiration.
144141
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
142+
force_delete: If True, delete snapshot state records even when physical table cleanup fails.
145143
batch_size: Maximum number of snapshots to fetch per batch.
146144
console: Optional console for reporting progress.
147145
148146
Returns:
149-
The total number of deleted expired snapshots.
147+
List of failure messages so callers can surface them at the end of the janitor run.
150148
"""
149+
failures: t.List[str] = []
151150
num_expired_snapshots = 0
152151
for batch in iter_expired_snapshot_batches(
153152
state_reader=state_sync,
@@ -165,17 +164,32 @@ def delete_expired_snapshots(
165164
len(batch.expired_snapshot_ids),
166165
end_info,
167166
)
168-
snapshot_evaluator.cleanup(
169-
target_snapshots=batch.cleanup_tasks,
170-
on_complete=console.update_cleanup_progress if console else None,
171-
)
172-
state_sync.delete_expired_snapshots(
173-
batch_range=ExpiredBatchRange(
174-
start=RowBoundary.lowest_boundary(),
175-
end=batch.batch_range.end,
176-
),
177-
ignore_ttl=ignore_ttl,
178-
)
179-
logger.info("Cleaned up expired snapshots batch")
180-
num_expired_snapshots += len(batch.expired_snapshot_ids)
167+
cleanup_succeeded = True
168+
try:
169+
snapshot_evaluator.cleanup(
170+
target_snapshots=batch.cleanup_tasks,
171+
on_complete=console.update_cleanup_progress if console else None,
172+
)
173+
except Exception as failed_drops:
174+
message = f"Failed to clean up: {failed_drops}"
175+
logger.warning(message)
176+
failures.append(message)
177+
cleanup_succeeded = False
178+
179+
if cleanup_succeeded or force_delete:
180+
try:
181+
state_sync.delete_expired_snapshots(
182+
batch_range=ExpiredBatchRange(
183+
start=RowBoundary.lowest_boundary(),
184+
end=batch.batch_range.end,
185+
),
186+
ignore_ttl=ignore_ttl,
187+
)
188+
logger.info("Cleaned up expired snapshots batch")
189+
num_expired_snapshots += len(batch.expired_snapshot_ids)
190+
except Exception as e:
191+
message = f"Failed to delete expired snapshot state records: {e}"
192+
logger.warning(message)
193+
failures.append(message)
181194
logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
195+
return failures

sqlmesh/core/snapshot/evaluator.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ def cleanup(
547547
t.snapshot.snapshot_id: t.dev_table_only for t in filtered_targets
548548
}
549549
with self.concurrent_context():
550-
concurrent_apply_to_snapshots(
550+
errors, _ = concurrent_apply_to_snapshots(
551551
[t.snapshot for t in filtered_targets],
552552
lambda s: self._cleanup_snapshot(
553553
s,
@@ -557,7 +557,11 @@ def cleanup(
557557
),
558558
self.ddl_concurrent_tasks,
559559
reverse_order=True,
560+
raise_on_error=False,
560561
)
562+
if errors:
563+
errored_snapshots = "\n".join(f" {e.node.name}: {e.__cause__}" for e in errors)
564+
raise SQLMeshError(f"\n{errored_snapshots}")
561565

562566
def audit(
563567
self,

0 commit comments

Comments
 (0)