Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions docs/source/howto/run_httomo.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,22 @@ Once the appropriate step has been done, you will have access to the HTTomo CLI:

Commands:
check Check a YAML pipeline file for errors.
memory-check Estimate CPU memory requirements for processing input...
run Run a processing pipeline defined in YAML on input data.

As can be seen from the output above, there are two HTTomo commands available:
:code:`check` and :code:`run`.
As can be seen from the output above, there are three HTTomo commands
available: :code:`check`, :code:`memory-check`, and :code:`run`.

The :code:`check` command is used for checking a YAML process list file for
errors, and is highly recommended to be run before attempting to run the
pipeline. Please see :ref:`utilities_yamlchecker` for more information about
the checks being performed, the help information that is printed, etc.

The :code:`memory-check` command is for estimating the CPU memory requirements
for processing the input data with a given pipeline and number of processes.
The underlying functionality was primarily developed for use with the DLS
HTTomo launcher, and has been exposed as a CLI command for convenience.

The :code:`run` command is used for running HTTomo with a pipeline on the given
HDF5 input data.

Expand Down Expand Up @@ -147,6 +153,66 @@ Options/flags

The :code:`check` command has *no* options/flags.

The :code:`memory-check` command
++++++++++++++++++++++++++++++++

.. code-block:: console

$ python -m httomo memory-check --help
Usage: python -m httomo memory-check [OPTIONS] IN_DATA_FILE PIPELINE NPROCS

Estimate CPU memory requirements for processing input data with a given
pipeline and number of processes

Options:
--help Show this message and exit.


Arguments
#########

For :code:`memory-check` there are three *required* arguments:
:code:`IN_DATA_FILE`, :code:`PIPELINE`, and :code:`NPROCS`, and zero *optional*
arguments.

:code:`IN_DATA_FILE` (required)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This is the filepath to the HDF5 input data that is intended to be processed.
This is required primarily for querying the size of the input data.

:code:`PIPELINE` (required)
~~~~~~~~~~~~~~~~~~~~~~~~~~~

This is the filepath to the YAML process list file that defines the processing
to be applied to the input data.

This is required for several reasons:

- any cropping of the data via the loader's :code:`preview` parameter will
affect the size of the data being processed
- any methods requiring padding will affect the size of the data being
processed
- any :ref:`info_reslice` in the pipeline will affect the amount of memory
required

:code:`NPROCS` (required)
~~~~~~~~~~~~~~~~~~~~~~~~~~~

This is the number of processes the input data is intended to be processed
with.

This is required primarily due to the number of processes affecting how the
input data is split up, and thus affects the allocations required for
processing the subsets of data.

.. note:: The value of :code:`NPROCS` must be >= 1.

Options/flags
#############

The :code:`memory-check` command has *zero* options/flags.

The :code:`run` command
+++++++++++++++++++++++

Expand Down
69 changes: 69 additions & 0 deletions httomo/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import yaml

import click
import h5py
import shutil
from mpi4py import MPI
from loguru import logger
Expand All @@ -16,9 +17,12 @@
from httomo.cli_utils import is_sweep_pipeline
from httomo.logger import setup_logger
from httomo.monitors import MONITORS_MAP, make_monitors
from httomo.runner.dataset_store_backing import estimate_section_memory
from httomo.runner.pipeline import Pipeline
from httomo.runner.section import sectionize
from httomo.sweep_runner.param_sweep_runner import ParamSweepRunner
from httomo.transform_layer import TransformLayer
from httomo.transform_loader_params import parse_config, parse_preview
from httomo.utils import log_exception, log_once, mpi_abort_excepthook
from httomo.yaml_checker import (
validate_yaml_config,
Expand Down Expand Up @@ -71,6 +75,32 @@ def main():
pass


@main.command()
@click.argument(
"in_data_file",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
)
@click.argument(
"pipeline",
type=PipelineFilePathOrString(
types=[click.Path(exists=True, dir_okay=False, path_type=Path), click.STRING]
),
)
@click.argument(
"nprocs",
type=click.IntRange(1),
)
def memory_check(in_data_file: Path, pipeline: Union[Path, str], nprocs: int):
"""
Estimate CPU memory requirements for processing input data with a given pipeline and number
of processes
"""
memory_peak = estimate_cpu_memory(in_data_file, pipeline, nprocs)
print(
f"Estimated peak CPU memory usage for {nprocs} process run: {(memory_peak * nprocs) / (1024 ** 3):.3f} GB"
)


@main.command()
@click.argument(
"pipeline",
Expand Down Expand Up @@ -500,3 +530,42 @@ def execute_high_throughput_run(

def execute_sweep_run(pipeline: Pipeline, global_comm: MPI.Comm) -> None:
ParamSweepRunner(pipeline, global_comm).execute()


def estimate_cpu_memory(in_data_file: Path, pipeline_file: Path, nprocs: int) -> int:
pipeline = generate_pipeline(
in_data_file, pipeline_file, False, MPI.COMM_WORLD, PipelineFormat.Yaml
)
sections = sectionize(pipeline)
config = yaml_loader(pipeline_file)
data_config, _, _, _, _ = parse_config(in_data_file, config[0]["parameters"])
with h5py.File(in_data_file, "r") as f:
dataset = f[data_config.data_path]
dtype = dataset.dtype
full_shape = dataset.shape

preview_config = parse_preview(
config[0]["parameters"].get("preview", None), full_shape
)
previewed_shape = (
preview_config.angles.stop - preview_config.angles.start,
preview_config.detector_y.stop - preview_config.detector_y.start,
preview_config.detector_x.stop - preview_config.detector_x.start,
)

section_memory_peak = 0
for idx in range(len(sections)):
section_memory_peak = max(
section_memory_peak,
estimate_section_memory(
nprocs,
0,
None,
dtype,
previewed_shape,
sections,
idx,
),
)

return section_memory_peak
2 changes: 1 addition & 1 deletion httomo/data/dataset_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def __init__(
self._data = self._reslice(source.slicing_dim, slicing_dim, source_data)
end = time.perf_counter()
log_once(
f"reslice_memory_estimator: {reslice_memory_estimator(source_data.shape, source_data.dtype, source.slicing_dim, slicing_dim, self._comm)}",
f"reslice_memory_estimator: {reslice_memory_estimator(source_data.shape, source_data.dtype, source.slicing_dim, slicing_dim, self._comm.size, self._comm.rank, self._comm.allgather)}",
level=logging.DEBUG,
)
if slicing_dim == 1:
Expand Down
17 changes: 12 additions & 5 deletions httomo/data/hdf/_utils/reslice.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from typing import Tuple
from collections.abc import Callable
from typing import Any, Optional, Tuple, TypeAlias

import numpy
from mpi4py.MPI import Comm
Expand Down Expand Up @@ -70,15 +71,18 @@ def reslice(
return new_data, next_slice_dim, start_idx


AllGatherFunc: TypeAlias = Optional[Callable[[Any], list[Any]]]


def reslice_memory_estimator(
data_shape: Tuple[int, int, int],
dtype: numpy.dtype,
current_slice_dim: int,
next_slice_dim: int,
comm: Comm,
nprocs: int,
rank: int,
allgather_func: AllGatherFunc,
) -> Tuple[int, int]:
rank = comm.rank
nprocs = comm.size
itemsize = numpy.dtype(dtype).itemsize

split_sizes = []
Expand All @@ -93,7 +97,10 @@ def reslice_memory_estimator(
split_sizes.append(numpy.prod(split_shape) * itemsize)
prev_idx = next_idx

all_split_sizes = comm.allgather(split_sizes)
if allgather_func is not None:
all_split_sizes = allgather_func(split_sizes)
else:
all_split_sizes = [split_sizes] * nprocs
recv_sizes = [all_split_sizes[p][rank] for p in range(nprocs)]

output_shape = list(data_shape)
Expand Down
77 changes: 59 additions & 18 deletions httomo/runner/dataset_store_backing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,23 @@
from numpy.typing import DTypeLike
from mpi4py import MPI

from httomo.data.hdf._utils.reslice import reslice_memory_estimator
from httomo.data.hdf._utils.reslice import AllGatherFunc, reslice_memory_estimator
from httomo.runner.section import Section, determine_section_padding
from httomo.utils import _get_slicing_dim, make_3d_shape_from_shape


def calculate_section_input_chunk_shape(
comm: MPI.Comm,
nprocs: int,
rank: int,
global_shape: Tuple[int, int, int],
slicing_dim: int,
padding: Tuple[int, int],
) -> Tuple[int, int, int]:
"""
Calculate the shape of the section input chunk w/ or w/o padding.
"""
start = round((global_shape[slicing_dim] / comm.size) * comm.rank)
stop = round((global_shape[slicing_dim] / comm.size) * (comm.rank + 1))
start = round((global_shape[slicing_dim] / nprocs) * rank)
stop = round((global_shape[slicing_dim] / nprocs) * (rank + 1))
section_slicing_dim_len = stop - start
shape = list(global_shape)
shape[slicing_dim] = section_slicing_dim_len + padding[0] + padding[1]
Expand Down Expand Up @@ -58,19 +59,21 @@ class DataSetStoreBacking(Enum):
File = 2


def determine_store_backing(
comm: MPI.Comm,
sections: List[Section],
memory_limit_bytes: int,
def estimate_section_memory(
nprocs: int,
rank: int,
allgather_func: AllGatherFunc,
dtype: DTypeLike,
global_shape: Tuple[int, int, int],
sections: List[Section],
section_idx: int,
) -> DataSetStoreBacking:
) -> int:
# Get chunk shape created by reader of section `n` (the current section) that will account
# for padding. This chunk shape is based on the chunk shape written by the writer of
# section `n - 1` (the previous section)
padded_input_chunk_shape = calculate_section_input_chunk_shape(
comm=comm,
nprocs=nprocs,
rank=rank,
global_shape=global_shape,
slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1,
padding=determine_section_padding(sections[section_idx]),
Expand All @@ -82,7 +85,8 @@ def determine_store_backing(
# Get unpadded chunk shape input to current section (for calculation of bytes in output
# chunk for the current section)
input_chunk_shape = calculate_section_input_chunk_shape(
comm=comm,
nprocs=nprocs,
rank=rank,
global_shape=global_shape,
slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1,
padding=(0, 0),
Expand All @@ -102,7 +106,7 @@ def determine_store_backing(
# input to it (which would be the output chunk of the current section)
reslice_bytes = 0
if (
comm.size > 1
nprocs > 1
and section_idx < len(sections) - 1
and sections[section_idx].pattern != sections[section_idx + 1].pattern
):
Expand All @@ -111,18 +115,55 @@ def determine_store_backing(
dtype,
_get_slicing_dim(sections[section_idx].pattern),
_get_slicing_dim(sections[section_idx + 1].pattern),
comm,
nprocs,
rank,
allgather_func,
)
reslice_bytes += ring_algorithm_bytes + reslice_output_bytes

# TODO: The nature of the pinned memory allocations by cupy is currently under
# investigation, so a more precise calculation for its size is not yet known.
#
# It's known that this can grow quite large via allocations exceeding the current
# allocation being bumped to the next power of 2 (ie, a 16GiB allocation that is exceeded
# by 1 byte will have a 32GiB allocation made in addition to the original 16GiB).
#
# Taking half the input data size seems to be in the ballpark for what has been observed
# with larger datasets (ie, an 84GB dataset being processed took ~520GB of memory, and with
# this arbitrary choice of 0.5 as a multiplicative factor gets the estimated value to
# ~514GB)
CUPY_PINNED_CPU_MEMORY = int(0.5 * np.prod(global_shape) * np.dtype(dtype).itemsize)

return (
padded_input_chunk_bytes
+ output_chunk_bytes
+ reslice_bytes
+ CUPY_PINNED_CPU_MEMORY
)


def determine_store_backing(
comm: MPI.Comm,
sections: List[Section],
memory_limit_bytes: int,
dtype: DTypeLike,
global_shape: Tuple[int, int, int],
section_idx: int,
) -> DataSetStoreBacking:
section_memory = estimate_section_memory(
comm.size,
comm.rank,
comm.allgather,
dtype,
global_shape,
sections,
section_idx,
)

send_buffer = np.zeros(1, dtype=bool)
recv_buffer = np.zeros(1, dtype=bool)

if (
memory_limit_bytes > 0
and padded_input_chunk_bytes + output_chunk_bytes + reslice_bytes
>= memory_limit_bytes
):
if memory_limit_bytes > 0 and section_memory >= memory_limit_bytes:
send_buffer[0] = True

# do a logical OR of all the enum variants across the processes
Expand Down
Loading
Loading