Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions nvmolkit/autotune/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ def suggest_from_space(trial, name: str, spec: Any) -> Any:

- ``(low, high)`` tuple — uniform integer range.
- ``(low, high, "log")`` tuple — log-uniform integer range.
- ``(low, high, step)`` tuple — uniform integer range restricted to
multiples of ``step`` (preserves ordering for TPE, unlike a categorical
list of the same values).
- ``list`` of choices — categorical search.

A literal scalar is returned unchanged (acts as a fixed value rather than
Expand All @@ -299,6 +302,11 @@ def suggest_from_space(trial, name: str, spec: Any) -> Any:
if isinstance(spec, tuple) and len(spec) == 3 and spec[2] == "log":
low, high, _ = spec
return trial.suggest_int(name, int(low), int(high), log=True)
if isinstance(spec, tuple) and len(spec) == 3 and all(isinstance(v, int) for v in spec):
low, high, step = spec
if step <= 0:
raise ValueError(f"Search-space override for {name!r}: step must be a positive integer, got {step!r}.")
return trial.suggest_int(name, int(low), int(high), step=int(step))
Comment thread
scal444 marked this conversation as resolved.
if isinstance(spec, list) and len(spec) == 2 and all(isinstance(v, int) for v in spec):
raise TypeError(
f"Search-space override for {name!r} is a 2-element int list {spec!r}; "
Expand All @@ -310,6 +318,12 @@ def suggest_from_space(trial, name: str, spec: Any) -> Any:
f"Search-space override for {name!r} is a 3-element list {spec!r} ending in 'log'; "
"use a tuple (low, high, 'log') for a log-uniform integer range."
)
if isinstance(spec, list) and len(spec) == 3 and all(isinstance(v, int) for v in spec):
raise TypeError(
f"Search-space override for {name!r} is a 3-element int list {spec!r}; "
"use a tuple (low, high, step) for a stepped integer range, or wrap in "
"a list of more than three values to request a categorical search."
)
if isinstance(spec, (list, tuple)):
choices = list(spec)
return trial.suggest_categorical(name, choices)
Expand All @@ -322,9 +336,11 @@ def collect_int_from_space(spec: Any) -> int:
For a uniform integer range ``(low, high)`` the arithmetic midpoint is
returned. For a log-uniform range ``(low, high, "log")`` the geometric
midpoint is returned (rounded to the nearest int, clamped to ``[low,
high]``). For a categorical list the first listed choice is returned, on
the convention that callers list their preferred default first. A bare
scalar is returned unchanged.
high]``). For a stepped range ``(low, high, step)`` the arithmetic
midpoint snapped to the nearest multiple of ``step`` from ``low`` is
returned, clamped to ``[low, high]``. For a categorical list the first
listed choice is returned, on the convention that callers list their
preferred default first. A bare scalar is returned unchanged.
"""
if isinstance(spec, tuple) and len(spec) == 3 and spec[2] == "log":
low, high, _ = spec
Expand All @@ -334,6 +350,11 @@ def collect_int_from_space(spec: Any) -> int:
raise ValueError(f"Log-uniform range {spec!r} requires strictly positive bounds.")
midpoint = int(round(math.sqrt(low_int * high_int)))
return max(low_int, min(high_int, midpoint))
if isinstance(spec, tuple) and len(spec) == 3 and all(isinstance(v, int) for v in spec):
low, high, step = (int(v) for v in spec)
midpoint = (low + high) // 2
snapped = low + round((midpoint - low) / step) * step
return max(low, min(high, snapped))
Comment thread
scal444 marked this conversation as resolved.
if isinstance(spec, tuple) and len(spec) == 2:
low, high = spec
return (int(low) + int(high)) // 2
Expand Down
41 changes: 33 additions & 8 deletions nvmolkit/autotune/_ff_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from __future__ import annotations

import os
import re
from typing import Iterable, Optional

from rdkit.Chem import Mol
Expand All @@ -49,17 +50,37 @@ def coerce_gpu_ids(gpuIds: Optional[Iterable[int]]) -> list[int]:


def cpu_count() -> int:
"""Return the usable CPU core count, with a floor of 1."""
"""Return the usable physical CPU core count, with a floor of 1.

Falls back to ``os.cpu_count()`` (logical) if ``/proc/cpuinfo`` is missing.
Comment thread
scal444 marked this conversation as resolved.
"""
physical = _physical_cpu_count_from_proc()
if physical is not None:
return max(1, physical)
return max(1, os.cpu_count() or 1)


def _physical_cpu_count_from_proc() -> Optional[int]:
"""Return the number of distinct physical cores from ``/proc/cpuinfo``."""
try:
with open("/proc/cpuinfo") as cpuinfo:
text = cpuinfo.read()
except OSError:
return None
physical_ids = re.findall(r"^physical id\s*:\s*(\S+)", text, re.MULTILINE)
core_ids = re.findall(r"^core id\s*:\s*(\S+)", text, re.MULTILINE)
if not physical_ids or len(physical_ids) != len(core_ids):
return None
return len(set(zip(physical_ids, core_ids)))


def resolve_cpu_budget(cpu_budget: Optional[int]) -> int:
"""Return the effective CPU budget for autotune search ranges.

``cpu_budget`` lets callers cap total CPU usage explicitly (useful for
cross-machine normalization where the goal is to isolate GPU performance
from CPU-count differences). When ``None``, falls back to
:func:`cpu_count`. Values less than 1 are rejected.
from CPU-count differences). When ``None``, falls back to physical core
count via :func:`cpu_count`. Values less than 1 are rejected.
"""
if cpu_budget is None:
return cpu_count()
Expand Down Expand Up @@ -89,12 +110,16 @@ def resolve_num_gpus(fixed_gpu_ids: list[int]) -> int:
def default_ff_search_space(num_gpus: int, cpus: int) -> dict:
"""Return the default FF search space scaled to the active hardware.

``batchesPerGpu`` is per-GPU; its upper bound is capped at
``cpus // num_gpus`` so the total worker thread count across all GPUs
(``num_gpus * batchesPerGpu``) does not exceed ``cpus``.
``batchSize`` is a stepped integer range in multiples of 64 since the
underlying kernels are tile-tuned for those sizes; the stepped form
preserves numeric ordering for TPE (unlike a categorical list).
``batchesPerGpu`` is per-GPU and capped at ``min(8, cpus // num_gpus)``:
8 is the empirical point of diminishing returns for the batched-FF
dispatch, and the physical-core floor prevents oversubscribing CPU
coordinator threads across all GPUs.
"""
per_gpu_max = max(1, cpus // max(1, num_gpus))
per_gpu_max = max(1, min(8, cpus // max(1, num_gpus)))
return {
"batchSize": (32, 1500, "log"),
"batchSize": (64, 1024, 64),
"batchesPerGpu": (1, per_gpu_max),
}
13 changes: 8 additions & 5 deletions nvmolkit/autotune/tune_embed_molecules.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ def _default_embed_search_space(num_gpus: int, cpus: int) -> dict:
EmbedMolecules runs preprocessing and GPU-dispatch threads sequentially,
so each pool is capped independently:

* ``batchesPerGpu`` (per-GPU GPU-runner threads): max = ``cpus //
num_gpus``.
* ``preprocessingThreads`` (total CPU pool): max = ``cpus``.
* ``batchSize``: stepped int range in multiples of 64 (kernels are
tile-tuned for these sizes); stepping preserves numeric ordering for TPE.
* ``batchesPerGpu`` (per-GPU GPU-runner threads): max =
``min(8, cpus // num_gpus)``. 8 is the empirical point of diminishing
returns; the physical-core floor prevents oversubscribing across GPUs.
* ``preprocessingThreads`` (total CPU pool): max = ``cpus`` (physical).
"""
per_gpu_max = max(1, cpus // max(1, num_gpus))
per_gpu_max = max(1, min(8, cpus // max(1, num_gpus)))
return {
"batchSize": (64, 2000, "log"),
"batchSize": (64, 1024, 64),
"batchesPerGpu": (1, per_gpu_max),
"preprocessingThreads": (1, cpus),
}
Expand Down
12 changes: 8 additions & 4 deletions nvmolkit/autotune/tune_substructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ def _default_substruct_search_space(num_gpus: int, cpus: int) -> dict:
at trial sampling time by clamping ``preprocessingThreads`` to whatever
cores are left after ``workerThreads`` is chosen.

* ``workerThreads`` (per-GPU): max = ``cpus // num_gpus``.
* ``preprocessingThreads`` (total CPU pool): max = ``cpus`` (the
* ``batchSize``: stepped int range in multiples of 128 (kernels are
tile-tuned for these sizes); stepping preserves numeric ordering for TPE.
* ``workerThreads`` (per-GPU): max = ``min(8, cpus // num_gpus)``.
8 is the empirical point of diminishing returns; the physical-core
floor prevents oversubscribing across GPUs.
* ``preprocessingThreads`` (total CPU pool): max = ``cpus`` (physical;
effective per-trial cap is reduced once ``workerThreads`` is sampled).
"""
per_gpu_worker_max = max(1, cpus // max(1, num_gpus))
per_gpu_worker_max = max(1, min(8, cpus // max(1, num_gpus)))
return {
"batchSize": (128, 8192, "log"),
"batchSize": (128, 1024, 128),
"workerThreads": (1, per_gpu_worker_max),
"preprocessingThreads": (1, cpus),
}
Expand Down
69 changes: 57 additions & 12 deletions nvmolkit/tests/test_autotune.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,31 +256,43 @@ def test_shrink_halves_within_floor():


def test_default_ff_search_space_caps_batches_per_gpu_by_cpu_count():
"""``batchesPerGpu`` upper bound divides the CPU budget by the GPU count."""
"""``batchesPerGpu`` upper bound is ``min(8, cpus // num_gpus)``."""
space_1gpu = _ff_common.default_ff_search_space(num_gpus=1, cpus=32)
space_4gpu = _ff_common.default_ff_search_space(num_gpus=4, cpus=32)
space_64gpu = _ff_common.default_ff_search_space(num_gpus=64, cpus=32)

assert space_1gpu["batchesPerGpu"] == (1, 32)
assert space_1gpu["batchesPerGpu"] == (1, 8)
assert space_4gpu["batchesPerGpu"] == (1, 8)
assert space_64gpu["batchesPerGpu"] == (1, 1)


def test_default_ff_search_space_batch_size_is_stepped_multiples_of_64():
"""``batchSize`` is a stepped int range in multiples of 64."""
space = _ff_common.default_ff_search_space(num_gpus=1, cpus=32)
low, high, step = space["batchSize"]
assert step == 64
assert low % 64 == 0 and high % 64 == 0
assert low <= high


def test_default_substruct_search_space_caps_per_pool():
"""Substruct: workerThreads capped per-GPU, preprocessingThreads capped at cpus."""
"""Substruct: workerThreads capped at min(8, cpus/num_gpus), prep at cpus."""
space_1gpu = _default_substruct_search_space(num_gpus=1, cpus=16)
space_4gpu = _default_substruct_search_space(num_gpus=4, cpus=16)
space_64gpu = _default_substruct_search_space(num_gpus=64, cpus=16)

assert space_1gpu["workerThreads"] == (1, 16)
assert space_1gpu["workerThreads"] == (1, 8)
assert space_4gpu["workerThreads"] == (1, 4)
assert space_64gpu["workerThreads"] == (1, 1)

assert space_1gpu["preprocessingThreads"] == (1, 16)
assert space_4gpu["preprocessingThreads"] == (1, 16)
assert space_64gpu["preprocessingThreads"] == (1, 16)

assert space_1gpu["batchSize"] == (128, 8192, "log")
low, high, step = space_1gpu["batchSize"]
assert step % 64 == 0
assert low % step == 0 and high % step == 0
assert low <= high


class _RecordingTrial:
Expand All @@ -290,8 +302,8 @@ def __init__(self, picker=lambda low, high: low):
self.calls: list[dict] = []
self.picker = picker

def suggest_int(self, name: str, low: int, high: int, log: bool = False) -> int:
self.calls.append({"name": name, "low": low, "high": high, "log": log})
def suggest_int(self, name: str, low: int, high: int, log: bool = False, step: int = 1) -> int:
self.calls.append({"name": name, "low": low, "high": high, "log": log, "step": step})
return int(self.picker(low, high))


Expand All @@ -300,7 +312,7 @@ def test_suggest_preprocessing_threads_clamps_to_remaining_cpu_budget():
trial = _RecordingTrial()
spec = (1, 32)
value = _suggest_preprocessing_threads(trial, spec, worker_threads=6, num_gpus=2, cpus=16)
assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 4, "log": False}]
assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 4, "log": False, "step": 1}]
assert value == 1


Expand All @@ -309,7 +321,7 @@ def test_suggest_preprocessing_threads_respects_low_floor_when_budget_exhausted(
trial = _RecordingTrial(picker=lambda low, high: high)
spec = (4, 32)
value = _suggest_preprocessing_threads(trial, spec, worker_threads=8, num_gpus=2, cpus=16)
assert trial.calls == [{"name": "preprocessingThreads", "low": 4, "high": 4, "log": False}]
assert trial.calls == [{"name": "preprocessingThreads", "low": 4, "high": 4, "log": False, "step": 1}]
assert value == 4


Expand All @@ -318,7 +330,7 @@ def test_suggest_preprocessing_threads_does_not_clamp_when_budget_available():
trial = _RecordingTrial(picker=lambda low, high: high)
spec = (1, 8)
value = _suggest_preprocessing_threads(trial, spec, worker_threads=2, num_gpus=2, cpus=32)
assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 8, "log": False}]
assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 8, "log": False, "step": 1}]
assert value == 8


Expand All @@ -327,14 +339,24 @@ def test_suggest_preprocessing_threads_propagates_log_flag():
trial = _RecordingTrial()
spec = (1, 32, "log")
_suggest_preprocessing_threads(trial, spec, worker_threads=2, num_gpus=1, cpus=8)
assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 6, "log": True}]
assert trial.calls == [{"name": "preprocessingThreads", "low": 1, "high": 6, "log": True, "step": 1}]


def test_default_embed_search_space_caps_per_pool():
"""Embed: per-GPU pool capped at cpus/numGpus, total pool capped at cpus."""
"""Embed: per-GPU pool capped at min(8, cpus/numGpus), total prep pool at cpus."""
space = _default_embed_search_space(num_gpus=4, cpus=16)
assert space["batchesPerGpu"] == (1, 4)
assert space["preprocessingThreads"] == (1, 16)
low, high, step = space["batchSize"]
assert step == 64
assert low % 64 == 0 and high % 64 == 0
assert low <= high


def test_default_embed_search_space_caps_batches_per_gpu_at_eight():
"""When cpus/num_gpus exceeds 8, ``batchesPerGpu`` is still capped at 8."""
space = _default_embed_search_space(num_gpus=4, cpus=128)
assert space["batchesPerGpu"] == (1, 8)


def test_resolve_num_gpus_prefers_explicit_list():
Expand All @@ -349,6 +371,29 @@ def test_resolve_cpu_budget_falls_back_to_cpu_count(monkeypatch):
assert _ff_common.resolve_cpu_budget(None) == 24


def test_physical_cpu_count_from_proc_dedupes_smt_siblings(tmp_path, monkeypatch):
"""``_physical_cpu_count_from_proc`` collapses SMT siblings by ``(physical id, core id)``."""
fake_cpuinfo = (
"processor\t: 0\nphysical id\t: 0\ncore id\t: 0\n\n"
"processor\t: 1\nphysical id\t: 0\ncore id\t: 0\n\n"
"processor\t: 2\nphysical id\t: 0\ncore id\t: 1\n\n"
"processor\t: 3\nphysical id\t: 0\ncore id\t: 1\n\n"
"processor\t: 4\nphysical id\t: 1\ncore id\t: 0\n\n"
"processor\t: 5\nphysical id\t: 1\ncore id\t: 0\n\n"
)
fake_path = tmp_path / "cpuinfo"
fake_path.write_text(fake_cpuinfo)
real_open = open

def fake_open(path, *args, **kwargs):
if path == "/proc/cpuinfo":
return real_open(fake_path, *args, **kwargs)
return real_open(path, *args, **kwargs)

monkeypatch.setattr("builtins.open", fake_open)
assert _ff_common._physical_cpu_count_from_proc() == 3


def test_resolve_cpu_budget_uses_explicit_value():
"""An explicit ``cpu_budget`` overrides whatever the OS reports."""
assert _ff_common.resolve_cpu_budget(14) == 14
Expand Down
Loading