Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion benchmarks/bench_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@

from bench_utils.loaders import load_pickle, load_sdf, load_smarts, load_smiles
from bench_utils.molprep import clone_mols_with_conformers, embed_and_jitter, perturb_conformer, prep_mols
from bench_utils.timing import TimingResult, time_it
from bench_utils.timing import (
Deadline,
TimingResult,
add_rdkit_max_seconds_arg,
throughput_per_s,
time_it,
time_it_bounded,
)

__all__ = [
"Deadline",
"TimingResult",
"add_rdkit_max_seconds_arg",
"clone_mols_with_conformers",
"embed_and_jitter",
"load_pickle",
Expand All @@ -33,5 +42,7 @@
"load_smiles",
"perturb_conformer",
"prep_mols",
"throughput_per_s",
"time_it",
"time_it_bounded",
]
92 changes: 92 additions & 0 deletions benchmarks/bench_utils/timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

"""Shared timing utilities for nvMolKit benchmarks."""

import argparse
import statistics
import time
from dataclasses import dataclass, field
Expand Down Expand Up @@ -89,3 +90,94 @@ def sync() -> None:
times_ms.append((t1 - t0) * 1000.0)

return TimingResult(times_ms=times_ms)


class Deadline:
"""Wall-clock budget that benchmark loops can poll for early termination.

A ``max_seconds`` of ``0`` (or negative) disables the budget, in which
case :meth:`expired` always returns ``False``. Construction starts the
clock; pass the same instance to nested loops to share one deadline.
"""

def __init__(self, max_seconds: float) -> None:
self._end: float | None = time.perf_counter() + max_seconds if max_seconds > 0 else None

def expired(self) -> bool:
return self._end is not None and time.perf_counter() >= self._end

@property
def active(self) -> bool:
"""``True`` when a real budget is being enforced."""
return self._end is not None


def throughput_per_s(items: float, elapsed_ms: float) -> float:
"""Items per second from a millisecond count; ``NaN`` if ``elapsed_ms <= 0``."""
if elapsed_ms <= 0:
return float("nan")
return items / (elapsed_ms / 1000.0)


def time_it_bounded(
run: Callable[[Deadline], None],
runs: int,
max_seconds: float,
progress_getter: Callable[[], int],
progress_target: int,
) -> tuple[float, float, int]:
"""Repeat ``run`` up to ``runs`` times, stopping early on budget exhaustion.

A single :class:`Deadline` covering the whole call is constructed from
``max_seconds`` and passed to ``run`` on every invocation; the closure
must poll it inside its inner work loop to honour the budget mid-run.
After each invocation, ``progress_getter()`` reports how much of the
workload was actually completed; a value below ``progress_target`` is
treated as a partial run and further iterations are skipped.

Returns ``(avg_ms, std_ms, last_progress)``. ``avg`` and ``std`` are
computed only over runs that completed end-to-end; if no full run
finished, the single partial timing is returned with ``std=0``.
"""
deadline = Deadline(max_seconds)
completed_times_ms: list[float] = []
partial_time_ms: float | None = None
last_progress = 0
for _ in range(runs):
if deadline.expired():
break
start = time.perf_counter()
run(deadline)
elapsed_ms = (time.perf_counter() - start) * 1000.0
last_progress = progress_getter()
if last_progress < progress_target:
partial_time_ms = elapsed_ms
break
completed_times_ms.append(elapsed_ms)
if completed_times_ms:
avg_ms = statistics.mean(completed_times_ms)
std_ms = statistics.pstdev(completed_times_ms) if len(completed_times_ms) > 1 else 0.0
return avg_ms, std_ms, last_progress
if partial_time_ms is not None:
return partial_time_ms, 0.0, last_progress
return 0.0, 0.0, last_progress


def add_rdkit_max_seconds_arg(parser: argparse.ArgumentParser, *, extra_help: str = "") -> None:
"""Register the shared ``--rdkit_max_seconds`` CLI flag.

``extra_help`` is appended to the standard help string so individual
benchmarks can describe how partial-run semantics apply to their RDKit
code path (e.g. per-molecule vs. per-query truncation).
"""
base_help = (
"Stop the RDKit comparison after this many wall-clock seconds and "
"report throughput on the work actually completed. 0 disables the "
"cap and runs the full workload (default: 0)."
)
parser.add_argument(
"--rdkit_max_seconds",
type=float,
default=0.0,
help=f"{base_help} {extra_help}".rstrip(),
)
78 changes: 61 additions & 17 deletions benchmarks/etkdg_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@

import nvtx
from bench_utils import (
Deadline,
TimingResult,
add_rdkit_max_seconds_arg,
clone_mols_with_conformers,
load_pickle,
load_sdf,
load_smiles,
prep_mols,
throughput_per_s,
time_it,
)
from nvmolkit import autotune as nv_autotune
Expand Down Expand Up @@ -132,23 +135,38 @@ def bench_rdkit(
confs_per_mol: int,
runs: int,
warmup: bool,
) -> tuple[TimingResult, list[Chem.Mol]]:
"""Benchmark RDKit ``EmbedMultipleConfs``; return ``(timing, last_run_mols)``."""
max_seconds: float = 0.0,
) -> tuple[TimingResult, list[Chem.Mol], int]:
"""Benchmark RDKit ``EmbedMultipleConfs``; return ``(timing, processed_mols, processed_count)``.

When ``max_seconds > 0``, the inner loop stops processing molecules once
wall-clock elapsed exceeds the cap. The reported timing is over the
molecules actually processed; throughput is items / elapsed at the call
site. Cloned molecules that were never processed are omitted from the
returned list so downstream energy validation only sees comparable inputs.
"""
last_run_mols: list[list[Chem.Mol]] = [[]]
processed_count = [0]

@nvtx.annotate("etkdg_rdkit_run", color="yellow")
def run() -> None:
cloned = clone_mols_with_conformers(mols)
deadline = Deadline(max_seconds)
n_done = 0
for mol in cloned:
rdDistGeom.EmbedMultipleConfs(mol, numConfs=confs_per_mol, params=params)
last_run_mols[0] = cloned
n_done += 1
if deadline.expired():
break
last_run_mols[0] = cloned[:n_done]
processed_count[0] = n_done

if warmup:
warmup_mol = Chem.RWMol(mols[0])
rdDistGeom.EmbedMultipleConfs(warmup_mol, numConfs=1, params=params)

timing = time_it(run, runs=runs, warmups=0, gpu_sync=False)
return timing, last_run_mols[0]
return timing, last_run_mols[0], processed_count[0]


def _build_etkdg_params(max_iterations: int, num_threads: int, seed: int) -> rdDistGeom.EmbedParameters:
Expand Down Expand Up @@ -176,10 +194,11 @@ def _build_hardware_options(


CSV_HEADER = (
"method,input_file,input_type,num_mols,confs_per_mol,max_iterations,"
"method,input_file,input_type,num_mols,mols_processed,confs_per_mol,max_iterations,"
"batch_size,batches_per_gpu,prep_threads,num_gpus,nvmolkit_config_source,"
"rdkit_threads,time_ms,std_ms,conformers_generated,mean_energy_diff,median_energy_diff,"
"energy_diff_pairs"
"rdkit_threads,rdkit_max_seconds,time_ms,std_ms,conformers_generated,"
"confs_per_second,vs_rdkit_throughput_ratio,"
"mean_energy_diff,median_energy_diff,energy_diff_pairs"
)


Expand Down Expand Up @@ -219,6 +238,10 @@ def main() -> None:
default=1,
help="Threads passed to RDKit ETKDG via params.numThreads (default: 1)",
)
add_rdkit_max_seconds_arg(
parser,
extra_help="The RDKit ETKDG loop stops at the next molecule boundary once the budget is hit.",
)

parser.add_argument("--batch_size", "-b", type=int, default=1024, help="nvmolkit batch size (default: 1024)")
parser.add_argument(
Expand Down Expand Up @@ -455,10 +478,16 @@ def main() -> None:
except ImportError as exc:
print(f" nvmolkit: SKIPPED (import error: {exc})")

rdkit_processed_count = len(mols)
if not args.no_rdkit:
print("\nRunning RDKit ETKDG benchmark...")
rd_timing, rd_mols = bench_rdkit(mols, params, args.confs_per_mol, args.runs, args.warmup)
print(f" RDKit: {rd_timing.mean_ms:10.2f} ms (+/- {rd_timing.std_ms:.2f} ms)")
rd_timing, rd_mols, rdkit_processed_count = bench_rdkit(
mols, params, args.confs_per_mol, args.runs, args.warmup, max_seconds=args.rdkit_max_seconds
)
print(
f" RDKit: {rd_timing.mean_ms:10.2f} ms (+/- {rd_timing.std_ms:.2f} ms)"
f" [processed {rdkit_processed_count}/{len(mols)} mols]"
)
results["rdkit"] = (rd_timing, rd_mols)

if not results:
Expand All @@ -467,11 +496,16 @@ def main() -> None:

print("\n" + "=" * 70)
print("Summary:")
baseline_ms = results["rdkit"][0].mean_ms if "rdkit" in results else None
for name, (timing, _) in results.items():
rdkit_throughput_per_s: float | None = None
if "rdkit" in results and results["rdkit"][0].mean_ms > 0:
rdkit_throughput_per_s = throughput_per_s(
rdkit_processed_count * args.confs_per_mol, results["rdkit"][0].mean_ms
)
for name, (timing, run_mols) in results.items():
speedup = ""
if baseline_ms is not None and name != "rdkit" and timing.mean_ms > 0:
speedup = f", {baseline_ms / timing.mean_ms:.1f}x vs RDKit"
if rdkit_throughput_per_s is not None and name != "rdkit" and timing.mean_ms > 0:
method_throughput = throughput_per_s(len(mols) * args.confs_per_mol, timing.mean_ms)
speedup = f", {method_throughput / rdkit_throughput_per_s:.1f}x vs RDKit (throughput)"
print(f" {name:20s}: {timing.mean_ms:10.2f} ms (+/- {timing.std_ms:.2f} ms){speedup}")

energy_mean = float("nan")
Expand All @@ -493,21 +527,31 @@ def main() -> None:
csv_rows: list[str] = []
for name, (timing, run_mols) in results.items():
is_nv = name == "nvmolkit"
is_rdkit = name == "rdkit"
batch_size = applied_batch_size if is_nv else "N/A"
batches_per_gpu = applied_batches_per_gpu if is_nv else "N/A"
prep_threads = applied_prep_threads if is_nv else "N/A"
num_gpus = applied_num_gpus if is_nv else "N/A"
nvmolkit_config_source = config_source if is_nv else "N/A"
rdkit_threads = args.rdkit_threads if name == "rdkit" else "N/A"
rdkit_threads = args.rdkit_threads if is_rdkit else "N/A"
rdkit_max_seconds = args.rdkit_max_seconds if is_rdkit else "N/A"
mols_processed = rdkit_processed_count if is_rdkit else len(mols)
confs_generated = _conformer_count(run_mols)
confs_per_second = throughput_per_s(mols_processed * args.confs_per_mol, timing.mean_ms)
if rdkit_throughput_per_s is not None and not is_rdkit and timing.mean_ms > 0:
vs_rdkit_throughput_ratio = f"{confs_per_second / rdkit_throughput_per_s:.4f}"
else:
vs_rdkit_throughput_ratio = "N/A"
mean_diff = energy_mean if (diff_computed and is_nv) else "N/A"
median_diff = energy_median if (diff_computed and is_nv) else "N/A"
pairs = energy_pairs if (diff_computed and is_nv) else "N/A"
csv_rows.append(
f"{name},{input_file},{input_type},{len(mols)},{args.confs_per_mol},"
f"{name},{input_file},{input_type},{len(mols)},{mols_processed},{args.confs_per_mol},"
f"{args.max_iterations},{batch_size},{batches_per_gpu},{prep_threads},{num_gpus},"
f"{nvmolkit_config_source},{rdkit_threads},{timing.mean_ms:.2f},{timing.std_ms:.2f},"
f"{confs_generated},{mean_diff},{median_diff},{pairs}"
f"{nvmolkit_config_source},{rdkit_threads},{rdkit_max_seconds},"
f"{timing.mean_ms:.2f},{timing.std_ms:.2f},"
f"{confs_generated},{confs_per_second:.2f},{vs_rdkit_throughput_ratio},"
f"{mean_diff},{median_diff},{pairs}"
)

print("\n\nCSV Results:")
Expand Down
Loading
Loading