Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
- name: Run tests
shell: bash -l {0}
run: |
pytest -r a -v -n 3 --cov=lsst.ctrl.mpexec --cov=tests --cov-report=xml --cov-report=term --cov-branch \
pytest -Wd -r a -v -n 3 --cov=lsst.ctrl.mpexec --cov=tests --cov-report=xml --cov-report=term --cov-branch \
--junitxml=junit.xml -o junit_family=legacy
- name: Upload coverage to codecov
uses: codecov/codecov-action@v5
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ repos:
# Ruff version.
rev: v0.13.3
hooks:
- id: ruff
- id: ruff-check
args: [--fix]
- id: ruff-format
- repo: https://github.com/numpy/numpydoc
Expand Down
117 changes: 30 additions & 87 deletions python/lsst/ctrl/mpexec/cli/butler_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,96 +311,39 @@ def _make_read_parts(
of ``args``.
"""
butler = Butler.from_config(butler_config, writeable=False)
self = cls(
butler,
output=output,
output_run=output_run,
inputs=inputs,
extend_run=extend_run,
rebase=rebase,
writeable=False,
)
self.check(extend_run=extend_run, replace_run=replace_run, prune_replaced=prune_replaced)
if self.output and self.output.exists:
if replace_run:
replaced = self.output.chain[0]
inputs = list(self.output.chain[1:])
_LOG.debug(
"Simulating collection search in '%s' after removing '%s'.", self.output.name, replaced
)
try:
self = cls(
butler,
output=output,
output_run=output_run,
inputs=inputs,
extend_run=extend_run,
rebase=rebase,
writeable=False,
)
self.check(extend_run=extend_run, replace_run=replace_run, prune_replaced=prune_replaced)
if self.output and self.output.exists:
if replace_run:
replaced = self.output.chain[0]
inputs = list(self.output.chain[1:])
_LOG.debug(
"Simulating collection search in '%s' after removing '%s'.",
self.output.name,
replaced,
)
else:
inputs = [self.output.name]
else:
inputs = [self.output.name]
else:
inputs = list(self.inputs)
if extend_run:
assert self.output_run is not None, "Output collection has to be specified."
inputs.insert(0, self.output_run.name)
collSearch = CollectionWildcard.from_expression(inputs).require_ordered()
inputs = list(self.inputs)
if extend_run:
assert self.output_run is not None, "Output collection has to be specified."
inputs.insert(0, self.output_run.name)
collSearch = CollectionWildcard.from_expression(inputs).require_ordered()
except Exception:
butler.close()
raise
return butler, collSearch, self

@classmethod
def make_read_butler(
cls,
butler_config: ResourcePathExpression,
*,
output: str | None,
output_run: str | None,
inputs: str | Iterable[str],
extend_run: bool = False,
rebase: bool = False,
replace_run: bool,
prune_replaced: str | None = None,
) -> Butler:
"""Construct a read-only butler according to the given command-line
arguments.

Parameters
----------
butler_config : convertible to `lsst.resources.ResourcePath`
Path to configuration for the butler.
output : `str` or `None`
The name of a `~lsst.daf.butler.CollectionType.CHAINED`
input/output collection.
output_run : `str` or `None`
The name of a `~lsst.daf.butler.CollectionType.RUN` input/output
collection.
inputs : `str` or `~collections.abc.Iterable` [`str`]
Input collection name or iterable of collection names.
extend_run : `bool`
A boolean indicating whether ``output_run`` should already exist
and be extended.
rebase : `bool`
A boolean indicating whether to force the ``output`` collection to
be consistent with ``inputs`` and ``output`` run such that the
``output`` collection has output run collections first (i.e. those
that start with the same prefix), then the new inputs, then any
original inputs not included in the new inputs.
replace_run : `bool`
Whether the ``output_run`` should be replaced in the ``output``
chain.
prune_replaced : `str` or `None`
If ``replace_run=True``, whether/how datasets in the old run should
be removed. Options are ``"purge"``, ``"unstore"``, and `None`.

Returns
-------
butler : `lsst.daf.butler.Butler`
A read-only butler initialized with the given collections.
"""
cls.define_datastore_cache() # Ensure that this butler can use a shared cache.
butler, inputs, _ = cls._make_read_parts(
butler_config,
output=output,
output_run=output_run,
inputs=inputs,
extend_run=extend_run,
rebase=rebase,
replace_run=replace_run,
prune_replaced=prune_replaced,
)
_LOG.debug("Preparing butler to read from %s.", inputs)
return Butler.from_config(butler=butler, collections=inputs)

@classmethod
def make_butler_and_collections(
cls,
Expand Down
6 changes: 5 additions & 1 deletion python/lsst/ctrl/mpexec/cli/script/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ def build(
if butler_config:
butler = Butler.from_config(butler_config, writeable=False)

pipeline_graph_factory = PipelineGraphFactory(pipeline, butler, select_tasks)
try:
pipeline_graph_factory = PipelineGraphFactory(pipeline, butler, select_tasks)
finally:
if butler is not None:
butler.close()

if pipeline_dot:
with open(pipeline_dot, "w") as stream:
Expand Down
45 changes: 22 additions & 23 deletions python/lsst/ctrl/mpexec/cli/script/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ def describe(self, will: bool) -> str:
return msg

def on_confirmation(self) -> None:
butler = Butler.from_config(self.butler_config, writeable=True)
with butler.transaction():
with Butler.from_config(self.butler_config, writeable=True) as butler, butler.transaction():
for collection in self.others_to_remove:
butler.registry.removeCollection(collection)
butler.removeRuns(self.runs_to_remove)
Expand Down Expand Up @@ -128,25 +127,25 @@ def cleanup(
collection : str
The name of the chained collection.
"""
butler = Butler.from_config(butler_config)
result = CleanupResult(butler_config)
try:
to_keep = set(butler.registry.getCollectionChain(collection))
except MissingCollectionError:
result.failure = NoSuchCollectionFailure(collection)
return result
except CollectionTypeError:
result.failure = NotChainedCollectionFailure(
collection, butler.registry.getCollectionType(collection).name
)
return result
to_keep.add(collection)
glob = collection + "*"
to_consider = set(butler.registry.queryCollections(glob))
to_remove = to_consider - to_keep
for r in to_remove:
if butler.registry.getCollectionType(r) == CollectionType.RUN:
result.runs_to_remove.append(r)
else:
result.others_to_remove.append(r)
with Butler.from_config(butler_config) as butler:
result = CleanupResult(butler_config)
try:
to_keep = set(butler.registry.getCollectionChain(collection))
except MissingCollectionError:
result.failure = NoSuchCollectionFailure(collection)
return result
except CollectionTypeError:
result.failure = NotChainedCollectionFailure(
collection, butler.registry.getCollectionType(collection).name
)
return result
to_keep.add(collection)
glob = collection + "*"
to_consider = set(butler.registry.queryCollections(glob))
to_remove = to_consider - to_keep
for r in to_remove:
if butler.registry.getCollectionType(r) == CollectionType.RUN:
result.runs_to_remove.append(r)
else:
result.others_to_remove.append(r)
return result
8 changes: 4 additions & 4 deletions python/lsst/ctrl/mpexec/cli/script/pre_exec_init_qbb.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def pre_exec_init_qbb(

# Make QBB.
_LOG.verbose("Initializing quantum-backed butler.")
butler = qg.make_init_qbb(butler_config, config_search_paths=config_search_path)
# Save all InitOutputs, configs, etc.
_LOG.verbose("Instantiating tasks and saving init-outputs.")
qg.init_output_run(butler)
with qg.make_init_qbb(butler_config, config_search_paths=config_search_path) as butler:
# Save all InitOutputs, configs, etc.
_LOG.verbose("Instantiating tasks and saving init-outputs.")
qg.init_output_run(butler)
42 changes: 20 additions & 22 deletions python/lsst/ctrl/mpexec/cli/script/purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ def on_confirmation(self) -> None:
if self.failure:
# This should not happen, it is a logic error.
raise RuntimeError("Can not purge, there were errors preparing collections.")
butler = Butler.from_config(self.butler_config, writeable=True)
with butler.transaction():
with Butler.from_config(self.butler_config, writeable=True) as butler, butler.transaction():
for c in itertools.chain(self.others_to_remove, self.chains_to_remove):
butler.registry.removeCollection(c)
butler.removeRuns(self.runs_to_remove)
Expand Down Expand Up @@ -290,24 +289,23 @@ def purge(
to remove the datasets after confirmation, if needed.
"""
result = PurgeResult(butler_config)
butler = Butler.from_config(butler_config)

try:
collection_type = butler.registry.getCollectionType(collection)
except MissingCollectionError:
result.fail(TopCollectionNotFoundFailure(collection))
return result

if collection_type != CollectionType.CHAINED:
result.fail(TopCollectionIsNotChainedFailure(collection, collection_type))
elif parents := check_parents(butler, collection, []):
result.fail(TopCollectionHasParentsFailure(collection, parents))
else:
prepare_to_remove(
top_collection=collection,
parent_collection=collection,
purge_result=result,
butler=butler,
recursive=recursive,
)
with Butler.from_config(butler_config) as butler:
try:
collection_type = butler.registry.getCollectionType(collection)
except MissingCollectionError:
result.fail(TopCollectionNotFoundFailure(collection))
return result

if collection_type != CollectionType.CHAINED:
result.fail(TopCollectionIsNotChainedFailure(collection, collection_type))
elif parents := check_parents(butler, collection, []):
result.fail(TopCollectionHasParentsFailure(collection, parents))
else:
prepare_to_remove(
top_collection=collection,
parent_collection=collection,
purge_result=result,
butler=butler,
recursive=recursive,
)
return result
Loading
Loading