From e956c9105122cfa47b5532825bddd49a875d43a8 Mon Sep 17 00:00:00 2001 From: Farid Yagubbayli Date: Thu, 14 May 2026 14:16:05 +0000 Subject: [PATCH 1/9] Migrate Benchmark --- benchmarks/README.md | 64 +++++++++ benchmarks/__init__.py | 1 + benchmarks/benchmark.py | 297 ++++++++++++++++++++++++++++++++++++++++ tests/test_benchmark.py | 109 +++++++++++++++ 4 files changed, 471 insertions(+) create mode 100644 benchmarks/README.md create mode 100644 benchmarks/__init__.py create mode 100644 benchmarks/benchmark.py create mode 100644 tests/test_benchmark.py diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 00000000..9878f6a5 --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,64 @@ +# Benchmarks + +This directory contains performance benchmarks for k-wave-python. These are not standard examples: they are intended to measure runtime and memory behavior and can become expensive as the grid size grows. + +## 3D Solver Scaling Benchmark + +`benchmark.py` ports MATLAB k-Wave's `benchmark.m`. It runs `kspaceFirstOrder` on a sequence of 3D grids with increasing sizes, averages runtime over repeated runs, and saves partial results after each run. + +The default benchmark uses: + +- heterogeneous absorbing medium +- smoothed initial pressure ball source +- binary sensor mask built from 100 Cartesian points on a sphere +- 1000 time steps +- 3 averages per grid size +- grid sizes based on MATLAB's original scale arrays, starting at `32 x 32 x 32` + +By default, this can run for a long time and may stop once memory limits are reached. + +## Usage + +Run a small smoke benchmark: + +```bash +uv run benchmarks/benchmark.py --max-cases 1 --num-averages 1 --number-time-points 20 +``` + +Run the default CPU benchmark: + +```bash +uv run benchmarks/benchmark.py +``` + +Run with single-precision arrays: + +```bash +uv run benchmarks/benchmark.py --data-cast single +``` + +Run on the Python GPU backend: + +```bash +uv run benchmarks/benchmark.py --device gpu +``` + +Choose an output file: + +```bash +uv run benchmarks/benchmark.py --output-path benchmark_data.json +``` + +## Output + +The benchmark writes a JSON file containing: + +- `comp_size`: total grid points for each completed grid size +- `comp_time`: rolling average elapsed seconds for each grid size +- `options`: benchmark settings and environment metadata +- `output_path`: path to the JSON output file +- `error_reached`: whether the benchmark stopped after an exception +- `error_message`: exception message, if any +- `mem_usage`: optional process peak memory estimate when `--report-mem-usage` is set + +Partial results are saved after each run so completed timings are preserved if a later grid fails. diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py new file mode 100644 index 00000000..522d2cce --- /dev/null +++ b/benchmarks/__init__.py @@ -0,0 +1 @@ +"""Performance benchmarks ported from MATLAB k-Wave.""" diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py new file mode 100644 index 00000000..d59550c3 --- /dev/null +++ b/benchmarks/benchmark.py @@ -0,0 +1,297 @@ +""" +k-Wave 3D Performance Benchmark + +Ported from: k-Wave/benchmark.m + +Runs a sequence of 3D initial-value simulations with increasing grid sizes and +records average execution time for each grid. +""" + +from __future__ import annotations + +import argparse +import json +import platform +import sys +from dataclasses import asdict, dataclass +from datetime import datetime +from pathlib import Path +from time import perf_counter +from typing import Any, Callable + +import numpy as np + +import kwave +from kwave.data import Vector +from kwave.kgrid import kWaveGrid +from kwave.kmedium import kWaveMedium +from kwave.ksensor import kSensor +from kwave.ksource import kSource +from kwave.kspaceFirstOrder import kspaceFirstOrder +from kwave.utils.conversion import cart2grid +from kwave.utils.filters import smooth +from kwave.utils.mapgen import make_ball, make_cart_sphere + + +@dataclass(frozen=True) +class BenchmarkOptions: + data_cast: str = "off" + heterogeneous_media: bool = True + absorbing_media: bool = True + nonlinear_media: bool = False + binary_sensor_mask: bool = True + number_sensor_points: int = 100 + number_time_points: int = 1000 + num_averages: int = 3 + start_size: int = 32 + x_scale_array: tuple[int, ...] = (1, 2, 2, 2, 4, 4, 4, 8, 8, 8, 16, 16) + y_scale_array: tuple[int, ...] = (1, 1, 2, 2, 2, 4, 4, 4, 8, 8, 8, 16) + z_scale_array: tuple[int, ...] = (1, 1, 1, 2, 2, 2, 4, 4, 4, 8, 8, 8) + domain_size: float = 22e-3 + sensor_radius: float = 10e-3 + pml_size: int = 10 + pml_inside: bool = True + report_mem_usage: bool = False + + def __post_init__(self) -> None: + if self.data_cast not in {"off", "single"}: + raise ValueError("data_cast must be 'off' or 'single'. Use device='gpu' to run on a GPU.") + if not (len(self.x_scale_array) == len(self.y_scale_array) == len(self.z_scale_array)): + raise ValueError("scale arrays must have the same length") + if self.number_time_points <= 0 or self.num_averages <= 0: + raise ValueError("number_time_points and num_averages must be positive") + if self.number_sensor_points <= 1: + raise ValueError("number_sensor_points must be greater than 1") + + @property + def dtype(self) -> type[np.floating[Any]]: + return np.float32 if self.data_cast == "single" else np.float64 + + +def grid_sizes(options: BenchmarkOptions = BenchmarkOptions()) -> list[tuple[int, int, int, int]]: + return [ + ( + options.start_size * xscale, + options.start_size * yscale, + options.start_size * zscale, + min(xscale, yscale, zscale), + ) + for xscale, yscale, zscale in zip(options.x_scale_array, options.y_scale_array, options.z_scale_array) + ] + + +def build_case(options: BenchmarkOptions, nx: int, ny: int, nz: int, scale: int) -> tuple[kWaveGrid, kWaveMedium, kSource, kSensor]: + dtype = options.dtype + dx = options.domain_size / nx + dy = options.domain_size / ny + dz = options.domain_size / nz + kgrid = kWaveGrid(Vector([nx, ny, nz]), Vector([dx, dy, dz])) + + c0 = dtype(1500) + rho0 = dtype(1000) + alpha_coeff = dtype(0.75) + alpha_power = dtype(1.5) + bon_a = dtype(6) + + if options.heterogeneous_media: + sound_speed = c0 * np.ones((nx, ny, nz), dtype=dtype) + sound_speed[: nx // 4, :, :] = c0 * dtype(1.2) + density = rho0 * np.ones((nx, ny, nz), dtype=dtype) + density[:, max(ny // 4 - 1, 0) :, :] = rho0 * dtype(1.2) + else: + sound_speed = np.array(c0, dtype=dtype) + density = np.array(rho0, dtype=dtype) + + medium = kWaveMedium(sound_speed=sound_speed, density=density) + if options.absorbing_media: + medium.alpha_coeff = alpha_coeff + medium.alpha_power = alpha_power + if options.nonlinear_media: + medium.BonA = bon_a + + source = kSource() + source.p0 = dtype(10) * make_ball(Vector([nx, ny, nz]), Vector([nx // 2, ny // 2, nz // 2]), 2 * scale) + source.p0 = smooth(source.p0.astype(dtype, copy=False), restore_max=True).astype(dtype, copy=False) + + sensor_mask = make_cart_sphere(options.sensor_radius, options.number_sensor_points) + if options.binary_sensor_mask: + sensor_mask, _, _ = cart2grid(kgrid, sensor_mask, order="C") + sensor_mask = sensor_mask.astype(bool) + sensor = kSensor(mask=sensor_mask) + sensor.record = ["p"] + + kgrid.makeTime(np.max(np.asarray(medium.sound_speed))) + kgrid.setTime(options.number_time_points, kgrid.dt) + + return kgrid, medium, source, sensor + + +def default_output_path(options: BenchmarkOptions) -> Path: + computer_name = platform.node() or "unknown-computer" + date = datetime.now().strftime("%Y%m%d-%H%M%S") + return Path(f"benchmark_data-{computer_name}-{options.data_cast}-{date}.json") + + +def run( + options: BenchmarkOptions = BenchmarkOptions(), + *, + backend: str = "python", + device: str = "cpu", + max_cases: int | None = None, + output_path: str | Path | None = None, + quiet: bool = True, + solver: Callable[..., Any] | None = None, + timer: Callable[[], float] = perf_counter, +) -> dict[str, Any]: + solver = kspaceFirstOrder if solver is None else solver + cases = grid_sizes(options) + if max_cases is not None: + if max_cases <= 0: + raise ValueError("max_cases must be positive") + cases = cases[:max_cases] + + path = default_output_path(options) if output_path is None else Path(output_path) + result: dict[str, Any] = { + "comp_size": [], + "comp_time": [], + "options": _options_payload(options, backend, device), + "output_path": str(path), + "error_reached": False, + "error_message": "", + } + if options.report_mem_usage: + result["mem_usage"] = [] + + for nx, ny, nz, scale in cases: + loop_time = 0.0 + loop_mem_usage = 0.0 + try: + kgrid, medium, source, sensor = build_case(options, nx, ny, nz, scale) + for loop_num in range(1, options.num_averages + 1): + start = timer() + solver( + kgrid, + medium, + source, + sensor, + backend=backend, + device=device, + quiet=quiet, + pml_size=options.pml_size, + pml_inside=options.pml_inside, + smooth_p0=False, + ) + elapsed_time = timer() - start + loop_time = _rolling_average(loop_time, elapsed_time, loop_num) + if options.report_mem_usage: + loop_mem_usage = _rolling_average(loop_mem_usage, _peak_memory_bytes(), loop_num) + _store_case_result(result, nx * ny * nz, loop_time, loop_mem_usage, options.report_mem_usage) + _save_results(path, result) + except Exception as exc: + result["error_reached"] = True + result["error_message"] = str(exc) + _save_results(path, result) + break + + return result + + +def _rolling_average(previous_average: float, new_value: float, count: int) -> float: + return (previous_average * (count - 1) + new_value) / count + + +def _store_case_result(result: dict[str, Any], comp_size: int, comp_time: float, mem_usage: float, report_mem_usage: bool) -> None: + if len(result["comp_size"]) == 0 or result["comp_size"][-1] != comp_size: + result["comp_size"].append(comp_size) + result["comp_time"].append(comp_time) + if report_mem_usage: + result["mem_usage"].append(mem_usage) + return + + result["comp_time"][-1] = comp_time + if report_mem_usage: + result["mem_usage"][-1] = mem_usage + + +def _save_results(path: Path, result: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + payload = { + "comp_size": [int(size) for size in result["comp_size"]], + "comp_time": [float(time) for time in result["comp_time"]], + "options": result["options"], + "output_path": result["output_path"], + "error_reached": bool(result["error_reached"]), + "error_message": result["error_message"], + } + if "mem_usage" in result: + payload["mem_usage"] = [float(usage) for usage in result["mem_usage"]] + path.write_text(json.dumps(payload, indent=2) + "\n") + + +def _options_payload(options: BenchmarkOptions, backend: str, device: str) -> dict[str, Any]: + payload = asdict(options) + payload.update( + { + "backend": backend, + "device": device, + "computer_name": platform.node(), + "python_version": platform.python_version(), + "platform": platform.platform(), + "kwave_python_version": kwave.__version__, + } + ) + return payload + + +def _peak_memory_bytes() -> float: + try: + import resource + except ImportError: + return float("nan") + + usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + if platform.system().lower() == "darwin": + return float(usage) + return float(usage * 1024) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Run the k-Wave 3D performance benchmark.") + parser.add_argument("--data-cast", choices=("off", "single"), default="off") + parser.add_argument("--backend", choices=("python", "cpp"), default="python") + parser.add_argument("--device", choices=("cpu", "gpu"), default="cpu") + parser.add_argument("--max-cases", type=int, default=None) + parser.add_argument("--num-averages", type=int, default=3) + parser.add_argument("--number-time-points", type=int, default=1000) + parser.add_argument("--output-path", type=Path, default=None) + parser.add_argument("--report-mem-usage", action="store_true") + parser.add_argument("--verbose", action="store_true") + return parser.parse_args() + + +def main() -> int: + args = _parse_args() + benchmark_options = BenchmarkOptions( + data_cast=args.data_cast, + num_averages=args.num_averages, + number_time_points=args.number_time_points, + report_mem_usage=args.report_mem_usage, + ) + result = run( + benchmark_options, + backend=args.backend, + device=args.device, + max_cases=args.max_cases, + output_path=args.output_path, + quiet=not args.verbose, + ) + print(f"Benchmark results saved to {result['output_path']}") + if result["error_reached"]: + print("Memory limit reached or error encountered, exiting benchmark. Error message:", file=sys.stderr) + print(f" {result['error_message']}", file=sys.stderr) + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py new file mode 100644 index 00000000..37c4a0ed --- /dev/null +++ b/tests/test_benchmark.py @@ -0,0 +1,109 @@ +import json +from pathlib import Path + +import numpy as np +import pytest + +from benchmarks.benchmark import BenchmarkOptions, build_case, grid_sizes, run + + +def small_options(**overrides): + values = { + "start_size": 8, + "x_scale_array": (1,), + "y_scale_array": (1,), + "z_scale_array": (1,), + "number_sensor_points": 8, + "number_time_points": 5, + "num_averages": 1, + "sensor_radius": 1e-3, + } + values.update(overrides) + return BenchmarkOptions(**values) + + +def test_grid_sizes_match_matlab_scale_arrays(): + sizes = grid_sizes(BenchmarkOptions()) + + assert sizes[0] == (32, 32, 32, 1) + assert sizes[-1] == (512, 512, 256, 8) + assert len(sizes) == 12 + + +def test_build_case_matches_benchmark_defaults_for_small_grid(): + options = small_options() + + kgrid, medium, source, sensor = build_case(options, 8, 8, 8, 1) + + assert tuple(kgrid.N) == (8, 8, 8) + assert np.isclose(kgrid.dx, options.domain_size / 8) + assert kgrid.Nt == options.number_time_points + assert medium.sound_speed.shape == (8, 8, 8) + assert np.all(medium.sound_speed[:2, :, :] == 1800) + assert np.all(medium.sound_speed[2:, :, :] == 1500) + assert np.all(medium.density[:, :1, :] == 1000) + assert np.all(medium.density[:, 1:, :] == 1200) + assert medium.alpha_coeff == pytest.approx(0.75) + assert medium.alpha_power == pytest.approx(1.5) + assert source.p0.shape == (8, 8, 8) + assert np.max(source.p0) == pytest.approx(10) + assert sensor.mask.shape == (8, 8, 8) + assert sensor.mask.dtype == bool + assert 0 < np.count_nonzero(sensor.mask) <= options.number_sensor_points + assert sensor.record == ["p"] + + +def test_single_data_cast_uses_float32_arrays(): + options = small_options(data_cast="single") + + _, medium, source, _ = build_case(options, 8, 8, 8, 1) + + assert medium.sound_speed.dtype == np.float32 + assert medium.density.dtype == np.float32 + assert source.p0.dtype == np.float32 + + +def test_run_aggregates_timings_and_saves_json_file(tmp_path: Path): + options = small_options(num_averages=2) + output_path = tmp_path / "benchmark.json" + times = iter([0.0, 1.0, 1.0, 3.0]) + calls = [] + + def fake_solver(kgrid, medium, source, sensor, **kwargs): + calls.append((kgrid, medium, source, sensor, kwargs)) + return {"p": np.zeros((1, 1))} + + result = run(options, max_cases=1, output_path=output_path, solver=fake_solver, timer=lambda: next(times)) + + assert len(calls) == 2 + assert calls[0][4]["pml_size"] == options.pml_size + assert calls[0][4]["pml_inside"] is True + assert calls[0][4]["smooth_p0"] is False + assert result["comp_size"] == [8 * 8 * 8] + assert result["comp_time"] == [pytest.approx(1.5)] + assert result["output_path"] == str(output_path) + assert result["error_reached"] is False + assert output_path.exists() + + saved = json.loads(output_path.read_text()) + assert saved["comp_size"] == [8 * 8 * 8] + assert saved["comp_time"] == [pytest.approx(1.5)] + assert saved["output_path"] == str(output_path) + assert saved["options"]["start_size"] == 8 + + +def test_run_saves_partial_results_after_solver_error(tmp_path: Path): + options = small_options() + output_path = tmp_path / "benchmark.json" + + def failing_solver(*args, **kwargs): + raise RuntimeError("solver failed") + + result = run(options, max_cases=1, output_path=output_path, solver=failing_solver) + + assert result["error_reached"] is True + assert result["error_message"] == "solver failed" + assert output_path.exists() + saved = json.loads(output_path.read_text()) + assert saved["error_reached"] is True + assert saved["error_message"] == "solver failed" From 968b5f8f00fc148b9bca19c973144abf6a987ef1 Mon Sep 17 00:00:00 2001 From: Farid Yagubbayli Date: Thu, 14 May 2026 14:31:48 +0000 Subject: [PATCH 2/9] helpers --- benchmarks/benchmark.py | 196 ++++------------------------------------ benchmarks/helpers.py | 178 ++++++++++++++++++++++++++++++++++++ 2 files changed, 195 insertions(+), 179 deletions(-) create mode 100644 benchmarks/helpers.py diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py index d59550c3..23ec6f5e 100644 --- a/benchmarks/benchmark.py +++ b/benchmarks/benchmark.py @@ -10,126 +10,23 @@ from __future__ import annotations import argparse -import json -import platform import sys -from dataclasses import asdict, dataclass -from datetime import datetime from pathlib import Path from time import perf_counter from typing import Any, Callable -import numpy as np - -import kwave -from kwave.data import Vector -from kwave.kgrid import kWaveGrid -from kwave.kmedium import kWaveMedium -from kwave.ksensor import kSensor -from kwave.ksource import kSource +from benchmarks.helpers import ( + BenchmarkOptions, + build_case, + default_output_path, + grid_sizes, + options_payload, + peak_memory_bytes, + rolling_average, + save_results, + store_case_result, +) from kwave.kspaceFirstOrder import kspaceFirstOrder -from kwave.utils.conversion import cart2grid -from kwave.utils.filters import smooth -from kwave.utils.mapgen import make_ball, make_cart_sphere - - -@dataclass(frozen=True) -class BenchmarkOptions: - data_cast: str = "off" - heterogeneous_media: bool = True - absorbing_media: bool = True - nonlinear_media: bool = False - binary_sensor_mask: bool = True - number_sensor_points: int = 100 - number_time_points: int = 1000 - num_averages: int = 3 - start_size: int = 32 - x_scale_array: tuple[int, ...] = (1, 2, 2, 2, 4, 4, 4, 8, 8, 8, 16, 16) - y_scale_array: tuple[int, ...] = (1, 1, 2, 2, 2, 4, 4, 4, 8, 8, 8, 16) - z_scale_array: tuple[int, ...] = (1, 1, 1, 2, 2, 2, 4, 4, 4, 8, 8, 8) - domain_size: float = 22e-3 - sensor_radius: float = 10e-3 - pml_size: int = 10 - pml_inside: bool = True - report_mem_usage: bool = False - - def __post_init__(self) -> None: - if self.data_cast not in {"off", "single"}: - raise ValueError("data_cast must be 'off' or 'single'. Use device='gpu' to run on a GPU.") - if not (len(self.x_scale_array) == len(self.y_scale_array) == len(self.z_scale_array)): - raise ValueError("scale arrays must have the same length") - if self.number_time_points <= 0 or self.num_averages <= 0: - raise ValueError("number_time_points and num_averages must be positive") - if self.number_sensor_points <= 1: - raise ValueError("number_sensor_points must be greater than 1") - - @property - def dtype(self) -> type[np.floating[Any]]: - return np.float32 if self.data_cast == "single" else np.float64 - - -def grid_sizes(options: BenchmarkOptions = BenchmarkOptions()) -> list[tuple[int, int, int, int]]: - return [ - ( - options.start_size * xscale, - options.start_size * yscale, - options.start_size * zscale, - min(xscale, yscale, zscale), - ) - for xscale, yscale, zscale in zip(options.x_scale_array, options.y_scale_array, options.z_scale_array) - ] - - -def build_case(options: BenchmarkOptions, nx: int, ny: int, nz: int, scale: int) -> tuple[kWaveGrid, kWaveMedium, kSource, kSensor]: - dtype = options.dtype - dx = options.domain_size / nx - dy = options.domain_size / ny - dz = options.domain_size / nz - kgrid = kWaveGrid(Vector([nx, ny, nz]), Vector([dx, dy, dz])) - - c0 = dtype(1500) - rho0 = dtype(1000) - alpha_coeff = dtype(0.75) - alpha_power = dtype(1.5) - bon_a = dtype(6) - - if options.heterogeneous_media: - sound_speed = c0 * np.ones((nx, ny, nz), dtype=dtype) - sound_speed[: nx // 4, :, :] = c0 * dtype(1.2) - density = rho0 * np.ones((nx, ny, nz), dtype=dtype) - density[:, max(ny // 4 - 1, 0) :, :] = rho0 * dtype(1.2) - else: - sound_speed = np.array(c0, dtype=dtype) - density = np.array(rho0, dtype=dtype) - - medium = kWaveMedium(sound_speed=sound_speed, density=density) - if options.absorbing_media: - medium.alpha_coeff = alpha_coeff - medium.alpha_power = alpha_power - if options.nonlinear_media: - medium.BonA = bon_a - - source = kSource() - source.p0 = dtype(10) * make_ball(Vector([nx, ny, nz]), Vector([nx // 2, ny // 2, nz // 2]), 2 * scale) - source.p0 = smooth(source.p0.astype(dtype, copy=False), restore_max=True).astype(dtype, copy=False) - - sensor_mask = make_cart_sphere(options.sensor_radius, options.number_sensor_points) - if options.binary_sensor_mask: - sensor_mask, _, _ = cart2grid(kgrid, sensor_mask, order="C") - sensor_mask = sensor_mask.astype(bool) - sensor = kSensor(mask=sensor_mask) - sensor.record = ["p"] - - kgrid.makeTime(np.max(np.asarray(medium.sound_speed))) - kgrid.setTime(options.number_time_points, kgrid.dt) - - return kgrid, medium, source, sensor - - -def default_output_path(options: BenchmarkOptions) -> Path: - computer_name = platform.node() or "unknown-computer" - date = datetime.now().strftime("%Y%m%d-%H%M%S") - return Path(f"benchmark_data-{computer_name}-{options.data_cast}-{date}.json") def run( @@ -154,7 +51,7 @@ def run( result: dict[str, Any] = { "comp_size": [], "comp_time": [], - "options": _options_payload(options, backend, device), + "options": options_payload(options, backend, device), "output_path": str(path), "error_reached": False, "error_message": "", @@ -182,79 +79,20 @@ def run( smooth_p0=False, ) elapsed_time = timer() - start - loop_time = _rolling_average(loop_time, elapsed_time, loop_num) + loop_time = rolling_average(loop_time, elapsed_time, loop_num) if options.report_mem_usage: - loop_mem_usage = _rolling_average(loop_mem_usage, _peak_memory_bytes(), loop_num) - _store_case_result(result, nx * ny * nz, loop_time, loop_mem_usage, options.report_mem_usage) - _save_results(path, result) + loop_mem_usage = rolling_average(loop_mem_usage, peak_memory_bytes(), loop_num) + store_case_result(result, nx * ny * nz, loop_time, loop_mem_usage, options.report_mem_usage) + save_results(path, result) except Exception as exc: result["error_reached"] = True result["error_message"] = str(exc) - _save_results(path, result) + save_results(path, result) break return result -def _rolling_average(previous_average: float, new_value: float, count: int) -> float: - return (previous_average * (count - 1) + new_value) / count - - -def _store_case_result(result: dict[str, Any], comp_size: int, comp_time: float, mem_usage: float, report_mem_usage: bool) -> None: - if len(result["comp_size"]) == 0 or result["comp_size"][-1] != comp_size: - result["comp_size"].append(comp_size) - result["comp_time"].append(comp_time) - if report_mem_usage: - result["mem_usage"].append(mem_usage) - return - - result["comp_time"][-1] = comp_time - if report_mem_usage: - result["mem_usage"][-1] = mem_usage - - -def _save_results(path: Path, result: dict[str, Any]) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - payload = { - "comp_size": [int(size) for size in result["comp_size"]], - "comp_time": [float(time) for time in result["comp_time"]], - "options": result["options"], - "output_path": result["output_path"], - "error_reached": bool(result["error_reached"]), - "error_message": result["error_message"], - } - if "mem_usage" in result: - payload["mem_usage"] = [float(usage) for usage in result["mem_usage"]] - path.write_text(json.dumps(payload, indent=2) + "\n") - - -def _options_payload(options: BenchmarkOptions, backend: str, device: str) -> dict[str, Any]: - payload = asdict(options) - payload.update( - { - "backend": backend, - "device": device, - "computer_name": platform.node(), - "python_version": platform.python_version(), - "platform": platform.platform(), - "kwave_python_version": kwave.__version__, - } - ) - return payload - - -def _peak_memory_bytes() -> float: - try: - import resource - except ImportError: - return float("nan") - - usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - if platform.system().lower() == "darwin": - return float(usage) - return float(usage * 1024) - - def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run the k-Wave 3D performance benchmark.") parser.add_argument("--data-cast", choices=("off", "single"), default="off") diff --git a/benchmarks/helpers.py b/benchmarks/helpers.py new file mode 100644 index 00000000..1843bc5a --- /dev/null +++ b/benchmarks/helpers.py @@ -0,0 +1,178 @@ +from __future__ import annotations + +import json +import platform +from dataclasses import asdict, dataclass +from datetime import datetime +from pathlib import Path +from typing import Any + +import numpy as np + +import kwave +from kwave.data import Vector +from kwave.kgrid import kWaveGrid +from kwave.kmedium import kWaveMedium +from kwave.ksensor import kSensor +from kwave.ksource import kSource +from kwave.utils.conversion import cart2grid +from kwave.utils.filters import smooth +from kwave.utils.mapgen import make_ball, make_cart_sphere + + +@dataclass(frozen=True) +class BenchmarkOptions: + data_cast: str = "off" + heterogeneous_media: bool = True + absorbing_media: bool = True + nonlinear_media: bool = False + binary_sensor_mask: bool = True + number_sensor_points: int = 100 + number_time_points: int = 1000 + num_averages: int = 3 + start_size: int = 32 + x_scale_array: tuple[int, ...] = (1, 2, 2, 2, 4, 4, 4, 8, 8, 8, 16, 16) + y_scale_array: tuple[int, ...] = (1, 1, 2, 2, 2, 4, 4, 4, 8, 8, 8, 16) + z_scale_array: tuple[int, ...] = (1, 1, 1, 2, 2, 2, 4, 4, 4, 8, 8, 8) + domain_size: float = 22e-3 + sensor_radius: float = 10e-3 + pml_size: int = 10 + pml_inside: bool = True + report_mem_usage: bool = False + + def __post_init__(self) -> None: + if self.data_cast not in {"off", "single"}: + raise ValueError("data_cast must be 'off' or 'single'. Use device='gpu' to run on a GPU.") + if not (len(self.x_scale_array) == len(self.y_scale_array) == len(self.z_scale_array)): + raise ValueError("scale arrays must have the same length") + if self.number_time_points <= 0 or self.num_averages <= 0: + raise ValueError("number_time_points and num_averages must be positive") + if self.number_sensor_points <= 1: + raise ValueError("number_sensor_points must be greater than 1") + + @property + def dtype(self) -> type[np.floating[Any]]: + return np.float32 if self.data_cast == "single" else np.float64 + + +def grid_sizes(options: BenchmarkOptions = BenchmarkOptions()) -> list[tuple[int, int, int, int]]: + return [ + ( + options.start_size * xscale, + options.start_size * yscale, + options.start_size * zscale, + min(xscale, yscale, zscale), + ) + for xscale, yscale, zscale in zip(options.x_scale_array, options.y_scale_array, options.z_scale_array) + ] + + +def build_case(options: BenchmarkOptions, nx: int, ny: int, nz: int, scale: int) -> tuple[kWaveGrid, kWaveMedium, kSource, kSensor]: + dtype = options.dtype + dx = options.domain_size / nx + dy = options.domain_size / ny + dz = options.domain_size / nz + kgrid = kWaveGrid(Vector([nx, ny, nz]), Vector([dx, dy, dz])) + + c0 = dtype(1500) + rho0 = dtype(1000) + alpha_coeff = dtype(0.75) + alpha_power = dtype(1.5) + bon_a = dtype(6) + + if options.heterogeneous_media: + sound_speed = c0 * np.ones((nx, ny, nz), dtype=dtype) + sound_speed[: nx // 4, :, :] = c0 * dtype(1.2) + density = rho0 * np.ones((nx, ny, nz), dtype=dtype) + density[:, max(ny // 4 - 1, 0) :, :] = rho0 * dtype(1.2) + else: + sound_speed = np.array(c0, dtype=dtype) + density = np.array(rho0, dtype=dtype) + + medium = kWaveMedium(sound_speed=sound_speed, density=density) + if options.absorbing_media: + medium.alpha_coeff = alpha_coeff + medium.alpha_power = alpha_power + if options.nonlinear_media: + medium.BonA = bon_a + + source = kSource() + source.p0 = dtype(10) * make_ball(Vector([nx, ny, nz]), Vector([nx // 2, ny // 2, nz // 2]), 2 * scale) + source.p0 = smooth(source.p0.astype(dtype, copy=False), restore_max=True).astype(dtype, copy=False) + + sensor_mask = make_cart_sphere(options.sensor_radius, options.number_sensor_points) + if options.binary_sensor_mask: + sensor_mask, _, _ = cart2grid(kgrid, sensor_mask, order="C") + sensor_mask = sensor_mask.astype(bool) + sensor = kSensor(mask=sensor_mask) + sensor.record = ["p"] + + kgrid.makeTime(np.max(np.asarray(medium.sound_speed))) + kgrid.setTime(options.number_time_points, kgrid.dt) + + return kgrid, medium, source, sensor + + +def default_output_path(options: BenchmarkOptions) -> Path: + computer_name = platform.node() or "unknown-computer" + date = datetime.now().strftime("%Y%m%d-%H%M%S") + return Path(f"benchmark_data-{computer_name}-{options.data_cast}-{date}.json") + + +def rolling_average(previous_average: float, new_value: float, count: int) -> float: + return (previous_average * (count - 1) + new_value) / count + + +def store_case_result(result: dict[str, Any], comp_size: int, comp_time: float, mem_usage: float, report_mem_usage: bool) -> None: + if len(result["comp_size"]) == 0 or result["comp_size"][-1] != comp_size: + result["comp_size"].append(comp_size) + result["comp_time"].append(comp_time) + if report_mem_usage: + result["mem_usage"].append(mem_usage) + return + + result["comp_time"][-1] = comp_time + if report_mem_usage: + result["mem_usage"][-1] = mem_usage + + +def save_results(path: Path, result: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + payload = { + "comp_size": [int(size) for size in result["comp_size"]], + "comp_time": [float(time) for time in result["comp_time"]], + "options": result["options"], + "output_path": result["output_path"], + "error_reached": bool(result["error_reached"]), + "error_message": result["error_message"], + } + if "mem_usage" in result: + payload["mem_usage"] = [float(usage) for usage in result["mem_usage"]] + path.write_text(json.dumps(payload, indent=2) + "\n") + + +def options_payload(options: BenchmarkOptions, backend: str, device: str) -> dict[str, Any]: + payload = asdict(options) + payload.update( + { + "backend": backend, + "device": device, + "computer_name": platform.node(), + "python_version": platform.python_version(), + "platform": platform.platform(), + "kwave_python_version": kwave.__version__, + } + ) + return payload + + +def peak_memory_bytes() -> float: + try: + import resource + except ImportError: + return float("nan") + + usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + if platform.system().lower() == "darwin": + return float(usage) + return float(usage * 1024) From 6abd4ed37db9a11e34ccaa32152248b25e048035 Mon Sep 17 00:00:00 2001 From: Farid Yagubbayli Date: Thu, 4 Jun 2026 14:24:53 +0000 Subject: [PATCH 3/9] Fix benchmark review issues. Validate benchmark inputs and memory reporting so generated results stay complete and JSON-compatible. --- benchmarks/benchmark.py | 60 +++++++++++------ benchmarks/helpers.py | 138 +++++++++++++++++++++++++++++++++++----- tests/test_benchmark.py | 68 ++++++++++++++++++++ 3 files changed, 233 insertions(+), 33 deletions(-) diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py index 23ec6f5e..d20e4dbb 100644 --- a/benchmarks/benchmark.py +++ b/benchmarks/benchmark.py @@ -17,6 +17,7 @@ from benchmarks.helpers import ( BenchmarkOptions, + PeakMemorySampler, build_case, default_output_path, grid_sizes, @@ -25,6 +26,7 @@ rolling_average, save_results, store_case_result, + validate_memory_bytes, ) from kwave.kspaceFirstOrder import kspaceFirstOrder @@ -39,6 +41,8 @@ def run( quiet: bool = True, solver: Callable[..., Any] | None = None, timer: Callable[[], float] = perf_counter, + memory_reader: Callable[[], float] = peak_memory_bytes, + memory_sampling_interval: float = 0.05, ) -> dict[str, Any]: solver = kspaceFirstOrder if solver is None else solver cases = grid_sizes(options) @@ -57,32 +61,52 @@ def run( "error_message": "", } if options.report_mem_usage: + try: + validate_memory_bytes(memory_reader()) + except Exception as exc: + raise ValueError("report_mem_usage is not supported on this platform") from exc result["mem_usage"] = [] - for nx, ny, nz, scale in cases: + for case_index, (nx, ny, nz, scale) in enumerate(cases): loop_time = 0.0 loop_mem_usage = 0.0 try: kgrid, medium, source, sensor = build_case(options, nx, ny, nz, scale) for loop_num in range(1, options.num_averages + 1): - start = timer() - solver( - kgrid, - medium, - source, - sensor, - backend=backend, - device=device, - quiet=quiet, - pml_size=options.pml_size, - pml_inside=options.pml_inside, - smooth_p0=False, - ) - elapsed_time = timer() - start - loop_time = rolling_average(loop_time, elapsed_time, loop_num) if options.report_mem_usage: - loop_mem_usage = rolling_average(loop_mem_usage, peak_memory_bytes(), loop_num) - store_case_result(result, nx * ny * nz, loop_time, loop_mem_usage, options.report_mem_usage) + with PeakMemorySampler(memory_reader, memory_sampling_interval) as memory_sampler: + start = timer() + solver( + kgrid, + medium, + source, + sensor, + backend=backend, + device=device, + quiet=quiet, + pml_size=options.pml_size, + pml_inside=options.pml_inside, + smooth_p0=False, + ) + elapsed_time = timer() - start + loop_mem_usage = rolling_average(loop_mem_usage, memory_sampler.peak_bytes, loop_num) + else: + start = timer() + solver( + kgrid, + medium, + source, + sensor, + backend=backend, + device=device, + quiet=quiet, + pml_size=options.pml_size, + pml_inside=options.pml_inside, + smooth_p0=False, + ) + elapsed_time = timer() - start + loop_time = rolling_average(loop_time, elapsed_time, loop_num) + store_case_result(result, case_index, nx * ny * nz, loop_time, loop_mem_usage, options.report_mem_usage) save_results(path, result) except Exception as exc: result["error_reached"] = True diff --git a/benchmarks/helpers.py b/benchmarks/helpers.py index 1843bc5a..0687a115 100644 --- a/benchmarks/helpers.py +++ b/benchmarks/helpers.py @@ -1,11 +1,15 @@ from __future__ import annotations import json +import math +import os import platform +import subprocess +import threading from dataclasses import asdict, dataclass from datetime import datetime from pathlib import Path -from typing import Any +from typing import Any, Callable import numpy as np @@ -47,6 +51,8 @@ def __post_init__(self) -> None: raise ValueError("scale arrays must have the same length") if self.number_time_points <= 0 or self.num_averages <= 0: raise ValueError("number_time_points and num_averages must be positive") + if self.start_size <= 0: + raise ValueError("start_size must be positive") if self.number_sensor_points <= 1: raise ValueError("number_sensor_points must be greater than 1") @@ -123,17 +129,20 @@ def rolling_average(previous_average: float, new_value: float, count: int) -> fl return (previous_average * (count - 1) + new_value) / count -def store_case_result(result: dict[str, Any], comp_size: int, comp_time: float, mem_usage: float, report_mem_usage: bool) -> None: - if len(result["comp_size"]) == 0 or result["comp_size"][-1] != comp_size: +def store_case_result( + result: dict[str, Any], case_index: int, comp_size: int, comp_time: float, mem_usage: float, report_mem_usage: bool +) -> None: + if case_index == len(result["comp_size"]): result["comp_size"].append(comp_size) result["comp_time"].append(comp_time) if report_mem_usage: result["mem_usage"].append(mem_usage) return - result["comp_time"][-1] = comp_time + result["comp_size"][case_index] = comp_size + result["comp_time"][case_index] = comp_time if report_mem_usage: - result["mem_usage"][-1] = mem_usage + result["mem_usage"][case_index] = mem_usage def save_results(path: Path, result: dict[str, Any]) -> None: @@ -148,7 +157,7 @@ def save_results(path: Path, result: dict[str, Any]) -> None: } if "mem_usage" in result: payload["mem_usage"] = [float(usage) for usage in result["mem_usage"]] - path.write_text(json.dumps(payload, indent=2) + "\n") + path.write_text(json.dumps(payload, indent=2, allow_nan=False) + "\n") def options_payload(options: BenchmarkOptions, backend: str, device: str) -> dict[str, Any]: @@ -166,13 +175,112 @@ def options_payload(options: BenchmarkOptions, backend: str, device: str) -> dic return payload +def validate_memory_bytes(value: float) -> float: + memory_bytes = float(value) + if not math.isfinite(memory_bytes) or memory_bytes < 0: + raise ValueError("memory usage must be a finite non-negative value") + return memory_bytes + + +def _linux_current_memory_bytes() -> float: + pages = int(Path("/proc/self/statm").read_text().split()[1]) + return float(pages * os.sysconf("SC_PAGE_SIZE")) + + +def _ps_current_memory_bytes() -> float: + completed = subprocess.run( + ["ps", "-o", "rss=", "-p", str(os.getpid())], + check=True, + capture_output=True, + text=True, + ) + return float(int(completed.stdout.strip()) * 1024) + + +def _windows_current_memory_bytes() -> float: + import ctypes + from ctypes import wintypes + + class ProcessMemoryCounters(ctypes.Structure): + _fields_ = [ + ("cb", wintypes.DWORD), + ("PageFaultCount", wintypes.DWORD), + ("PeakWorkingSetSize", ctypes.c_size_t), + ("WorkingSetSize", ctypes.c_size_t), + ("QuotaPeakPagedPoolUsage", ctypes.c_size_t), + ("QuotaPagedPoolUsage", ctypes.c_size_t), + ("QuotaPeakNonPagedPoolUsage", ctypes.c_size_t), + ("QuotaNonPagedPoolUsage", ctypes.c_size_t), + ("PagefileUsage", ctypes.c_size_t), + ("PeakPagefileUsage", ctypes.c_size_t), + ] + + counters = ProcessMemoryCounters() + counters.cb = ctypes.sizeof(ProcessMemoryCounters) + handle = ctypes.windll.kernel32.GetCurrentProcess() + if not ctypes.windll.psapi.GetProcessMemoryInfo(handle, ctypes.byref(counters), counters.cb): + raise RuntimeError("could not read process memory usage") + return float(counters.WorkingSetSize) + + def peak_memory_bytes() -> float: - try: - import resource - except ImportError: - return float("nan") - - usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - if platform.system().lower() == "darwin": - return float(usage) - return float(usage * 1024) + system = platform.system().lower() + if system == "linux": + try: + return validate_memory_bytes(_linux_current_memory_bytes()) + except (OSError, ValueError, IndexError): + return validate_memory_bytes(_ps_current_memory_bytes()) + if system == "darwin": + return validate_memory_bytes(_ps_current_memory_bytes()) + if system == "windows": + return validate_memory_bytes(_windows_current_memory_bytes()) + + raise RuntimeError("report_mem_usage is not supported on this platform") + + +class PeakMemorySampler: + def __init__(self, reader: Callable[[], float] = peak_memory_bytes, interval: float = 0.05): + self._reader = reader + self._interval = interval + self._peak_bytes = 0.0 + self._error: Exception | None = None + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + + @property + def peak_bytes(self) -> float: + with self._lock: + return self._peak_bytes + + def __enter__(self) -> PeakMemorySampler: + self._record_sample() + if self._interval > 0: + self._thread = threading.Thread(target=self._sample_until_stopped, daemon=True) + self._thread.start() + return self + + def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: Any) -> None: + self._stop_event.set() + if self._thread is not None: + self._thread.join() + try: + self._record_sample() + except Exception as exc: + if exc_type is None: + raise exc + if exc_type is None and self._error is not None: + raise self._error + + def _sample_until_stopped(self) -> None: + while not self._stop_event.wait(self._interval): + try: + self._record_sample() + except Exception as exc: + self._error = exc + self._stop_event.set() + + def _record_sample(self) -> None: + memory_bytes = validate_memory_bytes(self._reader()) + with self._lock: + self._peak_bytes = max(self._peak_bytes, memory_bytes) diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py index 37c4a0ed..07aaa8c5 100644 --- a/tests/test_benchmark.py +++ b/tests/test_benchmark.py @@ -92,6 +92,74 @@ def fake_solver(kgrid, medium, source, sensor, **kwargs): assert saved["options"]["start_size"] == 8 +def test_run_keeps_distinct_cases_with_the_same_total_grid_points(tmp_path: Path): + options = small_options( + x_scale_array=(1, 2), + y_scale_array=(2, 1), + z_scale_array=(1, 1), + ) + output_path = tmp_path / "benchmark.json" + times = iter([0.0, 1.0, 1.0, 3.0]) + shapes = [] + + def fake_solver(kgrid, *args, **kwargs): + shapes.append(tuple(kgrid.N)) + return {"p": np.zeros((1, 1))} + + result = run(options, output_path=output_path, solver=fake_solver, timer=lambda: next(times)) + + assert shapes == [(8, 16, 8), (16, 8, 8)] + assert result["comp_size"] == [8 * 16 * 8, 16 * 8 * 8] + assert result["comp_time"] == [pytest.approx(1.0), pytest.approx(2.0)] + + saved = json.loads(output_path.read_text()) + assert saved["comp_size"] == [8 * 16 * 8, 16 * 8 * 8] + assert saved["comp_time"] == [pytest.approx(1.0), pytest.approx(2.0)] + + +def test_start_size_must_be_positive(): + with pytest.raises(ValueError, match="start_size must be positive"): + small_options(start_size=0) + + +def test_run_reports_memory_usage_and_saves_valid_json(tmp_path: Path): + options = small_options(report_mem_usage=True, num_averages=2) + output_path = tmp_path / "benchmark.json" + times = iter([0.0, 1.0, 1.0, 3.0]) + memory_samples = iter([512.0, 1024.0, 2048.0, 4096.0, 8192.0]) + + def fake_solver(*args, **kwargs): + return {"p": np.zeros((1, 1))} + + result = run( + options, + max_cases=1, + output_path=output_path, + solver=fake_solver, + timer=lambda: next(times), + memory_reader=lambda: next(memory_samples), + memory_sampling_interval=0, + ) + + assert result["mem_usage"] == [pytest.approx(5120.0)] + + saved = json.loads(output_path.read_text()) + assert saved["mem_usage"] == [pytest.approx(5120.0)] + + +def test_report_memory_usage_fails_before_writing_when_unavailable(tmp_path: Path): + options = small_options(report_mem_usage=True) + output_path = tmp_path / "benchmark.json" + + def unsupported_memory_reader(): + raise RuntimeError("memory unavailable") + + with pytest.raises(ValueError, match="report_mem_usage is not supported"): + run(options, max_cases=1, output_path=output_path, memory_reader=unsupported_memory_reader) + + assert not output_path.exists() + + def test_run_saves_partial_results_after_solver_error(tmp_path: Path): options = small_options() output_path = tmp_path / "benchmark.json" From 08d8da69363f9ac9e1078d63cf1da4e2964b0128 Mon Sep 17 00:00:00 2001 From: Walter Simson Date: Sun, 21 Jun 2026 21:20:20 +0000 Subject: [PATCH 4/9] benchmark: cpp-backend memory via getrusage; rename + validation cleanups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses reviewer feedback (PR #731 thread, comment thread): ## cpp-backend memory measurement (P1) `benchmark.py` exposes a `--backend cpp` option, but the existing PeakMemorySampler reads the *Python* process RSS. When the cpp backend is active the simulation runs in a separate subprocess (the `kspaceFirstOrder-OMP`/`-CUDA` binary), so Python RSS reflects nothing about its memory footprint and `mem_usage` becomes silently meaningless. (The MATLAB original `benchmark.m` only ever exercised the in-process `kspaceFirstOrder3D` solver, so the equivalent question — measuring the subprocess `kspaceFirstOrder3DC` backend — didn't come up there.) Add `ChildPeakMemorySampler` that uses `resource.getrusage(RUSAGE_CHILDREN)` before/after the subprocess to capture true peak RSS of all reaped children. Zero new dependencies (stdlib only). Linux returns KB, macOS returns bytes — normalized to bytes. Windows is unsupported (`resource` is POSIX-only); `--report-mem-usage` + `backend="cpp"` on Windows now refuses with a clear `ValueError` at startup, before any output file is written. `benchmark.run()` picks the sampler factory based on backend automatically; tests override via the new `mem_sampler_factory=` kwarg. ## Rename peak_memory_bytes → current_memory_bytes (P2) The Linux (`/proc/self/statm` field [1]), macOS (`ps -o rss=`), and Windows (`WorkingSetSize`) readers all return *current* RSS, not historical peak. Only `PeakMemorySampler` (which polls in a background thread) finds a peak over time. Rename to reflect what the readers actually do. `peak_memory_bytes` retained as a back-compat alias. ## README clarifications - `data_cast="single"` is a no-op for `backend="cpp"` (binary always serializes as float32); only meaningful for `backend="python"`. - `--report-mem-usage` + backend interaction documented per above. ## __post_init__ validation guards (P3) Add three cheap guards that prevent opaque `ZeroDivisionError`s downstream: `domain_size > 0`, `sensor_radius > 0`, `pml_size >= 0`. ## Code comments - Density slab `[max(ny//4 - 1, 0):, :]`: comment why the `-1` is asymmetric with the head slice `[:nx//4, :]` (1-indexed inclusive start ↔ 0-indexed start). - `make_ball(... Vector([nx//2, ny//2, nz//2]) ...)`: comment that `make_ball` treats the supplied center as 1-indexed, so this matches MATLAB's `Nx/2` despite looking off-by-one. - `source.p0 = smooth(...).astype(dtype)`: comment that smooth() upcasts to float64 via the FFT path; the trailing astype restores user dtype. ## Inline `bon_a` The local was always computed but only used in the `options.nonlinear_media` branch (default `False`). Inline at the use site. ## Tests - 9 new test cases (18 total): back-compat alias, three validation guards (parametrized × 5 cases), cpp-backend factory invocation, Windows-cpp clear-error path, ChildPeakMemorySampler Windows refusal. - All existing tests still pass. Co-authored-by: Farid Yagubbayli Co-authored-by: Walter Simson --- benchmarks/README.md | 4 ++ benchmarks/benchmark.py | 34 +++++++++++++--- benchmarks/helpers.py | 79 ++++++++++++++++++++++++++++++++++-- tests/test_benchmark.py | 89 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 196 insertions(+), 10 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index 9878f6a5..012783ce 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -17,6 +17,10 @@ The default benchmark uses: By default, this can run for a long time and may stop once memory limits are reached. +> **`data_cast="single"` for `backend="cpp"`**: the C++ binary serializes inputs as `float32` regardless of the Python-side dtype, so `data_cast="single"` is a no-op for the cpp backend and `data_cast="off"` (default) is silently downcast at the serialize step. The flag is only meaningful for `backend="python"`, where it actually drives FFT precision. + +> **`--report-mem-usage` and backend choice**: for `backend="python"` (in-process), memory usage is the Python process's peak RSS — sampled by a background thread during each simulation. For `backend="cpp"` (subprocess), the C++ binary runs in a separate process, so Python-process RSS doesn't reflect its footprint. The benchmark uses `getrusage(RUSAGE_CHILDREN)` on Linux/macOS to record the cpp subprocess's peak RSS; on Windows this combination is unsupported and `report_mem_usage` will refuse `backend="cpp"` at startup with a clear error. (Note: the MATLAB original `benchmark.m` only ever exercised the in-process `kspaceFirstOrder3D` solver, so the equivalent question — measuring the subprocess `kspaceFirstOrder3DC` backend — didn't come up there.) + ## Usage Run a small smoke benchmark: diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py index d20e4dbb..7594dd5b 100644 --- a/benchmarks/benchmark.py +++ b/benchmarks/benchmark.py @@ -17,12 +17,13 @@ from benchmarks.helpers import ( BenchmarkOptions, + ChildPeakMemorySampler, PeakMemorySampler, build_case, + current_memory_bytes, default_output_path, grid_sizes, options_payload, - peak_memory_bytes, rolling_average, save_results, store_case_result, @@ -41,8 +42,9 @@ def run( quiet: bool = True, solver: Callable[..., Any] | None = None, timer: Callable[[], float] = perf_counter, - memory_reader: Callable[[], float] = peak_memory_bytes, + memory_reader: Callable[[], float] = current_memory_bytes, memory_sampling_interval: float = 0.05, + mem_sampler_factory: Callable[[], Any] | None = None, ) -> dict[str, Any]: solver = kspaceFirstOrder if solver is None else solver cases = grid_sizes(options) @@ -51,6 +53,17 @@ def run( raise ValueError("max_cases must be positive") cases = cases[:max_cases] + # Sampler picks itself based on backend: cpp runs the simulation in a + # subprocess, so Python-process RSS sampling is meaningless — use + # getrusage(RUSAGE_CHILDREN) instead. Override via mem_sampler_factory + # for tests. + def _default_sampler_factory() -> Any: + if backend == "cpp": + return ChildPeakMemorySampler() + return PeakMemorySampler(memory_reader, memory_sampling_interval) + + sampler_factory = mem_sampler_factory or _default_sampler_factory + path = default_output_path(options) if output_path is None else Path(output_path) result: dict[str, Any] = { "comp_size": [], @@ -61,10 +74,19 @@ def run( "error_message": "", } if options.report_mem_usage: + # Probe early so unsupported combinations (e.g. Windows + cpp) fail + # before any output file is written. try: - validate_memory_bytes(memory_reader()) - except Exception as exc: - raise ValueError("report_mem_usage is not supported on this platform") from exc + sampler_factory() + except NotImplementedError as exc: + raise ValueError(str(exc)) from exc + if backend != "cpp": + # For python backend, also probe the reader so platform-missing + # /proc or ps surfaces here rather than mid-simulation. + try: + validate_memory_bytes(memory_reader()) + except Exception as exc: + raise ValueError("report_mem_usage is not supported on this platform") from exc result["mem_usage"] = [] for case_index, (nx, ny, nz, scale) in enumerate(cases): @@ -74,7 +96,7 @@ def run( kgrid, medium, source, sensor = build_case(options, nx, ny, nz, scale) for loop_num in range(1, options.num_averages + 1): if options.report_mem_usage: - with PeakMemorySampler(memory_reader, memory_sampling_interval) as memory_sampler: + with sampler_factory() as memory_sampler: start = timer() solver( kgrid, diff --git a/benchmarks/helpers.py b/benchmarks/helpers.py index 0687a115..9290d320 100644 --- a/benchmarks/helpers.py +++ b/benchmarks/helpers.py @@ -55,6 +55,12 @@ def __post_init__(self) -> None: raise ValueError("start_size must be positive") if self.number_sensor_points <= 1: raise ValueError("number_sensor_points must be greater than 1") + if self.domain_size <= 0: + raise ValueError("domain_size must be positive") + if self.sensor_radius <= 0: + raise ValueError("sensor_radius must be positive") + if self.pml_size < 0: + raise ValueError("pml_size must be non-negative") @property def dtype(self) -> type[np.floating[Any]]: @@ -84,12 +90,15 @@ def build_case(options: BenchmarkOptions, nx: int, ny: int, nz: int, scale: int) rho0 = dtype(1000) alpha_coeff = dtype(0.75) alpha_power = dtype(1.5) - bon_a = dtype(6) if options.heterogeneous_media: sound_speed = c0 * np.ones((nx, ny, nz), dtype=dtype) + # MATLAB: sound_speed(1:Nx/4, :, :) → Python: [:nx//4, :, :] (head slice, no -1). sound_speed[: nx // 4, :, :] = c0 * dtype(1.2) density = rho0 * np.ones((nx, ny, nz), dtype=dtype) + # MATLAB: density(:, Ny/4:end, :) → Python: [:, ny//4-1:, :] (tail slice — the + # -1 converts MATLAB's 1-indexed inclusive start to Python's 0-indexed start). + # max(..., 0) guards tiny grids where ny//4 - 1 would be negative. density[:, max(ny // 4 - 1, 0) :, :] = rho0 * dtype(1.2) else: sound_speed = np.array(c0, dtype=dtype) @@ -100,10 +109,14 @@ def build_case(options: BenchmarkOptions, nx: int, ny: int, nz: int, scale: int) medium.alpha_coeff = alpha_coeff medium.alpha_power = alpha_power if options.nonlinear_media: - medium.BonA = bon_a + medium.BonA = dtype(6) source = kSource() + # make_ball treats the supplied center as 1-indexed internally (mapgen.py), + # so passing nx//2 (= MATLAB's Nx/2 1-indexed value) yields the same centroid. source.p0 = dtype(10) * make_ball(Vector([nx, ny, nz]), Vector([nx // 2, ny // 2, nz // 2]), 2 * scale) + # smooth() upcasts to float64 via the FFT path even when given float32; + # the trailing .astype(dtype) restores user-requested precision. source.p0 = smooth(source.p0.astype(dtype, copy=False), restore_max=True).astype(dtype, copy=False) sensor_mask = make_cart_sphere(options.sensor_radius, options.number_sensor_points) @@ -223,7 +236,14 @@ class ProcessMemoryCounters(ctypes.Structure): return float(counters.WorkingSetSize) -def peak_memory_bytes() -> float: +def current_memory_bytes() -> float: + """Return the current resident-set-size of THIS Python process, in bytes. + + Note: this returns the *current* RSS at the moment of the call, not a + historical peak (despite ``ru_maxrss``-style fields existing on most + platforms). Use ``PeakMemorySampler`` to track peak-over-time, or + ``ChildPeakMemorySampler`` for subprocess (``backend="cpp"``) measurement. + """ system = platform.system().lower() if system == "linux": try: @@ -238,8 +258,59 @@ def peak_memory_bytes() -> float: raise RuntimeError("report_mem_usage is not supported on this platform") +# Back-compat alias — the old name is misleading but kept so external callers +# (and any pre-merge benchmark JSON tooling) don't break. +peak_memory_bytes = current_memory_bytes + + +class ChildPeakMemorySampler: + """Capture the peak RSS of child processes spawned during the ``with`` block. + + Use this for ``backend="cpp"`` benchmark runs where the simulation work + happens in a subprocess — ``PeakMemorySampler`` only sees the Python + parent's RSS and reports a number that has nothing to do with the C++ + process's actual footprint. + + Implementation: ``resource.getrusage(RUSAGE_CHILDREN).ru_maxrss`` is the + cumulative max across all reaped children, so we record before/after and + return the increment. On Linux ``ru_maxrss`` is reported in kilobytes; on + macOS it's in bytes (per BSD historical convention). Windows has no + equivalent in ``resource`` — the sampler raises ``NotImplementedError`` + on Windows so the caller can refuse the combination cleanly. + """ + + def __init__(self) -> None: + if platform.system().lower() == "windows": + raise NotImplementedError( + "ChildPeakMemorySampler is not supported on Windows " + "(resource.RUSAGE_CHILDREN is POSIX-only). " + "Use backend='python' to measure simulation memory on Windows." + ) + self._before = 0 + self._after = 0 + + def __enter__(self) -> ChildPeakMemorySampler: + import resource # POSIX-only; gated by the Windows check above + + self._before = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss + return self + + def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: Any) -> None: + import resource + + self._after = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss + + @property + def peak_bytes(self) -> float: + delta = max(0, self._after - self._before) + # ru_maxrss units differ by platform (Linux: KB, macOS: bytes). + if platform.system().lower() == "linux": + return float(delta * 1024) + return float(delta) + + class PeakMemorySampler: - def __init__(self, reader: Callable[[], float] = peak_memory_bytes, interval: float = 0.05): + def __init__(self, reader: Callable[[], float] = current_memory_bytes, interval: float = 0.05): self._reader = reader self._interval = interval self._peak_bytes = 0.0 diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py index 07aaa8c5..69ce5d77 100644 --- a/tests/test_benchmark.py +++ b/tests/test_benchmark.py @@ -1,10 +1,12 @@ import json from pathlib import Path +from unittest.mock import patch import numpy as np import pytest from benchmarks.benchmark import BenchmarkOptions, build_case, grid_sizes, run +from benchmarks.helpers import ChildPeakMemorySampler, current_memory_bytes, peak_memory_bytes def small_options(**overrides): @@ -160,6 +162,93 @@ def unsupported_memory_reader(): assert not output_path.exists() +def test_peak_memory_bytes_is_back_compat_alias_for_current_memory_bytes(): + # The old name was misleading (Linux/macOS/Windows readers return current + # RSS, not historical peak). Keep the alias so external benchmark tooling + # that imported the old name doesn't break. + assert peak_memory_bytes is current_memory_bytes + + +@pytest.mark.parametrize( + "kwargs, expected_match", + [ + ({"domain_size": 0}, "domain_size must be positive"), + ({"domain_size": -1e-3}, "domain_size must be positive"), + ({"sensor_radius": 0}, "sensor_radius must be positive"), + ({"sensor_radius": -1}, "sensor_radius must be positive"), + ({"pml_size": -1}, "pml_size must be non-negative"), + ], +) +def test_options_post_init_rejects_invalid_geometry(kwargs, expected_match): + with pytest.raises(ValueError, match=expected_match): + small_options(**kwargs) + + +def test_cpp_backend_uses_child_sampler_via_factory(tmp_path: Path): + """For backend='cpp', the subprocess sampler factory is used (not PeakMemorySampler).""" + options = small_options(report_mem_usage=True, num_averages=2) + output_path = tmp_path / "benchmark.json" + times = iter([0.0, 1.0, 1.0, 3.0]) + + sampler_calls = [] + + class FakeChildSampler: + def __enter__(self): + sampler_calls.append("enter") + return self + + def __exit__(self, *exc): + sampler_calls.append("exit") + + @property + def peak_bytes(self) -> float: + return float(1024 * len(sampler_calls)) # arbitrary but deterministic + + def fake_solver(*args, **kwargs): + return {"p": np.zeros((1, 1))} + + result = run( + options, + backend="cpp", + max_cases=1, + output_path=output_path, + solver=fake_solver, + timer=lambda: next(times), + mem_sampler_factory=FakeChildSampler, + ) + + # Two loop iterations × (enter, exit). The early-startup probe just + # constructs a sampler to verify the factory works; it doesn't enter. + assert sampler_calls == ["enter", "exit", "enter", "exit"] + assert "mem_usage" in result and len(result["mem_usage"]) == 1 + + +def test_cpp_backend_reports_clear_error_when_child_sampler_unsupported(tmp_path: Path): + """Windows + cpp + report_mem_usage should raise a clear error at startup.""" + options = small_options(report_mem_usage=True) + output_path = tmp_path / "benchmark.json" + + def unsupported_sampler_factory(): + raise NotImplementedError("ChildPeakMemorySampler is not supported on Windows (resource.RUSAGE_CHILDREN is POSIX-only).") + + with pytest.raises(ValueError, match="not supported on Windows"): + run( + options, + backend="cpp", + max_cases=1, + output_path=output_path, + mem_sampler_factory=unsupported_sampler_factory, + ) + # Failed-fast, before writing the output file. + assert not output_path.exists() + + +def test_child_peak_memory_sampler_refuses_on_windows(): + with patch("benchmarks.helpers.platform.system", return_value="Windows"): + with pytest.raises(NotImplementedError, match="not supported on Windows"): + ChildPeakMemorySampler() + + def test_run_saves_partial_results_after_solver_error(tmp_path: Path): options = small_options() output_path = tmp_path / "benchmark.json" From 5bc96bb8f5f6580dce12b9c8c159afe46c7db40e Mon Sep 17 00:00:00 2001 From: Walter Simson Date: Sun, 21 Jun 2026 22:52:56 +0000 Subject: [PATCH 5/9] =?UTF-8?q?benchmark:=20trim=20README=20=E2=80=94=20dr?= =?UTF-8?q?op=20verbose=20data=5Fcast/mem=5Fusage=20caveats?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cpp-backend memory measurement is now handled correctly by ChildPeakMemorySampler (Windows refused with a clear error), so the three-paragraph README explanation was redundant. The behavior is self-documenting via the error message; the script is the source of truth, not the README. --- benchmarks/README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index 012783ce..9878f6a5 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -17,10 +17,6 @@ The default benchmark uses: By default, this can run for a long time and may stop once memory limits are reached. -> **`data_cast="single"` for `backend="cpp"`**: the C++ binary serializes inputs as `float32` regardless of the Python-side dtype, so `data_cast="single"` is a no-op for the cpp backend and `data_cast="off"` (default) is silently downcast at the serialize step. The flag is only meaningful for `backend="python"`, where it actually drives FFT precision. - -> **`--report-mem-usage` and backend choice**: for `backend="python"` (in-process), memory usage is the Python process's peak RSS — sampled by a background thread during each simulation. For `backend="cpp"` (subprocess), the C++ binary runs in a separate process, so Python-process RSS doesn't reflect its footprint. The benchmark uses `getrusage(RUSAGE_CHILDREN)` on Linux/macOS to record the cpp subprocess's peak RSS; on Windows this combination is unsupported and `report_mem_usage` will refuse `backend="cpp"` at startup with a clear error. (Note: the MATLAB original `benchmark.m` only ever exercised the in-process `kspaceFirstOrder3D` solver, so the equivalent question — measuring the subprocess `kspaceFirstOrder3DC` backend — didn't come up there.) - ## Usage Run a small smoke benchmark: From af5361b423090ff6db3266f191f766b3fd6e1b90 Mon Sep 17 00:00:00 2001 From: Walter Simson Date: Sun, 21 Jun 2026 22:58:22 +0000 Subject: [PATCH 6/9] ChildPeakMemorySampler: capture baseline once per grid (Greptile P1 fix) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous implementation captured the ru_maxrss baseline at every __enter__ — but ru_maxrss is monotonically non-decreasing cumulative across the parent's lifetime, so any iteration after the first with the same workload reported delta=0. Rolling average across N iterations silently deflated the memory measurement by a factor of N. Fix: - helpers.py: ChildPeakMemorySampler now captures baseline in __init__ (once per construction), not __enter__. __enter__ is a no-op; __exit__ records the latest ru_maxrss. peak_bytes returns growth since baseline. - benchmark.py: for backend="cpp", create the sampler ONCE per grid (outside the inner num_averages loop) and reuse across iterations. For backend="python", PeakMemorySampler stays per-iteration (its polling-based per-call peak is independent and averages meaningfully). - tests: replaced the prior cpp factory test with one that explicitly verifies (a) factory is called twice for one grid (1 probe + 1 grid, not once per iteration), (b) enter/exit happen 3× on the SAME instance for num_averages=3, (c) peak_bytes correctly accumulates instead of zeroing out. --- benchmarks/benchmark.py | 8 +++++++- benchmarks/helpers.py | 38 ++++++++++++++++++++++++-------------- tests/test_benchmark.py | 38 +++++++++++++++++++++++++++----------- 3 files changed, 58 insertions(+), 26 deletions(-) diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py index 7594dd5b..b6df1321 100644 --- a/benchmarks/benchmark.py +++ b/benchmarks/benchmark.py @@ -94,9 +94,15 @@ def _default_sampler_factory() -> Any: loop_mem_usage = 0.0 try: kgrid, medium, source, sensor = build_case(options, nx, ny, nz, scale) + # cpp backend: ChildPeakMemorySampler's baseline must be captured + # once per grid (ru_maxrss is cumulative — re-baselining per + # iteration would silently zero the delta). Python backend's + # PeakMemorySampler is per-iteration by design. + grid_sampler = sampler_factory() if (options.report_mem_usage and backend == "cpp") else None for loop_num in range(1, options.num_averages + 1): if options.report_mem_usage: - with sampler_factory() as memory_sampler: + memory_cm = grid_sampler if grid_sampler is not None else sampler_factory() + with memory_cm as memory_sampler: start = timer() solver( kgrid, diff --git a/benchmarks/helpers.py b/benchmarks/helpers.py index 9290d320..a2dd1c9a 100644 --- a/benchmarks/helpers.py +++ b/benchmarks/helpers.py @@ -264,19 +264,27 @@ def current_memory_bytes() -> float: class ChildPeakMemorySampler: - """Capture the peak RSS of child processes spawned during the ``with`` block. + """Capture the peak RSS of child processes spawned during the sampler's lifetime. Use this for ``backend="cpp"`` benchmark runs where the simulation work happens in a subprocess — ``PeakMemorySampler`` only sees the Python parent's RSS and reports a number that has nothing to do with the C++ process's actual footprint. - Implementation: ``resource.getrusage(RUSAGE_CHILDREN).ru_maxrss`` is the - cumulative max across all reaped children, so we record before/after and - return the increment. On Linux ``ru_maxrss`` is reported in kilobytes; on - macOS it's in bytes (per BSD historical convention). Windows has no - equivalent in ``resource`` — the sampler raises ``NotImplementedError`` - on Windows so the caller can refuse the combination cleanly. + **Lifetime model — use ONE instance per grid size, enter/exit per iteration.** + ``resource.getrusage(RUSAGE_CHILDREN).ru_maxrss`` is a monotonically + non-decreasing cumulative max across *all* reaped children since the + parent process started, so capturing a baseline at each ``__enter__`` + would yield ``delta == 0`` for any iteration after the first when the + workload is uniform. Instead the baseline is captured at construction + and ``peak_bytes`` returns growth since then; reuse the same instance + across all iterations of a grid size so the answer reflects the peak + for that grid's runs, not a per-iteration delta that's always zero. + + On Linux ``ru_maxrss`` is reported in kilobytes; on macOS it's in bytes + (per BSD historical convention). Windows has no equivalent in + ``resource`` — the sampler raises ``NotImplementedError`` on Windows so + the caller can refuse the combination cleanly. """ def __init__(self) -> None: @@ -286,23 +294,25 @@ def __init__(self) -> None: "(resource.RUSAGE_CHILDREN is POSIX-only). " "Use backend='python' to measure simulation memory on Windows." ) - self._before = 0 - self._after = 0 - - def __enter__(self) -> ChildPeakMemorySampler: import resource # POSIX-only; gated by the Windows check above - self._before = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss + self._baseline = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss + self._latest = self._baseline + + def __enter__(self) -> ChildPeakMemorySampler: + # No-op: ru_maxrss is cumulative across the lifetime of the parent, + # so re-capturing a baseline at every iteration would silently + # collapse the reported delta to zero. Baseline is fixed at __init__. return self def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: Any) -> None: import resource - self._after = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss + self._latest = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss @property def peak_bytes(self) -> float: - delta = max(0, self._after - self._before) + delta = max(0, self._latest - self._baseline) # ru_maxrss units differ by platform (Linux: KB, macOS: bytes). if platform.system().lower() == "linux": return float(delta * 1024) diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py index 69ce5d77..350ceff7 100644 --- a/tests/test_benchmark.py +++ b/tests/test_benchmark.py @@ -184,25 +184,37 @@ def test_options_post_init_rejects_invalid_geometry(kwargs, expected_match): small_options(**kwargs) -def test_cpp_backend_uses_child_sampler_via_factory(tmp_path: Path): - """For backend='cpp', the subprocess sampler factory is used (not PeakMemorySampler).""" - options = small_options(report_mem_usage=True, num_averages=2) +def test_cpp_backend_shares_child_sampler_across_iterations(tmp_path: Path): + """For backend='cpp', the factory is called ONCE per grid (not per iteration). + + ChildPeakMemorySampler's baseline must be captured once per grid because + ru_maxrss is cumulative — re-baselining per iteration would silently + zero the reported delta (Greptile P1 on the original implementation). + """ + options = small_options(report_mem_usage=True, num_averages=3) output_path = tmp_path / "benchmark.json" - times = iter([0.0, 1.0, 1.0, 3.0]) + times = iter([0.0, 1.0, 1.0, 3.0, 3.0, 7.0]) - sampler_calls = [] + constructed: list[object] = [] + enter_count = 0 class FakeChildSampler: + def __init__(self): + constructed.append(self) + self._peak = 0 + def __enter__(self): - sampler_calls.append("enter") + nonlocal enter_count + enter_count += 1 return self def __exit__(self, *exc): - sampler_calls.append("exit") + # Simulate cumulative growth: peak grows by 1 KB each iteration. + self._peak += 1024 @property def peak_bytes(self) -> float: - return float(1024 * len(sampler_calls)) # arbitrary but deterministic + return float(self._peak) def fake_solver(*args, **kwargs): return {"p": np.zeros((1, 1))} @@ -217,9 +229,13 @@ def fake_solver(*args, **kwargs): mem_sampler_factory=FakeChildSampler, ) - # Two loop iterations × (enter, exit). The early-startup probe just - # constructs a sampler to verify the factory works; it doesn't enter. - assert sampler_calls == ["enter", "exit", "enter", "exit"] + # max_cases=1 grid → factory called twice: once at the startup probe, + # once for the actual grid loop. NOT once per iteration. + assert len(constructed) == 2, f"factory called {len(constructed)} times; expected 2 (1 probe + 1 grid)" + # num_averages=3 iterations on that one grid → 3 enter/exit pairs on the SAME instance. + assert enter_count == 3 + # The actively-used sampler grew monotonically to 3 KB. + assert constructed[1].peak_bytes == pytest.approx(3072.0) assert "mem_usage" in result and len(result["mem_usage"]) == 1 From 95e802f9f7dd74ca6a71847d6f6ddf464b5d469a Mon Sep 17 00:00:00 2001 From: Walter Simson Date: Sun, 21 Jun 2026 23:21:02 +0000 Subject: [PATCH 7/9] benchmark: use psutil for current-process RSS (replaces 3 platform readers) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Collapses the per-platform memory readers into one psutil call: - _linux_current_memory_bytes (parsed /proc/self/statm) - _ps_current_memory_bytes (shelled out to ps -o rss=) - _windows_current_memory_bytes (ctypes ProcessMemoryCounters struct) → psutil.Process().memory_info().rss ~70 LOC removed (including the Windows ctypes struct and try/except chain). Cross-platform from one call. Added as an optional dependency under the new `[benchmark]` extra in pyproject.toml so users not running benchmarks don't pull it in: pip install k-wave-python[benchmark] Scope intentionally limited: ChildPeakMemorySampler still uses resource.getrusage(RUSAGE_CHILDREN). Switching its subprocess measurement to psutil would require exposing kspaceFirstOrder's subprocess.Popen handle from kwave.solvers.cpp_simulation, which belongs in a separate PR. All 18 tests pass on Linux. --- benchmarks/helpers.py | 683 +++++++++++++++++++----------------------- pyproject.toml | 1 + 2 files changed, 317 insertions(+), 367 deletions(-) diff --git a/benchmarks/helpers.py b/benchmarks/helpers.py index a2dd1c9a..76c1eb43 100644 --- a/benchmarks/helpers.py +++ b/benchmarks/helpers.py @@ -1,367 +1,316 @@ -from __future__ import annotations - -import json -import math -import os -import platform -import subprocess -import threading -from dataclasses import asdict, dataclass -from datetime import datetime -from pathlib import Path -from typing import Any, Callable - -import numpy as np - -import kwave -from kwave.data import Vector -from kwave.kgrid import kWaveGrid -from kwave.kmedium import kWaveMedium -from kwave.ksensor import kSensor -from kwave.ksource import kSource -from kwave.utils.conversion import cart2grid -from kwave.utils.filters import smooth -from kwave.utils.mapgen import make_ball, make_cart_sphere - - -@dataclass(frozen=True) -class BenchmarkOptions: - data_cast: str = "off" - heterogeneous_media: bool = True - absorbing_media: bool = True - nonlinear_media: bool = False - binary_sensor_mask: bool = True - number_sensor_points: int = 100 - number_time_points: int = 1000 - num_averages: int = 3 - start_size: int = 32 - x_scale_array: tuple[int, ...] = (1, 2, 2, 2, 4, 4, 4, 8, 8, 8, 16, 16) - y_scale_array: tuple[int, ...] = (1, 1, 2, 2, 2, 4, 4, 4, 8, 8, 8, 16) - z_scale_array: tuple[int, ...] = (1, 1, 1, 2, 2, 2, 4, 4, 4, 8, 8, 8) - domain_size: float = 22e-3 - sensor_radius: float = 10e-3 - pml_size: int = 10 - pml_inside: bool = True - report_mem_usage: bool = False - - def __post_init__(self) -> None: - if self.data_cast not in {"off", "single"}: - raise ValueError("data_cast must be 'off' or 'single'. Use device='gpu' to run on a GPU.") - if not (len(self.x_scale_array) == len(self.y_scale_array) == len(self.z_scale_array)): - raise ValueError("scale arrays must have the same length") - if self.number_time_points <= 0 or self.num_averages <= 0: - raise ValueError("number_time_points and num_averages must be positive") - if self.start_size <= 0: - raise ValueError("start_size must be positive") - if self.number_sensor_points <= 1: - raise ValueError("number_sensor_points must be greater than 1") - if self.domain_size <= 0: - raise ValueError("domain_size must be positive") - if self.sensor_radius <= 0: - raise ValueError("sensor_radius must be positive") - if self.pml_size < 0: - raise ValueError("pml_size must be non-negative") - - @property - def dtype(self) -> type[np.floating[Any]]: - return np.float32 if self.data_cast == "single" else np.float64 - - -def grid_sizes(options: BenchmarkOptions = BenchmarkOptions()) -> list[tuple[int, int, int, int]]: - return [ - ( - options.start_size * xscale, - options.start_size * yscale, - options.start_size * zscale, - min(xscale, yscale, zscale), - ) - for xscale, yscale, zscale in zip(options.x_scale_array, options.y_scale_array, options.z_scale_array) - ] - - -def build_case(options: BenchmarkOptions, nx: int, ny: int, nz: int, scale: int) -> tuple[kWaveGrid, kWaveMedium, kSource, kSensor]: - dtype = options.dtype - dx = options.domain_size / nx - dy = options.domain_size / ny - dz = options.domain_size / nz - kgrid = kWaveGrid(Vector([nx, ny, nz]), Vector([dx, dy, dz])) - - c0 = dtype(1500) - rho0 = dtype(1000) - alpha_coeff = dtype(0.75) - alpha_power = dtype(1.5) - - if options.heterogeneous_media: - sound_speed = c0 * np.ones((nx, ny, nz), dtype=dtype) - # MATLAB: sound_speed(1:Nx/4, :, :) → Python: [:nx//4, :, :] (head slice, no -1). - sound_speed[: nx // 4, :, :] = c0 * dtype(1.2) - density = rho0 * np.ones((nx, ny, nz), dtype=dtype) - # MATLAB: density(:, Ny/4:end, :) → Python: [:, ny//4-1:, :] (tail slice — the - # -1 converts MATLAB's 1-indexed inclusive start to Python's 0-indexed start). - # max(..., 0) guards tiny grids where ny//4 - 1 would be negative. - density[:, max(ny // 4 - 1, 0) :, :] = rho0 * dtype(1.2) - else: - sound_speed = np.array(c0, dtype=dtype) - density = np.array(rho0, dtype=dtype) - - medium = kWaveMedium(sound_speed=sound_speed, density=density) - if options.absorbing_media: - medium.alpha_coeff = alpha_coeff - medium.alpha_power = alpha_power - if options.nonlinear_media: - medium.BonA = dtype(6) - - source = kSource() - # make_ball treats the supplied center as 1-indexed internally (mapgen.py), - # so passing nx//2 (= MATLAB's Nx/2 1-indexed value) yields the same centroid. - source.p0 = dtype(10) * make_ball(Vector([nx, ny, nz]), Vector([nx // 2, ny // 2, nz // 2]), 2 * scale) - # smooth() upcasts to float64 via the FFT path even when given float32; - # the trailing .astype(dtype) restores user-requested precision. - source.p0 = smooth(source.p0.astype(dtype, copy=False), restore_max=True).astype(dtype, copy=False) - - sensor_mask = make_cart_sphere(options.sensor_radius, options.number_sensor_points) - if options.binary_sensor_mask: - sensor_mask, _, _ = cart2grid(kgrid, sensor_mask, order="C") - sensor_mask = sensor_mask.astype(bool) - sensor = kSensor(mask=sensor_mask) - sensor.record = ["p"] - - kgrid.makeTime(np.max(np.asarray(medium.sound_speed))) - kgrid.setTime(options.number_time_points, kgrid.dt) - - return kgrid, medium, source, sensor - - -def default_output_path(options: BenchmarkOptions) -> Path: - computer_name = platform.node() or "unknown-computer" - date = datetime.now().strftime("%Y%m%d-%H%M%S") - return Path(f"benchmark_data-{computer_name}-{options.data_cast}-{date}.json") - - -def rolling_average(previous_average: float, new_value: float, count: int) -> float: - return (previous_average * (count - 1) + new_value) / count - - -def store_case_result( - result: dict[str, Any], case_index: int, comp_size: int, comp_time: float, mem_usage: float, report_mem_usage: bool -) -> None: - if case_index == len(result["comp_size"]): - result["comp_size"].append(comp_size) - result["comp_time"].append(comp_time) - if report_mem_usage: - result["mem_usage"].append(mem_usage) - return - - result["comp_size"][case_index] = comp_size - result["comp_time"][case_index] = comp_time - if report_mem_usage: - result["mem_usage"][case_index] = mem_usage - - -def save_results(path: Path, result: dict[str, Any]) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - payload = { - "comp_size": [int(size) for size in result["comp_size"]], - "comp_time": [float(time) for time in result["comp_time"]], - "options": result["options"], - "output_path": result["output_path"], - "error_reached": bool(result["error_reached"]), - "error_message": result["error_message"], - } - if "mem_usage" in result: - payload["mem_usage"] = [float(usage) for usage in result["mem_usage"]] - path.write_text(json.dumps(payload, indent=2, allow_nan=False) + "\n") - - -def options_payload(options: BenchmarkOptions, backend: str, device: str) -> dict[str, Any]: - payload = asdict(options) - payload.update( - { - "backend": backend, - "device": device, - "computer_name": platform.node(), - "python_version": platform.python_version(), - "platform": platform.platform(), - "kwave_python_version": kwave.__version__, - } - ) - return payload - - -def validate_memory_bytes(value: float) -> float: - memory_bytes = float(value) - if not math.isfinite(memory_bytes) or memory_bytes < 0: - raise ValueError("memory usage must be a finite non-negative value") - return memory_bytes - - -def _linux_current_memory_bytes() -> float: - pages = int(Path("/proc/self/statm").read_text().split()[1]) - return float(pages * os.sysconf("SC_PAGE_SIZE")) - - -def _ps_current_memory_bytes() -> float: - completed = subprocess.run( - ["ps", "-o", "rss=", "-p", str(os.getpid())], - check=True, - capture_output=True, - text=True, - ) - return float(int(completed.stdout.strip()) * 1024) - - -def _windows_current_memory_bytes() -> float: - import ctypes - from ctypes import wintypes - - class ProcessMemoryCounters(ctypes.Structure): - _fields_ = [ - ("cb", wintypes.DWORD), - ("PageFaultCount", wintypes.DWORD), - ("PeakWorkingSetSize", ctypes.c_size_t), - ("WorkingSetSize", ctypes.c_size_t), - ("QuotaPeakPagedPoolUsage", ctypes.c_size_t), - ("QuotaPagedPoolUsage", ctypes.c_size_t), - ("QuotaPeakNonPagedPoolUsage", ctypes.c_size_t), - ("QuotaNonPagedPoolUsage", ctypes.c_size_t), - ("PagefileUsage", ctypes.c_size_t), - ("PeakPagefileUsage", ctypes.c_size_t), - ] - - counters = ProcessMemoryCounters() - counters.cb = ctypes.sizeof(ProcessMemoryCounters) - handle = ctypes.windll.kernel32.GetCurrentProcess() - if not ctypes.windll.psapi.GetProcessMemoryInfo(handle, ctypes.byref(counters), counters.cb): - raise RuntimeError("could not read process memory usage") - return float(counters.WorkingSetSize) - - -def current_memory_bytes() -> float: - """Return the current resident-set-size of THIS Python process, in bytes. - - Note: this returns the *current* RSS at the moment of the call, not a - historical peak (despite ``ru_maxrss``-style fields existing on most - platforms). Use ``PeakMemorySampler`` to track peak-over-time, or - ``ChildPeakMemorySampler`` for subprocess (``backend="cpp"``) measurement. - """ - system = platform.system().lower() - if system == "linux": - try: - return validate_memory_bytes(_linux_current_memory_bytes()) - except (OSError, ValueError, IndexError): - return validate_memory_bytes(_ps_current_memory_bytes()) - if system == "darwin": - return validate_memory_bytes(_ps_current_memory_bytes()) - if system == "windows": - return validate_memory_bytes(_windows_current_memory_bytes()) - - raise RuntimeError("report_mem_usage is not supported on this platform") - - -# Back-compat alias — the old name is misleading but kept so external callers -# (and any pre-merge benchmark JSON tooling) don't break. -peak_memory_bytes = current_memory_bytes - - -class ChildPeakMemorySampler: - """Capture the peak RSS of child processes spawned during the sampler's lifetime. - - Use this for ``backend="cpp"`` benchmark runs where the simulation work - happens in a subprocess — ``PeakMemorySampler`` only sees the Python - parent's RSS and reports a number that has nothing to do with the C++ - process's actual footprint. - - **Lifetime model — use ONE instance per grid size, enter/exit per iteration.** - ``resource.getrusage(RUSAGE_CHILDREN).ru_maxrss`` is a monotonically - non-decreasing cumulative max across *all* reaped children since the - parent process started, so capturing a baseline at each ``__enter__`` - would yield ``delta == 0`` for any iteration after the first when the - workload is uniform. Instead the baseline is captured at construction - and ``peak_bytes`` returns growth since then; reuse the same instance - across all iterations of a grid size so the answer reflects the peak - for that grid's runs, not a per-iteration delta that's always zero. - - On Linux ``ru_maxrss`` is reported in kilobytes; on macOS it's in bytes - (per BSD historical convention). Windows has no equivalent in - ``resource`` — the sampler raises ``NotImplementedError`` on Windows so - the caller can refuse the combination cleanly. - """ - - def __init__(self) -> None: - if platform.system().lower() == "windows": - raise NotImplementedError( - "ChildPeakMemorySampler is not supported on Windows " - "(resource.RUSAGE_CHILDREN is POSIX-only). " - "Use backend='python' to measure simulation memory on Windows." - ) - import resource # POSIX-only; gated by the Windows check above - - self._baseline = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss - self._latest = self._baseline - - def __enter__(self) -> ChildPeakMemorySampler: - # No-op: ru_maxrss is cumulative across the lifetime of the parent, - # so re-capturing a baseline at every iteration would silently - # collapse the reported delta to zero. Baseline is fixed at __init__. - return self - - def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: Any) -> None: - import resource - - self._latest = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss - - @property - def peak_bytes(self) -> float: - delta = max(0, self._latest - self._baseline) - # ru_maxrss units differ by platform (Linux: KB, macOS: bytes). - if platform.system().lower() == "linux": - return float(delta * 1024) - return float(delta) - - -class PeakMemorySampler: - def __init__(self, reader: Callable[[], float] = current_memory_bytes, interval: float = 0.05): - self._reader = reader - self._interval = interval - self._peak_bytes = 0.0 - self._error: Exception | None = None - self._lock = threading.Lock() - self._stop_event = threading.Event() - self._thread: threading.Thread | None = None - - @property - def peak_bytes(self) -> float: - with self._lock: - return self._peak_bytes - - def __enter__(self) -> PeakMemorySampler: - self._record_sample() - if self._interval > 0: - self._thread = threading.Thread(target=self._sample_until_stopped, daemon=True) - self._thread.start() - return self - - def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: Any) -> None: - self._stop_event.set() - if self._thread is not None: - self._thread.join() - try: - self._record_sample() - except Exception as exc: - if exc_type is None: - raise exc - if exc_type is None and self._error is not None: - raise self._error - - def _sample_until_stopped(self) -> None: - while not self._stop_event.wait(self._interval): - try: - self._record_sample() - except Exception as exc: - self._error = exc - self._stop_event.set() - - def _record_sample(self) -> None: - memory_bytes = validate_memory_bytes(self._reader()) - with self._lock: - self._peak_bytes = max(self._peak_bytes, memory_bytes) +from __future__ import annotations + +import json +import math +import platform +import threading +from dataclasses import asdict, dataclass +from datetime import datetime +from pathlib import Path +from typing import Any, Callable + +import numpy as np +import psutil + +import kwave +from kwave.data import Vector +from kwave.kgrid import kWaveGrid +from kwave.kmedium import kWaveMedium +from kwave.ksensor import kSensor +from kwave.ksource import kSource +from kwave.utils.conversion import cart2grid +from kwave.utils.filters import smooth +from kwave.utils.mapgen import make_ball, make_cart_sphere + + +@dataclass(frozen=True) +class BenchmarkOptions: + data_cast: str = "off" + heterogeneous_media: bool = True + absorbing_media: bool = True + nonlinear_media: bool = False + binary_sensor_mask: bool = True + number_sensor_points: int = 100 + number_time_points: int = 1000 + num_averages: int = 3 + start_size: int = 32 + x_scale_array: tuple[int, ...] = (1, 2, 2, 2, 4, 4, 4, 8, 8, 8, 16, 16) + y_scale_array: tuple[int, ...] = (1, 1, 2, 2, 2, 4, 4, 4, 8, 8, 8, 16) + z_scale_array: tuple[int, ...] = (1, 1, 1, 2, 2, 2, 4, 4, 4, 8, 8, 8) + domain_size: float = 22e-3 + sensor_radius: float = 10e-3 + pml_size: int = 10 + pml_inside: bool = True + report_mem_usage: bool = False + + def __post_init__(self) -> None: + if self.data_cast not in {"off", "single"}: + raise ValueError("data_cast must be 'off' or 'single'. Use device='gpu' to run on a GPU.") + if not (len(self.x_scale_array) == len(self.y_scale_array) == len(self.z_scale_array)): + raise ValueError("scale arrays must have the same length") + if self.number_time_points <= 0 or self.num_averages <= 0: + raise ValueError("number_time_points and num_averages must be positive") + if self.start_size <= 0: + raise ValueError("start_size must be positive") + if self.number_sensor_points <= 1: + raise ValueError("number_sensor_points must be greater than 1") + if self.domain_size <= 0: + raise ValueError("domain_size must be positive") + if self.sensor_radius <= 0: + raise ValueError("sensor_radius must be positive") + if self.pml_size < 0: + raise ValueError("pml_size must be non-negative") + + @property + def dtype(self) -> type[np.floating[Any]]: + return np.float32 if self.data_cast == "single" else np.float64 + + +def grid_sizes(options: BenchmarkOptions = BenchmarkOptions()) -> list[tuple[int, int, int, int]]: + return [ + ( + options.start_size * xscale, + options.start_size * yscale, + options.start_size * zscale, + min(xscale, yscale, zscale), + ) + for xscale, yscale, zscale in zip(options.x_scale_array, options.y_scale_array, options.z_scale_array) + ] + + +def build_case(options: BenchmarkOptions, nx: int, ny: int, nz: int, scale: int) -> tuple[kWaveGrid, kWaveMedium, kSource, kSensor]: + dtype = options.dtype + dx = options.domain_size / nx + dy = options.domain_size / ny + dz = options.domain_size / nz + kgrid = kWaveGrid(Vector([nx, ny, nz]), Vector([dx, dy, dz])) + + c0 = dtype(1500) + rho0 = dtype(1000) + alpha_coeff = dtype(0.75) + alpha_power = dtype(1.5) + + if options.heterogeneous_media: + sound_speed = c0 * np.ones((nx, ny, nz), dtype=dtype) + # MATLAB: sound_speed(1:Nx/4, :, :) → Python: [:nx//4, :, :] (head slice, no -1). + sound_speed[: nx // 4, :, :] = c0 * dtype(1.2) + density = rho0 * np.ones((nx, ny, nz), dtype=dtype) + # MATLAB: density(:, Ny/4:end, :) → Python: [:, ny//4-1:, :] (tail slice — the + # -1 converts MATLAB's 1-indexed inclusive start to Python's 0-indexed start). + # max(..., 0) guards tiny grids where ny//4 - 1 would be negative. + density[:, max(ny // 4 - 1, 0) :, :] = rho0 * dtype(1.2) + else: + sound_speed = np.array(c0, dtype=dtype) + density = np.array(rho0, dtype=dtype) + + medium = kWaveMedium(sound_speed=sound_speed, density=density) + if options.absorbing_media: + medium.alpha_coeff = alpha_coeff + medium.alpha_power = alpha_power + if options.nonlinear_media: + medium.BonA = dtype(6) + + source = kSource() + # make_ball treats the supplied center as 1-indexed internally (mapgen.py), + # so passing nx//2 (= MATLAB's Nx/2 1-indexed value) yields the same centroid. + source.p0 = dtype(10) * make_ball(Vector([nx, ny, nz]), Vector([nx // 2, ny // 2, nz // 2]), 2 * scale) + # smooth() upcasts to float64 via the FFT path even when given float32; + # the trailing .astype(dtype) restores user-requested precision. + source.p0 = smooth(source.p0.astype(dtype, copy=False), restore_max=True).astype(dtype, copy=False) + + sensor_mask = make_cart_sphere(options.sensor_radius, options.number_sensor_points) + if options.binary_sensor_mask: + sensor_mask, _, _ = cart2grid(kgrid, sensor_mask, order="C") + sensor_mask = sensor_mask.astype(bool) + sensor = kSensor(mask=sensor_mask) + sensor.record = ["p"] + + kgrid.makeTime(np.max(np.asarray(medium.sound_speed))) + kgrid.setTime(options.number_time_points, kgrid.dt) + + return kgrid, medium, source, sensor + + +def default_output_path(options: BenchmarkOptions) -> Path: + computer_name = platform.node() or "unknown-computer" + date = datetime.now().strftime("%Y%m%d-%H%M%S") + return Path(f"benchmark_data-{computer_name}-{options.data_cast}-{date}.json") + + +def rolling_average(previous_average: float, new_value: float, count: int) -> float: + return (previous_average * (count - 1) + new_value) / count + + +def store_case_result( + result: dict[str, Any], case_index: int, comp_size: int, comp_time: float, mem_usage: float, report_mem_usage: bool +) -> None: + if case_index == len(result["comp_size"]): + result["comp_size"].append(comp_size) + result["comp_time"].append(comp_time) + if report_mem_usage: + result["mem_usage"].append(mem_usage) + return + + result["comp_size"][case_index] = comp_size + result["comp_time"][case_index] = comp_time + if report_mem_usage: + result["mem_usage"][case_index] = mem_usage + + +def save_results(path: Path, result: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + payload = { + "comp_size": [int(size) for size in result["comp_size"]], + "comp_time": [float(time) for time in result["comp_time"]], + "options": result["options"], + "output_path": result["output_path"], + "error_reached": bool(result["error_reached"]), + "error_message": result["error_message"], + } + if "mem_usage" in result: + payload["mem_usage"] = [float(usage) for usage in result["mem_usage"]] + path.write_text(json.dumps(payload, indent=2, allow_nan=False) + "\n") + + +def options_payload(options: BenchmarkOptions, backend: str, device: str) -> dict[str, Any]: + payload = asdict(options) + payload.update( + { + "backend": backend, + "device": device, + "computer_name": platform.node(), + "python_version": platform.python_version(), + "platform": platform.platform(), + "kwave_python_version": kwave.__version__, + } + ) + return payload + + +def validate_memory_bytes(value: float) -> float: + memory_bytes = float(value) + if not math.isfinite(memory_bytes) or memory_bytes < 0: + raise ValueError("memory usage must be a finite non-negative value") + return memory_bytes + + + + + +def current_memory_bytes() -> float: + """Return the current resident-set-size of THIS Python process, in bytes. + + Note: this returns the *current* RSS at the moment of the call, not a + historical peak. Use ``PeakMemorySampler`` to track peak-over-time, or + ``ChildPeakMemorySampler`` for subprocess (``backend="cpp"``) measurement. + """ + return validate_memory_bytes(psutil.Process().memory_info().rss) + + +# Back-compat alias — the old name is misleading but kept so external callers +# (and any pre-merge benchmark JSON tooling) don't break. +peak_memory_bytes = current_memory_bytes + + +class ChildPeakMemorySampler: + """Capture the peak RSS of child processes spawned during the sampler's lifetime. + + Use this for ``backend="cpp"`` benchmark runs where the simulation work + happens in a subprocess — ``PeakMemorySampler`` only sees the Python + parent's RSS and reports a number that has nothing to do with the C++ + process's actual footprint. + + **Lifetime model — use ONE instance per grid size, enter/exit per iteration.** + ``resource.getrusage(RUSAGE_CHILDREN).ru_maxrss`` is a monotonically + non-decreasing cumulative max across *all* reaped children since the + parent process started, so capturing a baseline at each ``__enter__`` + would yield ``delta == 0`` for any iteration after the first when the + workload is uniform. Instead the baseline is captured at construction + and ``peak_bytes`` returns growth since then; reuse the same instance + across all iterations of a grid size so the answer reflects the peak + for that grid's runs, not a per-iteration delta that's always zero. + + On Linux ``ru_maxrss`` is reported in kilobytes; on macOS it's in bytes + (per BSD historical convention). Windows has no equivalent in + ``resource`` — the sampler raises ``NotImplementedError`` on Windows so + the caller can refuse the combination cleanly. + """ + + def __init__(self) -> None: + if platform.system().lower() == "windows": + raise NotImplementedError( + "ChildPeakMemorySampler is not supported on Windows " + "(resource.RUSAGE_CHILDREN is POSIX-only). " + "Use backend='python' to measure simulation memory on Windows." + ) + import resource # POSIX-only; gated by the Windows check above + + self._baseline = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss + self._latest = self._baseline + + def __enter__(self) -> ChildPeakMemorySampler: + # No-op: ru_maxrss is cumulative across the lifetime of the parent, + # so re-capturing a baseline at every iteration would silently + # collapse the reported delta to zero. Baseline is fixed at __init__. + return self + + def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: Any) -> None: + import resource + + self._latest = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss + + @property + def peak_bytes(self) -> float: + delta = max(0, self._latest - self._baseline) + # ru_maxrss units differ by platform (Linux: KB, macOS: bytes). + if platform.system().lower() == "linux": + return float(delta * 1024) + return float(delta) + + +class PeakMemorySampler: + def __init__(self, reader: Callable[[], float] = current_memory_bytes, interval: float = 0.05): + self._reader = reader + self._interval = interval + self._peak_bytes = 0.0 + self._error: Exception | None = None + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + + @property + def peak_bytes(self) -> float: + with self._lock: + return self._peak_bytes + + def __enter__(self) -> PeakMemorySampler: + self._record_sample() + if self._interval > 0: + self._thread = threading.Thread(target=self._sample_until_stopped, daemon=True) + self._thread.start() + return self + + def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: Any) -> None: + self._stop_event.set() + if self._thread is not None: + self._thread.join() + try: + self._record_sample() + except Exception as exc: + if exc_type is None: + raise exc + if exc_type is None and self._error is not None: + raise self._error + + def _sample_until_stopped(self) -> None: + while not self._stop_event.wait(self._interval): + try: + self._record_sample() + except Exception as exc: + self._error = exc + self._stop_event.set() + + def _record_sample(self) -> None: + memory_bytes = validate_memory_bytes(self._reader()) + with self._lock: + self._peak_bytes = max(self._peak_bytes, memory_bytes) diff --git a/pyproject.toml b/pyproject.toml index d834c2d2..c4eb0bb1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ test = ["pytest", "testfixtures==8.3.0", "requests==2.34.2"] example = ["gdown==6.1.0"] +benchmark = ["psutil>=5.9"] docs = [ "sphinx<9", "sphinx-mdinclude==0.6.2", "sphinx-copybutton==0.5.2", From cc170e1ee2e0e8d88db5f9fbb20a22efceb18cae Mon Sep 17 00:00:00 2001 From: Walter Simson Date: Mon, 22 Jun 2026 03:17:49 +0000 Subject: [PATCH 8/9] benchmark: import psutil lazily so test collection works without [benchmark] extra MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI pytest workflow installs the [test] extra, not [benchmark]. The previous commit's top-level `import psutil` in benchmarks/helpers.py made the import unconditional, so pytest collection of tests/test_benchmark.py failed with ModuleNotFoundError before any test ran (collection imports `from benchmarks.benchmark import ...` which transitively imports helpers). Move the import inside `current_memory_bytes()`. The module itself imports cleanly without psutil; only callers that take a real memory reading need it. Tests that inject `memory_reader=` (the existing mocking pattern) work unchanged. Two options were considered: lazy import (this), or adding psutil to the [test] extra. Lazy import is more honest about when psutil is actually needed — measuring real memory, not running tests. --- benchmarks/helpers.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/benchmarks/helpers.py b/benchmarks/helpers.py index 76c1eb43..993fbe93 100644 --- a/benchmarks/helpers.py +++ b/benchmarks/helpers.py @@ -10,7 +10,6 @@ from typing import Any, Callable import numpy as np -import psutil import kwave from kwave.data import Vector @@ -203,7 +202,14 @@ def current_memory_bytes() -> float: Note: this returns the *current* RSS at the moment of the call, not a historical peak. Use ``PeakMemorySampler`` to track peak-over-time, or ``ChildPeakMemorySampler`` for subprocess (``backend="cpp"``) measurement. + + ``psutil`` is imported lazily so importing the benchmarks package (e.g. + during pytest collection) doesn't require the ``[benchmark]`` extra to + be installed. Only callers that actually take a memory measurement + need ``psutil``; tests inject their own readers via ``memory_reader=``. """ + import psutil # noqa: PLC0415 — intentionally lazy; see docstring + return validate_memory_bytes(psutil.Process().memory_info().rss) From 6268b9ecee032a9d2c842b3f86de418e40b43f48 Mon Sep 17 00:00:00 2001 From: Walter Simson Date: Mon, 22 Jun 2026 04:10:12 +0000 Subject: [PATCH 9/9] =?UTF-8?q?benchmark:=20/simplify=20pass=20=E2=80=94?= =?UTF-8?q?=20drop=20kwarg,=20cache=20Process,=20fold=20platform=20consts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cleanups from the /simplify review: - Drop `mem_sampler_factory` parameter from `run()`. Tests now monkeypatch `benchmarks.benchmark.ChildPeakMemorySampler` via `unittest.mock.patch` (the pattern already used by the Windows-refusal test). The factory was a 9th kwarg whose only consumer was the test suite; removing it also eliminates the `_default_sampler_factory` closure and the redundant ternary that duplicated the backend predicate. - Hoist `psutil.Process()` to a module-level `_THIS_PROCESS` constant. Constructing per call opens /proc//stat on Linux (a real syscall); PeakMemorySampler ticks every 50 ms by default. ~23 μs saved per sample. Combined with adding psutil to the [test] extra, the previous lazy-import inside `current_memory_bytes` is no longer needed — reverted to top-of-module import. - Add `psutil>=5.9` to the [test] extra (was [benchmark] only). pytest collection of tests/test_benchmark.py transitively imports benchmarks.helpers, so psutil needs to be present whenever tests run. The [benchmark] extra is kept for parity (end users who install only [benchmark] get the same dep). - Fold `platform.system().lower()` and the Linux/macOS ru_maxrss unit difference into module-level `_PLATFORM` and `_RU_MAXRSS_UNIT_BYTES` constants. Removes two scattered platform checks and lets the POSIX-only `resource` module be imported at module scope (gated on `_PLATFORM != 'windows'`) instead of lazily on every __init__/__exit__. - Drop `ChildPeakMemorySampler._latest` attribute. `__exit__` now writes the final delta directly into `_peak_bytes` (matching PeakMemorySampler's shape); `peak_bytes` is a trivial accessor. - Drop `peak_memory_bytes` back-compat alias and the identity-pinning test that locked it in. `benchmarks/` is not in the wheel (verified via [tool.hatch.build.targets.wheel].packages allowlist), so there are no external importers to break with a rename. - Sharpen the Windows-refusal error message in `ChildPeakMemorySampler.__init__` to point at the real blocker (Executor.run_simulation doesn't expose the subprocess.Popen handle) rather than implying psutil is missing — psutil is already a hard dep of the [benchmark] extra. Test count: 17 (was 18; one removed was the alias-identity assertion). All pass on Linux. --- benchmarks/benchmark.py | 34 ++++-------- benchmarks/helpers.py | 63 ++++++++++++----------- pyproject.toml | 3 +- tests/test_benchmark.py | 111 +++++++++++++++++++--------------------- 4 files changed, 100 insertions(+), 111 deletions(-) diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py index b6df1321..ea23fb75 100644 --- a/benchmarks/benchmark.py +++ b/benchmarks/benchmark.py @@ -44,7 +44,6 @@ def run( timer: Callable[[], float] = perf_counter, memory_reader: Callable[[], float] = current_memory_bytes, memory_sampling_interval: float = 0.05, - mem_sampler_factory: Callable[[], Any] | None = None, ) -> dict[str, Any]: solver = kspaceFirstOrder if solver is None else solver cases = grid_sizes(options) @@ -53,17 +52,6 @@ def run( raise ValueError("max_cases must be positive") cases = cases[:max_cases] - # Sampler picks itself based on backend: cpp runs the simulation in a - # subprocess, so Python-process RSS sampling is meaningless — use - # getrusage(RUSAGE_CHILDREN) instead. Override via mem_sampler_factory - # for tests. - def _default_sampler_factory() -> Any: - if backend == "cpp": - return ChildPeakMemorySampler() - return PeakMemorySampler(memory_reader, memory_sampling_interval) - - sampler_factory = mem_sampler_factory or _default_sampler_factory - path = default_output_path(options) if output_path is None else Path(output_path) result: dict[str, Any] = { "comp_size": [], @@ -75,18 +63,18 @@ def _default_sampler_factory() -> Any: } if options.report_mem_usage: # Probe early so unsupported combinations (e.g. Windows + cpp) fail - # before any output file is written. + # before any output file is written. cpp probes the sampler class + # (Windows refusal happens in __init__); python probes the reader + # so platform-missing /proc or ps surfaces here, not mid-simulation. try: - sampler_factory() + if backend == "cpp": + ChildPeakMemorySampler() + else: + validate_memory_bytes(memory_reader()) except NotImplementedError as exc: raise ValueError(str(exc)) from exc - if backend != "cpp": - # For python backend, also probe the reader so platform-missing - # /proc or ps surfaces here rather than mid-simulation. - try: - validate_memory_bytes(memory_reader()) - except Exception as exc: - raise ValueError("report_mem_usage is not supported on this platform") from exc + except Exception as exc: + raise ValueError("report_mem_usage is not supported on this platform") from exc result["mem_usage"] = [] for case_index, (nx, ny, nz, scale) in enumerate(cases): @@ -98,10 +86,10 @@ def _default_sampler_factory() -> Any: # once per grid (ru_maxrss is cumulative — re-baselining per # iteration would silently zero the delta). Python backend's # PeakMemorySampler is per-iteration by design. - grid_sampler = sampler_factory() if (options.report_mem_usage and backend == "cpp") else None + grid_sampler = ChildPeakMemorySampler() if (options.report_mem_usage and backend == "cpp") else None for loop_num in range(1, options.num_averages + 1): if options.report_mem_usage: - memory_cm = grid_sampler if grid_sampler is not None else sampler_factory() + memory_cm = grid_sampler if grid_sampler is not None else PeakMemorySampler(memory_reader, memory_sampling_interval) with memory_cm as memory_sampler: start = timer() solver( diff --git a/benchmarks/helpers.py b/benchmarks/helpers.py index 993fbe93..f368b37d 100644 --- a/benchmarks/helpers.py +++ b/benchmarks/helpers.py @@ -10,8 +10,23 @@ from typing import Any, Callable import numpy as np +import psutil import kwave + +# Platform constants are computed once at import: Windows determines whether +# ChildPeakMemorySampler can construct at all; the ru_maxrss unit differs +# between Linux (kilobytes) and macOS (bytes, per BSD convention). +_PLATFORM = platform.system().lower() +_RU_MAXRSS_UNIT_BYTES = 1024 if _PLATFORM == "linux" else 1 + +# resource is POSIX-only — gated behind the Windows refusal in +# ChildPeakMemorySampler.__init__. Import lazily so importing helpers on +# Windows doesn't fail at module load (the rest of the module is portable). +if _PLATFORM != "windows": + import resource as _resource +else: + _resource = None # type: ignore[assignment] from kwave.data import Vector from kwave.kgrid import kWaveGrid from kwave.kmedium import kWaveMedium @@ -196,26 +211,21 @@ def validate_memory_bytes(value: float) -> float: +# Module-level Process handle: psutil.Process.__init__ opens /proc//stat +# on Linux to validate + cache create_time, so constructing per call adds a +# real syscall every poll. PeakMemorySampler ticks every 50 ms by default — +# reuse one Process instance instead. +_THIS_PROCESS = psutil.Process() + + def current_memory_bytes() -> float: """Return the current resident-set-size of THIS Python process, in bytes. - Note: this returns the *current* RSS at the moment of the call, not a - historical peak. Use ``PeakMemorySampler`` to track peak-over-time, or + Returns the *current* RSS at the moment of the call, not a historical + peak. Use ``PeakMemorySampler`` to track peak-over-time, or ``ChildPeakMemorySampler`` for subprocess (``backend="cpp"``) measurement. - - ``psutil`` is imported lazily so importing the benchmarks package (e.g. - during pytest collection) doesn't require the ``[benchmark]`` extra to - be installed. Only callers that actually take a memory measurement - need ``psutil``; tests inject their own readers via ``memory_reader=``. """ - import psutil # noqa: PLC0415 — intentionally lazy; see docstring - - return validate_memory_bytes(psutil.Process().memory_info().rss) - - -# Back-compat alias — the old name is misleading but kept so external callers -# (and any pre-merge benchmark JSON tooling) don't break. -peak_memory_bytes = current_memory_bytes + return validate_memory_bytes(_THIS_PROCESS.memory_info().rss) class ChildPeakMemorySampler: @@ -243,16 +253,16 @@ class ChildPeakMemorySampler: """ def __init__(self) -> None: - if platform.system().lower() == "windows": + if _PLATFORM == "windows": raise NotImplementedError( "ChildPeakMemorySampler is not supported on Windows " "(resource.RUSAGE_CHILDREN is POSIX-only). " - "Use backend='python' to measure simulation memory on Windows." + "Subprocess memory measurement on Windows would require " + "polling psutil.Process(child_pid) — blocked by Executor.run_simulation " + "not exposing the subprocess.Popen handle to the benchmark layer." ) - import resource # POSIX-only; gated by the Windows check above - - self._baseline = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss - self._latest = self._baseline + self._baseline = _resource.getrusage(_resource.RUSAGE_CHILDREN).ru_maxrss + self._peak_bytes = 0.0 def __enter__(self) -> ChildPeakMemorySampler: # No-op: ru_maxrss is cumulative across the lifetime of the parent, @@ -261,17 +271,12 @@ def __enter__(self) -> ChildPeakMemorySampler: return self def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: Any) -> None: - import resource - - self._latest = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss + delta = max(0, _resource.getrusage(_resource.RUSAGE_CHILDREN).ru_maxrss - self._baseline) + self._peak_bytes = float(delta * _RU_MAXRSS_UNIT_BYTES) @property def peak_bytes(self) -> float: - delta = max(0, self._latest - self._baseline) - # ru_maxrss units differ by platform (Linux: KB, macOS: bytes). - if platform.system().lower() == "linux": - return float(delta * 1024) - return float(delta) + return self._peak_bytes class PeakMemorySampler: diff --git a/pyproject.toml b/pyproject.toml index c4eb0bb1..ce94050a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,8 @@ test = ["pytest", "coverage==7.14.1", "phantominator", "testfixtures==8.3.0", - "requests==2.34.2"] + "requests==2.34.2", + "psutil>=5.9"] example = ["gdown==6.1.0"] benchmark = ["psutil>=5.9"] docs = [ "sphinx<9", diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py index 350ceff7..c102c9bf 100644 --- a/tests/test_benchmark.py +++ b/tests/test_benchmark.py @@ -6,7 +6,7 @@ import pytest from benchmarks.benchmark import BenchmarkOptions, build_case, grid_sizes, run -from benchmarks.helpers import ChildPeakMemorySampler, current_memory_bytes, peak_memory_bytes +from benchmarks.helpers import ChildPeakMemorySampler def small_options(**overrides): @@ -162,13 +162,6 @@ def unsupported_memory_reader(): assert not output_path.exists() -def test_peak_memory_bytes_is_back_compat_alias_for_current_memory_bytes(): - # The old name was misleading (Linux/macOS/Windows readers return current - # RSS, not historical peak). Keep the alias so external benchmark tooling - # that imported the old name doesn't break. - assert peak_memory_bytes is current_memory_bytes - - @pytest.mark.parametrize( "kwargs, expected_match", [ @@ -184,58 +177,64 @@ def test_options_post_init_rejects_invalid_geometry(kwargs, expected_match): small_options(**kwargs) -def test_cpp_backend_shares_child_sampler_across_iterations(tmp_path: Path): - """For backend='cpp', the factory is called ONCE per grid (not per iteration). - - ChildPeakMemorySampler's baseline must be captured once per grid because - ru_maxrss is cumulative — re-baselining per iteration would silently - zero the reported delta (Greptile P1 on the original implementation). - """ - options = small_options(report_mem_usage=True, num_averages=3) - output_path = tmp_path / "benchmark.json" - times = iter([0.0, 1.0, 1.0, 3.0, 3.0, 7.0]) +class _FakeChildSampler: + """Test-only stand-in for ChildPeakMemorySampler. Records construction + and enter/exit counts so we can assert the "one instance per grid, + N enter/exits per grid" lifetime model is honored.""" - constructed: list[object] = [] + instances: list = [] enter_count = 0 - class FakeChildSampler: - def __init__(self): - constructed.append(self) - self._peak = 0 + def __init__(self): + type(self).instances.append(self) + self._peak = 0 + + def __enter__(self): + type(self).enter_count += 1 + return self - def __enter__(self): - nonlocal enter_count - enter_count += 1 - return self + def __exit__(self, *exc): + # Simulate cumulative growth: peak grows by 1 KB each iteration. + self._peak += 1024 - def __exit__(self, *exc): - # Simulate cumulative growth: peak grows by 1 KB each iteration. - self._peak += 1024 + @property + def peak_bytes(self) -> float: + return float(self._peak) - @property - def peak_bytes(self) -> float: - return float(self._peak) + @classmethod + def reset(cls): + cls.instances = [] + cls.enter_count = 0 + + +def test_cpp_backend_shares_child_sampler_across_iterations(tmp_path: Path): + """For backend='cpp', one ChildPeakMemorySampler is constructed per grid + (NOT per iteration). ru_maxrss is cumulative; re-baselining per iter + would silently zero the delta (Greptile P1 on the original impl).""" + _FakeChildSampler.reset() + options = small_options(report_mem_usage=True, num_averages=3) + output_path = tmp_path / "benchmark.json" + times = iter([0.0, 1.0, 1.0, 3.0, 3.0, 7.0]) def fake_solver(*args, **kwargs): return {"p": np.zeros((1, 1))} - result = run( - options, - backend="cpp", - max_cases=1, - output_path=output_path, - solver=fake_solver, - timer=lambda: next(times), - mem_sampler_factory=FakeChildSampler, - ) + with patch("benchmarks.benchmark.ChildPeakMemorySampler", _FakeChildSampler): + result = run( + options, + backend="cpp", + max_cases=1, + output_path=output_path, + solver=fake_solver, + timer=lambda: next(times), + ) - # max_cases=1 grid → factory called twice: once at the startup probe, - # once for the actual grid loop. NOT once per iteration. - assert len(constructed) == 2, f"factory called {len(constructed)} times; expected 2 (1 probe + 1 grid)" + # max_cases=1 grid → constructed twice: once at the startup probe, once + # for the actual grid loop. NOT once per iteration. + assert len(_FakeChildSampler.instances) == 2 # num_averages=3 iterations on that one grid → 3 enter/exit pairs on the SAME instance. - assert enter_count == 3 - # The actively-used sampler grew monotonically to 3 KB. - assert constructed[1].peak_bytes == pytest.approx(3072.0) + assert _FakeChildSampler.enter_count == 3 + assert _FakeChildSampler.instances[1].peak_bytes == pytest.approx(3072.0) assert "mem_usage" in result and len(result["mem_usage"]) == 1 @@ -244,23 +243,19 @@ def test_cpp_backend_reports_clear_error_when_child_sampler_unsupported(tmp_path options = small_options(report_mem_usage=True) output_path = tmp_path / "benchmark.json" - def unsupported_sampler_factory(): + def raises_not_implemented(*args, **kwargs): raise NotImplementedError("ChildPeakMemorySampler is not supported on Windows (resource.RUSAGE_CHILDREN is POSIX-only).") - with pytest.raises(ValueError, match="not supported on Windows"): - run( - options, - backend="cpp", - max_cases=1, - output_path=output_path, - mem_sampler_factory=unsupported_sampler_factory, - ) + with patch("benchmarks.benchmark.ChildPeakMemorySampler", raises_not_implemented): + with pytest.raises(ValueError, match="not supported on Windows"): + run(options, backend="cpp", max_cases=1, output_path=output_path) # Failed-fast, before writing the output file. assert not output_path.exists() def test_child_peak_memory_sampler_refuses_on_windows(): - with patch("benchmarks.helpers.platform.system", return_value="Windows"): + # Patch the module-level _PLATFORM constant (computed at import time). + with patch("benchmarks.helpers._PLATFORM", "windows"): with pytest.raises(NotImplementedError, match="not supported on Windows"): ChildPeakMemorySampler()