Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-54645.misc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added garbage collection metrics to `SingleQuantumExecutor` with metrics stored in task metadata under `quantum.gc_metrics` key.
129 changes: 129 additions & 0 deletions python/lsst/pipe/base/gc_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["GcMetrics"]

import gc
from collections import defaultdict
from types import TracebackType
from typing import Self

import pydantic

from ._task_metadata import TaskMetadata


def _gc_stats() -> dict[str, list[int]]:
"""Convert result of `gc.get_stats` to a dictionary of lists."""
result: dict[str, list[int]] = defaultdict(list)
for gen_stats in gc.get_stats():
for key, stat in gen_stats.items():
result[key].append(stat)
return result


class GcMetrics(pydantic.BaseModel):
"""Context manager which collects GC metrics and converts them into
a dictionary suitable for TaskMetadata.
"""

start_isenabled: bool | None = None
"""Whether GC is enabled on entering context (`bool` or `None`)."""

end_isenabled: bool | None = None
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are worried a task is going to disable GC?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not worried, but I want to know if it happens. I imagine some tasks may want to do that (but they may also want to re-enable it on return).

"""Whether GC is enabled on exiting context (`bool` or `None`)."""

start_threshold: list[int] | None = None
"""GC thresholds on entering context (`list`[`int`] or `None`)."""

end_threshold: list[int] | None = None
"""GC thresholds on exiting context (`list`[`int`] or `None`)."""

start_count: list[int] | None = None
"""GC collection counts on entering context (`list`[`int`] or `None`)."""

end_count: list[int] | None = None
"""GC collection counts on exiting context (`list`[`int`] or `None`)."""

start_stats: dict[str, list[int]] | None = None
"""GC stats on entering context (`dict`[`str`, `list`[`int`]] or `None`).

These are the same values as returned from `gc.get_stats` but rearranged
to be indexed by string key first and generation second.
"""

end_stats: dict[str, list[int]] | None = None
"""GC stats on exiting context, same format as `start_stats`
(`dict`[`str`, `list`[`int`]] or `None`).
"""

def __enter__(self) -> Self:
self.start_isenabled = gc.isenabled()
self.start_threshold = list(gc.get_threshold())
self.start_count = list(gc.get_count())
self.start_stats = _gc_stats()
return self

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
self.end_isenabled = gc.isenabled()
self.end_threshold = list(gc.get_threshold())
Comment thread
andy-slac marked this conversation as resolved.
self.end_count = list(gc.get_count())
self.end_stats = _gc_stats()

@classmethod
def from_task_metadata(cls, metadata: TaskMetadata) -> GcMetrics | None:
"""Extract GC metrics from task metadata.

Parameters
----------
metadata : `TaskMetadata`
Metadata written by
`.single_quantum_executor.SingleQuantumExecutor`.

Returns
-------
gc_metrics : `GcMetrics` or `None`
GC metrics for this quantum, or `None` if the expected fields were
not found.
"""
try:
quantum_metadata = metadata["quantum"]
except KeyError:
return None
try:
gc_metadata = quantum_metadata["gc_metrics"]
except KeyError:
return None

return GcMetrics(**gc_metadata.to_dict())
3 changes: 2 additions & 1 deletion python/lsst/pipe/base/quantum_graph/aggregator/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
__all__ = ("Writer",)

import dataclasses
from collections.abc import ByteString

import zstandard

Expand Down Expand Up @@ -163,7 +164,7 @@ def make_compression_dictionary(self) -> zstandard.ZstdCompressionDict:
self.comms.log.info("Making compressor with no dictionary.")
return zstandard.ZstdCompressionDict(b"")
self.comms.log.info("Training compression dictionary.")
training_inputs: list[bytes] = []
training_inputs: list[ByteString] = []
# We start the dictionary training with *predicted* quantum dataset
# models, since those have almost all of the same attributes as the
# provenance quantum and dataset models, and we can get a nice random
Expand Down
4 changes: 3 additions & 1 deletion python/lsst/pipe/base/single_quantum_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
QuantumSuccessCaveats,
)
from .connections import AdjustQuantumHelper
from .gc_metrics import GcMetrics
from .log_capture import LogCapture, _ExecutionLogRecordsExtra
from .pipeline_graph import TaskNode
from .pipelineTask import PipelineTask
Expand Down Expand Up @@ -307,7 +308,7 @@ def _execute_with_limited_butler(
task = self._task_factory.makeTask(task_node, limited_butler, init_input_refs)
logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type]
outputs_put: list[uuid.UUID] = []
with limited_butler.record_metrics() as butler_metrics:
with limited_butler.record_metrics() as butler_metrics, GcMetrics() as gc_metrics:
caveats = self._run_quantum(
task, quantum, task_node, limited_butler, quantum_id=quantum_id, ids_put=outputs_put
)
Expand All @@ -327,6 +328,7 @@ def _execute_with_limited_butler(
raise
else:
quantumMetadata["butler_metrics"] = butler_metrics.model_dump()
quantumMetadata["gc_metrics"] = gc_metrics.model_dump()
quantumMetadata["caveats"] = caveats.value
# Stringify the UUID for easier compatibility with
# PropertyList.
Expand Down
13 changes: 13 additions & 0 deletions tests/test_single_quantum_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import unittest

import lsst.pipe.base.automatic_connection_constants as acc
from lsst.pipe.base.gc_metrics import GcMetrics
from lsst.pipe.base.resource_usage import QuantumResourceUsage
from lsst.pipe.base.single_quantum_executor import SingleQuantumExecutor
from lsst.pipe.base.tests.mocks import InMemoryRepo
Expand Down Expand Up @@ -71,6 +72,18 @@ def test_simple_execute(self) -> None:
self.assertGreater(ru.total_time, 0)
self.assertLess(ru.total_time, t2 - t1)

# Check that GC metrics are filled.
gc_metrics = GcMetrics.from_task_metadata(md)
self.assertIsNotNone(gc_metrics)
self.assertTrue(gc_metrics.start_isenabled)
self.assertTrue(gc_metrics.end_isenabled)
self.assertEqual(len(gc_metrics.start_threshold), 3)
self.assertEqual(len(gc_metrics.end_threshold), 3)
self.assertEqual(len(gc_metrics.start_count), 3)
self.assertEqual(len(gc_metrics.end_count), 3)
self.assertEqual(set(gc_metrics.start_stats), {"collections", "collected", "uncollectable"})
self.assertEqual(set(gc_metrics.end_stats), {"collections", "collected", "uncollectable"})

def test_skip_existing_execute(self) -> None:
"""Run execute() method twice, with skip_existing_in."""
helper = InMemoryRepo("base.yaml")
Expand Down
Loading