Skip to content

Commit 00992be

Browse files
committed
feat: add --environment flag to sqlmesh janitor for scoped cleanup
Signed-off-by: Michael Day <michael.day@cloudkitchens.com>
Signed-off-by: mday-io <mdaytn@gmail.com>
1 parent c5dbce5 commit 00992be

7 files changed

Lines changed: 343 additions & 35 deletions

File tree

sqlmesh/cli/main.py

Lines changed: 15 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -629,24 +629,36 @@ def invalidate(ctx: click.Context, environment: str, **kwargs: t.Any) -> None:
629629
@click.option(
630630
"--ignore-ttl",
631631
is_flag=True,
632-
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
632+
help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire. Has no effect when --environment is specified.",
633633
)
634634
@click.option(
635635
"--force-delete",
636636
is_flag=True,
637637
help="Delete expired environment and snapshot state records even when the physical table or view drops fail. "
638638
"Any objects that could not be dropped become orphaned and must be removed manually.",
639639
)
640+
@click.option(
641+
"--environment",
642+
"-e",
643+
default=None,
644+
help="Scope cleanup to a single expired environment. Global snapshot and interval compaction are skipped.",
645+
)
640646
@click.pass_context
641647
@error_handler
642648
@cli_analytics
643-
def janitor(ctx: click.Context, ignore_ttl: bool, force_delete: bool, **kwargs: t.Any) -> None:
649+
def janitor(
650+
ctx: click.Context,
651+
ignore_ttl: bool,
652+
force_delete: bool,
653+
environment: t.Optional[str],
654+
**kwargs: t.Any,
655+
) -> None:
644656
"""
645657
Run the janitor process on-demand.
646658
647659
The janitor cleans up old environments and expired snapshots.
648660
"""
649-
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, **kwargs)
661+
ctx.obj.run_janitor(ignore_ttl, force_delete=force_delete, environment=environment, **kwargs)
650662

651663

652664
@cli.command("destroy")

sqlmesh/core/context.py

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -887,12 +887,20 @@ def _has_environment_changed() -> bool:
887887
return completion_status
888888

889889
@python_api_analytics
890-
def run_janitor(self, ignore_ttl: bool, force_delete: bool = False) -> bool:
890+
def run_janitor(
891+
self,
892+
ignore_ttl: bool,
893+
force_delete: bool = False,
894+
environment: t.Optional[str] = None,
895+
) -> bool:
896+
if environment is not None:
897+
environment = Environment.sanitize_name(environment)
898+
891899
success = False
892900

893901
if self.console.start_cleanup(ignore_ttl):
894902
try:
895-
self._run_janitor(ignore_ttl, force_delete=force_delete)
903+
self._run_janitor(ignore_ttl, force_delete=force_delete, environment=environment)
896904
success = True
897905
finally:
898906
self.console.stop_cleanup(success=success)
@@ -1825,7 +1833,7 @@ def invalidate_environment(self, name: str, sync: bool = False) -> None:
18251833
name = Environment.sanitize_name(name)
18261834
self.state_sync.invalidate_environment(name)
18271835
if sync:
1828-
self._cleanup_environments()
1836+
self._cleanup_environments(name=name)
18291837
self.console.log_success(f"Environment '{name}' deleted.")
18301838
else:
18311839
self.console.log_success(f"Environment '{name}' invalidated.")
@@ -2896,27 +2904,35 @@ def _destroy(self) -> bool:
28962904

28972905
return True
28982906

2899-
def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) -> None:
2907+
def _run_janitor(
2908+
self,
2909+
ignore_ttl: bool = False,
2910+
force_delete: bool = False,
2911+
environment: t.Optional[str] = None,
2912+
) -> None:
29002913
current_ts = now_timestamp()
29012914
failures: t.List[str] = []
29022915

29032916
# Clean up expired environments by removing their views and schemas
29042917
failures.extend(
2905-
self._cleanup_environments(current_ts=current_ts, force_delete=force_delete)
2918+
self._cleanup_environments(
2919+
current_ts=current_ts, force_delete=force_delete, name=environment
2920+
)
29062921
)
29072922

2908-
failures.extend(
2909-
delete_expired_snapshots(
2910-
self.state_sync,
2911-
self.snapshot_evaluator,
2912-
current_ts=current_ts,
2913-
ignore_ttl=ignore_ttl,
2914-
force_delete=force_delete,
2915-
console=self.console,
2916-
batch_size=self.config.janitor.expired_snapshots_batch_size,
2923+
if environment is None:
2924+
failures.extend(
2925+
delete_expired_snapshots(
2926+
self.state_sync,
2927+
self.snapshot_evaluator,
2928+
current_ts=current_ts,
2929+
ignore_ttl=ignore_ttl,
2930+
force_delete=force_delete,
2931+
console=self.console,
2932+
batch_size=self.config.janitor.expired_snapshots_batch_size,
2933+
)
29172934
)
2918-
)
2919-
self.state_sync.compact_intervals()
2935+
self.state_sync.compact_intervals()
29202936

29212937
if failures:
29222938
failure_string = "\n - ".join(failures)
@@ -2929,15 +2945,23 @@ def _run_janitor(self, ignore_ttl: bool = False, force_delete: bool = False) ->
29292945
raise SQLMeshError(summary)
29302946

29312947
def _cleanup_environments(
2932-
self, current_ts: t.Optional[int] = None, force_delete: bool = False
2948+
self,
2949+
current_ts: t.Optional[int] = None,
2950+
force_delete: bool = False,
2951+
name: t.Optional[str] = None,
29332952
) -> t.List[str]:
29342953
current_ts = current_ts or now_timestamp()
29352954
failures: t.List[str] = []
29362955

29372956
expired_environments_summaries = self.state_sync.get_expired_environments(
2938-
current_ts=current_ts
2957+
current_ts=current_ts, name=name
29392958
)
29402959

2960+
if name is not None and not expired_environments_summaries:
2961+
self.console.log_warning(
2962+
f"Environment '{name}' is not expired or does not exist. Nothing to clean up."
2963+
)
2964+
29412965
for expired_env_summary in expired_environments_summaries:
29422966
expired_env = self.state_reader.get_environment(expired_env_summary.name)
29432967

@@ -2954,7 +2978,7 @@ def _cleanup_environments(
29542978
# we want to retry on the next janitor pass if drops failed, unless
29552979
# force_delete is set in which case we purge state records regardless
29562980
if not failures or force_delete:
2957-
self.state_sync.delete_expired_environments(current_ts=current_ts)
2981+
self.state_sync.delete_expired_environments(current_ts=current_ts, name=name)
29582982
return failures
29592983

29602984
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:

sqlmesh/core/state_sync/base.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,10 +321,17 @@ def get_expired_snapshots(
321321
"""
322322

323323
@abc.abstractmethod
324-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
324+
def get_expired_environments(
325+
self, current_ts: int, name: t.Optional[str] = None
326+
) -> t.List[EnvironmentSummary]:
325327
"""Returns the expired environments.
326328
327329
Expired environments are environments that have exceeded their time-to-live value.
330+
331+
Args:
332+
current_ts: The current timestamp in milliseconds used to determine expiration.
333+
name: If provided, only the environment with this name is considered.
334+
328335
Returns:
329336
The list of environment summaries to remove.
330337
"""
@@ -436,12 +443,16 @@ def finalize(self, environment: Environment) -> None:
436443

437444
@abc.abstractmethod
438445
def delete_expired_environments(
439-
self, current_ts: t.Optional[int] = None
446+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
440447
) -> t.List[EnvironmentSummary]:
441448
"""Removes expired environments.
442449
443450
Expired environments are environments that have exceeded their time-to-live value.
444451
452+
Args:
453+
current_ts: The current timestamp in milliseconds. Defaults to now.
454+
name: If provided, only the environment with this name is deleted.
455+
445456
Returns:
446457
The list of removed environments.
447458
"""

sqlmesh/core/state_sync/db/environment.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,31 +167,46 @@ def finalize(self, environment: Environment) -> None:
167167
where=environment_filter,
168168
)
169169

170-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
170+
def get_expired_environments(
171+
self, current_ts: int, name: t.Optional[str] = None
172+
) -> t.List[EnvironmentSummary]:
171173
"""Returns the expired environments.
172174
173175
Expired environments are environments that have exceeded their time-to-live value.
176+
177+
Args:
178+
current_ts: The current timestamp in milliseconds used to determine expiration.
179+
name: If provided, only the environment with this name is considered.
180+
174181
Returns:
175182
The list of environment summaries to remove.
176183
"""
177-
return self._fetch_environment_summaries(
178-
where=self._create_expiration_filter_expr(current_ts)
179-
)
184+
where: exp.Expression = self._create_expiration_filter_expr(current_ts)
185+
if name is not None:
186+
where = exp.and_(where, exp.column("name").eq(name))
187+
return self._fetch_environment_summaries(where=where)
180188

181189
def delete_expired_environments(
182-
self, current_ts: t.Optional[int] = None
190+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
183191
) -> t.List[EnvironmentSummary]:
184192
"""Deletes expired environments.
185193
194+
Args:
195+
current_ts: The current timestamp in milliseconds. Defaults to now.
196+
name: If provided, only the environment with this name is deleted.
197+
186198
Returns:
187199
A list of deleted environments.
188200
"""
189201
current_ts = current_ts or now_timestamp()
190-
expired_environments = self.get_expired_environments(current_ts=current_ts)
202+
expired_environments = self.get_expired_environments(current_ts=current_ts, name=name)
191203

204+
where: exp.Expression = self._create_expiration_filter_expr(current_ts)
205+
if name is not None:
206+
where = exp.and_(where, exp.column("name").eq(name))
192207
self.engine_adapter.delete_from(
193208
self.environments_table,
194-
where=self._create_expiration_filter_expr(current_ts),
209+
where=where,
195210
)
196211

197212
# Delete the expired environments' corresponding environment statements

sqlmesh/core/state_sync/db/facade.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,10 @@ def get_expired_snapshots(
276276
batch_range=batch_range,
277277
)
278278

279-
def get_expired_environments(self, current_ts: int) -> t.List[EnvironmentSummary]:
280-
return self.environment_state.get_expired_environments(current_ts=current_ts)
279+
def get_expired_environments(
280+
self, current_ts: int, name: t.Optional[str] = None
281+
) -> t.List[EnvironmentSummary]:
282+
return self.environment_state.get_expired_environments(current_ts=current_ts, name=name)
281283

282284
@transactional()
283285
def delete_expired_snapshots(
@@ -297,10 +299,10 @@ def delete_expired_snapshots(
297299

298300
@transactional()
299301
def delete_expired_environments(
300-
self, current_ts: t.Optional[int] = None
302+
self, current_ts: t.Optional[int] = None, name: t.Optional[str] = None
301303
) -> t.List[EnvironmentSummary]:
302304
current_ts = current_ts or now_timestamp()
303-
return self.environment_state.delete_expired_environments(current_ts=current_ts)
305+
return self.environment_state.delete_expired_environments(current_ts=current_ts, name=name)
304306

305307
def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
306308
self.snapshot_state.delete_snapshots(snapshot_ids)

0 commit comments

Comments
 (0)