-
Notifications
You must be signed in to change notification settings - Fork 11
DM-54879: Add --ignore-existing-metadata-for for when upstream outputs are selectively retained #561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
DM-54879: Add --ignore-existing-metadata-for for when upstream outputs are selectively retained #561
Changes from all commits
0db29d2
88ca8ff
13839b2
40cb774
7481f5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Added `retry_missing_outputs_for` parameter to `QuantumGraphBuilder` and `SeparablePipelineExecutor`. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,16 +29,20 @@ | |
|
|
||
| import io | ||
| import logging | ||
| import tempfile | ||
| import unittest | ||
|
|
||
| import numpy | ||
|
|
||
| import lsst.utils.tests | ||
| from lsst.daf.butler import Butler, DatasetType | ||
| from lsst.daf.butler.registry import UserExpressionError | ||
| from lsst.pipe.base import PipelineGraph, QuantumGraph | ||
| from lsst.pipe.base import PipelineGraph, QuantumGraph, TaskMetadata | ||
| from lsst.pipe.base.all_dimensions_quantum_graph_builder import ( | ||
| AllDimensionsQuantumGraphBuilder, | ||
| DatasetQueryConstraintVariant, | ||
| ) | ||
| from lsst.pipe.base.quantum_graph_builder import OutputExistsError | ||
| from lsst.pipe.base.tests import simpleQGraph | ||
| from lsst.pipe.base.tests.mocks import ( | ||
| DynamicConnectionConfig, | ||
|
|
@@ -228,6 +232,172 @@ | |
| self.assertEqual(quantum.datastore_records, {}) | ||
|
|
||
|
|
||
| class SkipExistingInTestCase(unittest.TestCase): | ||
| """Tests for the skip_existing_in behavior of QuantumGraphBuilder.""" | ||
|
|
||
| def setUp(self): | ||
| repodir = tempfile.TemporaryDirectory() | ||
| self.addCleanup(tempfile.TemporaryDirectory.cleanup, repodir) | ||
| pipeline = simpleQGraph.makeSimplePipeline(nQuanta=1) | ||
| butler, _ = simpleQGraph.makeSimpleQGraph(root=repodir.name, pipeline=pipeline, nQuanta=1) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is worth changing at this point, but for new tests I'd generally recommend |
||
| self.enterContext(butler) | ||
| self.butler = butler | ||
| self.pipeline_graph = pipeline.to_graph() | ||
| self.butler.registry.registerRun("run") | ||
|
|
||
| def test_not_skipped_without_skip_existing_in(self): | ||
| """Without skip_existing_in, a quantum is never skipped even if | ||
| metadata exists in an input collection. | ||
| """ | ||
| self.butler.put( | ||
| TaskMetadata(), | ||
| "task0_metadata", | ||
| run="run", | ||
| instrument="INSTR", | ||
| detector=0, | ||
| ) | ||
|
|
||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, self.butler, input_collections=["test"], output_run="new_run" | ||
| ).build() | ||
| self.assertEqual(len(qgraph), 1) | ||
|
|
||
| def test_skipped_when_metadata_exists(self): | ||
| """With skip_existing_in, a quantum is skipped when its metadata | ||
| dataset is present in the specified collections. | ||
| """ | ||
| self.butler.put( | ||
| TaskMetadata(), | ||
| "task0_metadata", | ||
| run="run", | ||
| instrument="INSTR", | ||
| detector=0, | ||
| ) | ||
| # Init-outputs required, otherwise InitInputMissingError. | ||
| self.butler.put(numpy.array([0.0]), "add_init_output1", run="run") | ||
| self.butler.put(simpleQGraph.AddTaskConfig(), "task0_config", run="run") | ||
|
|
||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| input_collections=["test"], | ||
| output_run="new_run", | ||
| ).build() | ||
| self.assertEqual(len(qgraph), 0) | ||
|
|
||
| def test_not_skipped_when_metadata_absent(self): | ||
| """With skip_existing_in, a quantum is not skipped when its metadata | ||
| dataset is absent from the specified collections. | ||
| """ | ||
| # No metadata put — run exists but is empty. | ||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| input_collections=["test"], | ||
| output_run="new_run", | ||
| ).build() | ||
| self.assertEqual(len(qgraph), 1) | ||
|
|
||
|
|
||
| class RetryMissingOutputsForTestCase(unittest.TestCase): | ||
| """Tests for QuantumGraphBuilder.retry_missing_outputs_for.""" | ||
|
|
||
| def setUp(self): | ||
| repodir = tempfile.TemporaryDirectory() | ||
| self.addCleanup(tempfile.TemporaryDirectory.cleanup, repodir) | ||
| pipeline = simpleQGraph.makeSimplePipeline(nQuanta=1) | ||
| butler, _ = simpleQGraph.makeSimpleQGraph(root=repodir.name, pipeline=pipeline, nQuanta=1) | ||
| self.enterContext(butler) | ||
| self.butler = butler | ||
| self.pipeline_graph = pipeline.to_graph() | ||
| # Simulate a prior run and put a metadata. | ||
| self.butler.registry.registerRun("run") | ||
| self.butler.put( | ||
| TaskMetadata(), | ||
| "task0_metadata", | ||
| run="run", | ||
| instrument="INSTR", | ||
| detector=0, | ||
| ) | ||
|
|
||
| def test_not_skipped_when_outputs_missing(self): | ||
| """With retry_missing_outputs_for, quantum is not skipped when regular | ||
| outputs are absent from skip_existing_in, even if metadata is present. | ||
|
|
||
| A scenario is that an upstream pipeline ran and wrote | ||
| metadata but did not retain output datasets. | ||
| """ | ||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| retry_missing_outputs_for=["task0"], | ||
| input_collections=["test"], | ||
| output_run="new_run", | ||
| ).build() | ||
| self.assertEqual(len(qgraph), 1) | ||
|
|
||
| def test_skips_when_all_outputs_present(self): | ||
| """With retry_missing_outputs_for, quantum is skipped when all regular | ||
| outputs are present in skip_existing_in. | ||
| """ | ||
| self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) | ||
| self.butler.put(numpy.array([0.0]), "add2_dataset1", run="run", instrument="INSTR", detector=0) | ||
| # Init-outputs required when all quanta are skipped. | ||
| self.butler.put(numpy.array([0.0]), "add_init_output1", run="run") | ||
| self.butler.put(simpleQGraph.AddTaskConfig(), "task0_config", run="run") | ||
|
|
||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| retry_missing_outputs_for=["task0"], | ||
| input_collections=["test"], | ||
| output_run="new_run", | ||
| ).build() | ||
| # All outputs found, so quantum should be skipped. | ||
| self.assertEqual(len(qgraph), 0) | ||
|
|
||
| def test_output_exists_error_when_partial_outputs(self): | ||
| """With retry_missing_outputs_for, OutputExistsError is raised when some | ||
| outputs exist in the output run and clobber is off. | ||
| """ | ||
| self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) | ||
| # add2_dataset1 absent -> not all outputs present -> task not skipped | ||
|
|
||
| with self.assertRaises(OutputExistsError): | ||
| AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| retry_missing_outputs_for=["task0"], | ||
| input_collections=["test"], | ||
| # Use the same run so that partial output is in the way. | ||
| output_run="run", | ||
| ).build() | ||
|
|
||
| def test_partial_outputs_clobber(self): | ||
| """With retry_missing_outputs_for and clobber=True, partial outputs in the | ||
| output run are discarded and the task runs. | ||
| """ | ||
| self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) | ||
| # add2_dataset1 absent -> not all outputs present -> task not skipped | ||
| # clobber=True -> add_dataset1 discarded from graph, task runs | ||
|
|
||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| retry_missing_outputs_for=["task0"], | ||
| input_collections=["test"], | ||
| output_run="run", | ||
| clobber=True, | ||
| ).build() | ||
| self.assertEqual(len(qgraph), 1) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| lsst.utils.tests.init() | ||
| unittest.main() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious where this is coming from; AFAIK we don't use
bytearrayormemoryview[int]for any of these.