Skip to content

Commit 419d277

Browse files
authored
Merge branch 'main' into fix/local-only-format
2 parents 6427cc8 + c5dbce5 commit 419d277

7 files changed

Lines changed: 292 additions & 62 deletions

File tree

sqlmesh/cli/main.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -635,16 +635,22 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
635635
is_flag=True,
636636
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
637637
)
638+
@click.option(
639+
"--force-delete",
640+
is_flag=True,
641+
help="Delete expired environment and snapshot state records even when the physical table or view drops fail. "
642+
"Any objects that could not be dropped become orphaned and must be removed manually.",
643+
)
638644
@click.pass_context
639645
@error_handler
640646
@cli_analytics
641-
def janitor(ctx: click.Context, ignore_ttl: bool, **kwargs: t.Any) -> None:
647+
def janitor(ctx: click.Context, ignore_ttl: bool, force_delete: bool, **kwargs: t.Any) -> None:
642648
"""
643649
Run the janitor process on-demand.
644650
645651
The janitor cleans up old environments and expired snapshots.
646652
"""
647-
ctx.obj.run_janitor(ignore_ttl, **kwargs)
653+
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, **kwargs)
648654

649655

650656
@cli.command("destroy")

sqlmesh/core/context.py

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -890,12 +890,12 @@ def _has_environment_changed() -> bool:
890890
return completion_status
891891

892892
@python_api_analytics
893-
def run_janitor(self, ignore_ttl: bool) -> bool:
893+
def run_janitor(self, ignore_ttl: bool, force_delete: bool = False) -> bool:
894894
success = False
895895

896896
if self.console.start_cleanup(ignore_ttl):
897897
try:
898-
self._run_janitor(ignore_ttl)
898+
self._run_janitor(ignore_ttl, force_delete=force_delete)
899899
success = True
900900
finally:
901901
self.console.stop_cleanup(success=success)
@@ -2899,24 +2899,43 @@ def _destroy(self) -> bool:
28992899

29002900
return True
29012901

def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) -> None:
    """Run one janitor pass and report every cleanup failure at the end.

    Expired environments are cleaned up first, then expired snapshots are
    deleted and intervals are compacted. Failure messages returned by both
    cleanup steps are collected rather than aborting the pass, and are
    surfaced together once all steps have run.

    Args:
        ignore_ttl: Forwarded to ``delete_expired_snapshots``.
        force_delete: Forwarded to both cleanup steps. When set and failures
            occurred, the summary also notes that state records were deleted
            while the underlying objects may still exist.

    Raises:
        SQLMeshError: If any failure was reported and
            ``config.janitor.warn_on_delete_failure`` is falsy. Otherwise the
            summary is logged as a console warning instead.
    """
    current_ts = now_timestamp()
    failures: t.List[str] = []

    # Clean up expired environments by removing their views and schemas
    failures.extend(
        self._cleanup_environments(current_ts=current_ts, force_delete=force_delete)
    )

    failures.extend(
        delete_expired_snapshots(
            self.state_sync,
            self.snapshot_evaluator,
            current_ts=current_ts,
            ignore_ttl=ignore_ttl,
            force_delete=force_delete,
            console=self.console,
            batch_size=self.config.janitor.expired_snapshots_batch_size,
        )
    )
    # Compaction runs regardless of failures; they are only reported afterwards.
    self.state_sync.compact_intervals()

    if not failures:
        return

    # Bullet every failure the same way, including the first one.
    failure_string = "\n".join(f" - {failure}" for failure in failures)
    summary = f"Janitor completed with failures:\n{failure_string}"
    if force_delete:
        summary += "\nState records have been deleted, but the underlying objects may still exist in the database.\nPlease investigate and clean up manually the above if necessary."
    if self.config.janitor.warn_on_delete_failure:
        self.console.log_warning(summary)
    else:
        raise SQLMeshError(summary)
2933+
2934+
def _cleanup_environments(
2935+
self, current_ts: t.Optional[int] = None, force_delete: bool = False
2936+
) -> t.List[str]:
29192937
current_ts = current_ts or now_timestamp()
2938+
failures: t.List[str] = []
29202939

29212940
expired_environments_summaries = self.state_sync.get_expired_environments(
29222941
current_ts=current_ts
@@ -2926,15 +2945,20 @@ def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:
29262945
expired_env = self.state_reader.get_environment(expired_env_summary.name)
29272946

29282947
if expired_env:
2929-
cleanup_expired_views(
2930-
default_adapter=self.engine_adapter,
2931-
engine_adapters=self.engine_adapters,
2932-
environments=[expired_env],
2933-
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
2934-
console=self.console,
2948+
failures.extend(
2949+
cleanup_expired_views(
2950+
default_adapter=self.engine_adapter,
2951+
engine_adapters=self.engine_adapters,
2952+
environments=[expired_env],
2953+
console=self.console,
2954+
)
29352955
)
29362956

2937-
self.state_sync.delete_expired_environments(current_ts=current_ts)
2957+
# we want to retry on the next janitor pass if drops failed, unless
2958+
# force_delete is set in which case we purge state records regardless
2959+
if not failures or force_delete:
2960+
self.state_sync.delete_expired_environments(current_ts=current_ts)
2961+
return failures
29382962

29392963
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
29402964
connection_name = connection_name.capitalize()

sqlmesh/core/janitor.py

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@
1616
RowBoundary,
1717
ExpiredBatchRange,
1818
)
19-
from sqlmesh.utils.errors import SQLMeshError
2019

2120

2221
def cleanup_expired_views(
2322
default_adapter: EngineAdapter,
2423
engine_adapters: t.Dict[str, EngineAdapter],
2524
environments: t.List[Environment],
26-
warn_on_delete_failure: bool = False,
2725
console: t.Optional[Console] = None,
28-
) -> None:
26+
) -> t.List[str]:
27+
failures: t.List[str] = []
28+
2929
expired_schema_or_catalog_environments = [
3030
environment
3131
for environment in environments
@@ -85,10 +85,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
8585
console.update_cleanup_progress(expired_view)
8686
except Exception as e:
8787
message = f"Failed to drop the expired environment view '{expired_view}': {e}"
88-
if warn_on_delete_failure:
89-
logger.warning(message)
90-
else:
91-
raise SQLMeshError(message) from e
88+
logger.warning(message)
89+
failures.append(message)
9290

9391
# Drop the schemas for the expired environments
9492
for engine_adapter, schema in schemas_to_drop:
@@ -102,10 +100,8 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
102100
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
103101
except Exception as e:
104102
message = f"Failed to drop the expired environment schema '{schema}': {e}"
105-
if warn_on_delete_failure:
106-
logger.warning(message)
107-
else:
108-
raise SQLMeshError(message) from e
103+
logger.warning(message)
104+
failures.append(message)
109105

110106
# Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
111107
# catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
@@ -117,10 +113,10 @@ def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> Engin
117113
console.update_cleanup_progress(catalog)
118114
except Exception as e:
119115
message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
120-
if warn_on_delete_failure:
121-
logger.warning(message)
122-
else:
123-
raise SQLMeshError(message) from e
116+
logger.warning(message)
117+
failures.append(message)
118+
119+
return failures
124120

125121

126122
def delete_expired_snapshots(
@@ -129,9 +125,10 @@ def delete_expired_snapshots(
129125
*,
130126
current_ts: int,
131127
ignore_ttl: bool = False,
128+
force_delete: bool = False,
132129
batch_size: t.Optional[int] = None,
133130
console: t.Optional[Console] = None,
134-
) -> None:
131+
) -> t.List[str]:
135132
"""Delete all expired snapshots in batches.
136133
137134
This helper function encapsulates the logic for deleting expired snapshots in batches,
@@ -142,12 +139,14 @@ def delete_expired_snapshots(
142139
snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
143140
current_ts: Timestamp used to evaluate expiration.
144141
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
142+
force_delete: If True, delete snapshot state records even when physical table cleanup fails.
145143
batch_size: Maximum number of snapshots to fetch per batch.
146144
console: Optional console for reporting progress.
147145
148146
Returns:
149-
The total number of deleted expired snapshots.
147+
List of failure messages so callers can surface them at the end of the janitor run.
150148
"""
149+
failures: t.List[str] = []
151150
num_expired_snapshots = 0
152151
for batch in iter_expired_snapshot_batches(
153152
state_reader=state_sync,
@@ -165,17 +164,32 @@ def delete_expired_snapshots(
165164
len(batch.expired_snapshot_ids),
166165
end_info,
167166
)
168-
snapshot_evaluator.cleanup(
169-
target_snapshots=batch.cleanup_tasks,
170-
on_complete=console.update_cleanup_progress if console else None,
171-
)
172-
state_sync.delete_expired_snapshots(
173-
batch_range=ExpiredBatchRange(
174-
start=RowBoundary.lowest_boundary(),
175-
end=batch.batch_range.end,
176-
),
177-
ignore_ttl=ignore_ttl,
178-
)
179-
logger.info("Cleaned up expired snapshots batch")
180-
num_expired_snapshots += len(batch.expired_snapshot_ids)
167+
cleanup_succeeded = True
168+
try:
169+
snapshot_evaluator.cleanup(
170+
target_snapshots=batch.cleanup_tasks,
171+
on_complete=console.update_cleanup_progress if console else None,
172+
)
173+
except Exception as failed_drops:
174+
message = f"Failed to clean up: {failed_drops}"
175+
logger.warning(message)
176+
failures.append(message)
177+
cleanup_succeeded = False
178+
179+
if cleanup_succeeded or force_delete:
180+
try:
181+
state_sync.delete_expired_snapshots(
182+
batch_range=ExpiredBatchRange(
183+
start=RowBoundary.lowest_boundary(),
184+
end=batch.batch_range.end,
185+
),
186+
ignore_ttl=ignore_ttl,
187+
)
188+
logger.info("Cleaned up expired snapshots batch")
189+
num_expired_snapshots += len(batch.expired_snapshot_ids)
190+
except Exception as e:
191+
message = f"Failed to delete expired snapshot state records: {e}"
192+
logger.warning(message)
193+
failures.append(message)
181194
logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)
195+
return failures

sqlmesh/core/snapshot/evaluator.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ def cleanup(
547547
t.snapshot.snapshot_id: t.dev_table_only for t in filtered_targets
548548
}
549549
with self.concurrent_context():
550-
concurrent_apply_to_snapshots(
550+
errors, _ = concurrent_apply_to_snapshots(
551551
[t.snapshot for t in filtered_targets],
552552
lambda s: self._cleanup_snapshot(
553553
s,
@@ -557,7 +557,11 @@ def cleanup(
557557
),
558558
self.ddl_concurrent_tasks,
559559
reverse_order=True,
560+
raise_on_error=False,
560561
)
562+
if errors:
563+
errored_snapshots = "\n".join(f" {e.node.name}: {e.__cause__}" for e in errors)
564+
raise SQLMeshError(f"\n{errored_snapshots}")
561565

562566
def audit(
563567
self,

0 commit comments

Comments
 (0)