diff --git a/bencher/results/histogram_result.py b/bencher/results/histogram_result.py index 4f95712ce..d2561bb68 100644 --- a/bencher/results/histogram_result.py +++ b/bencher/results/histogram_result.py @@ -52,9 +52,12 @@ def to_plot( def _make_histogram(self, dataset: xr.Dataset, result_var: Parameter, **kwargs): """Render a single histogram from a dataset (no over_time handling).""" + units = getattr(result_var, "units", "") or "" + xlabel = f"{result_var.name} [{units}]" if units else result_var.name plot = dataset.hvplot( kind="hist", y=[result_var.name], + xlabel=xlabel, ylabel="count", legend="bottom_right", title=f"{result_var.name} vs Count", diff --git a/pyproject.toml b/pyproject.toml index 64572a84e..a6d17df3f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "holobench" -version = "1.106.2" +version = "1.107.0" authors = [{ name = "Austin Gregg-Smith", email = "blooop@gmail.com" }] description = "A package for benchmarking the performance of arbitrary functions" diff --git a/test/helpers.py b/test/helpers.py new file mode 100644 index 000000000..f0c27a035 --- /dev/null +++ b/test/helpers.py @@ -0,0 +1,64 @@ +"""Shared helpers for result-type unit tests. + +These small utilities were previously copy-pasted across several +``test_*_result.py`` modules; centralising them keeps the unwrap/inner-element +and run-config logic consistent in one place. +""" + +from __future__ import annotations + +import bencher as bn + + +def unwrap_hv(obj): + """Unwrap a panel Row/HoloViews pane returned by filter() to the hv object inside.""" + while True: + if hasattr(obj, "object"): + obj = obj.object + elif hasattr(obj, "objects"): + assert len(obj.objects) > 0 + obj = obj.objects[0] + else: + return obj + + +def inner_element(overlay): + """The plot methods return an hv.Overlay wrapping a single distribution element.""" + items = list(overlay) + assert len(items) == 1 + return items[0] + + +def run_cfg_with(repeats: int) -> bn.BenchRunCfg: + """A BenchRunCfg with caching and auto-plot disabled for the given repeat count.""" + return bn.BenchRunCfg( + repeats=repeats, cache_results=False, cache_samples=False, auto_plot=False + ) + + +def run_named_sweep(bench_class, name, input_vars, result_vars, repeats=1): + """Run a sweep on a freshly named ``Bench`` with caching and plot callbacks disabled. + + Shared by the bar and scatter result tests, which construct the bench by name. + """ + bench = bn.Bench(name, bench_class(), run_cfg=run_cfg_with(repeats)) + return bench.plot_sweep( + name, input_vars=input_vars, result_vars=result_vars, plot_callbacks=False + ) + + +def run_dist_sweep(worker_cls, input_vars, repeats, name_prefix): + """Run a categorical ``value`` sweep via ``to_bench`` for distribution-style tests. + + Shared by the box-whisker, violin and scatter-jitter result tests, which each + previously defined an identical ``_run_sweep`` differing only by name prefix. + """ + run_cfg = run_cfg_with(repeats) + bench = worker_cls().to_bench(run_cfg) + return bench.plot_sweep( + f"{name_prefix}_{worker_cls.__name__}_{repeats}", + input_vars=input_vars, + result_vars=["value"], + run_cfg=run_cfg, + plot_callbacks=False, + ) diff --git a/test/test_band_result.py b/test/test_band_result.py new file mode 100644 index 000000000..1d233ede5 --- /dev/null +++ b/test/test_band_result.py @@ -0,0 +1,218 @@ +"""Tests for bencher/results/holoview_results/band_result.py (BandResult).""" + +import math +from types import SimpleNamespace + +import holoviews as hv +import numpy as np +import pytest + +import bencher as bn +from bencher.results.bench_result_base import ReduceType +from bencher.results.holoview_results.band_result import BandResult +from test.helpers import run_cfg_with, unwrap_hv + + +def plot_opts(overlay: hv.Overlay) -> dict: + return overlay.opts.get("plot").kwargs + + +class BandBench(bn.ParametrizedSweep): + """Minimal 1-float sweep; the repeat dimension supplies the percentile sample pool.""" + + size = bn.FloatSweep(default=50, bounds=[10, 100], samples=3, doc="Size") + throughput = bn.ResultFloat(units="MB/s", doc="Throughput") + + def benchmark(self): + self.throughput = self.size * 0.5 + math.sin(self.size) + + +class BandCatBench(bn.ParametrizedSweep): + """1 float + 1 categorical: the categorical dim is flattened into the sample pool.""" + + size = bn.FloatSweep(default=50, bounds=[10, 100], samples=3, doc="Size") + backend = bn.StringSweep(["redis", "local"], doc="Backend") + throughput = bn.ResultFloat(units="MB/s", doc="Throughput") + + def benchmark(self): + base = {"redis": 1.0, "local": 2.0}[self.backend] + self.throughput = self.size * base + + +class BandNanBench(bn.ParametrizedSweep): + """Sweep whose worker returns NaN for one input point.""" + + size = bn.FloatSweep(default=50, bounds=[10, 100], samples=3, doc="Size") + throughput = bn.ResultFloat(units="MB/s", doc="Throughput") + + def benchmark(self): + self.throughput = float("nan") if self.size < 20 else self.size * 0.5 + + +class BandVecBench(bn.ParametrizedSweep): + """Vector (non-scalar) result — outside BandResult's SCALAR_RESULT_TYPES filter.""" + + size = bn.FloatSweep(default=50, bounds=[10, 100], samples=3, doc="Size") + vec = bn.ResultVec(size=2, units="m", doc="Vector result") + + def benchmark(self): + self.vec = [self.size, self.size * 2] + + +class BandTimeBench(bn.ParametrizedSweep): + """Sweep run over several time snapshots to exercise the over_time band path.""" + + size = bn.FloatSweep(default=50, bounds=[10, 100], samples=3, doc="Size") + throughput = bn.ResultFloat(units="MB/s", doc="Throughput") + + offset = 0.0 + + def benchmark(self): + self.throughput = self.size * 0.5 + self.offset + + +@pytest.fixture(scope="module", name="res_1d") +def fixture_res_1d(): + run_cfg = run_cfg_with(repeats=5) + bench = BandBench().to_bench(run_cfg) + return bench.plot_sweep( + "band_1d", input_vars=["size"], result_vars=["throughput"], run_cfg=run_cfg + ) + + +@pytest.fixture(scope="module", name="res_cat") +def fixture_res_cat(): + run_cfg = run_cfg_with(repeats=2) + bench = BandCatBench().to_bench(run_cfg) + return bench.plot_sweep( + "band_cat", + input_vars=["size", "backend"], + result_vars=["throughput"], + run_cfg=run_cfg, + ) + + +@pytest.fixture(scope="module", name="res_time") +def fixture_res_time(): + benchable = BandTimeBench() + run_cfg = bn.BenchRunCfg( + over_time=True, repeats=2, cache_results=False, cache_samples=False, auto_plot=False + ) + bench = benchable.to_bench(run_cfg) + res = None + for i in range(3): + benchable.offset = i * 1.0 + run_cfg.clear_cache = True + run_cfg.clear_history = i == 0 + res = bench.plot_sweep( + "band_time", + input_vars=["size"], + result_vars=["throughput"], + run_cfg=run_cfg, + time_src=f"2026-06-{10 + i:02d} snap{i:04d}", + ) + return res + + +class TestBandResult: + def test_to_band_overlay_composition(self, res_1d): + """to_band yields two percentile Areas, a median Curve and a samples Scatter.""" + plot = res_1d.to_band() + assert plot is not None + overlay = unwrap_hv(plot) + assert isinstance(overlay, hv.Overlay) + # exact types: hv.Area is a subclass of hv.Curve, so isinstance would double count + assert len([el for el in overlay if type(el) is hv.Area]) == 2 + assert len([el for el in overlay if type(el) is hv.Curve]) == 1 + assert len([el for el in overlay if type(el) is hv.Scatter]) == 1 + + def test_band_labels_and_dims(self, res_1d): + """Element labels and kdims/vdims reflect the input and result variables.""" + overlay = unwrap_hv(res_1d.to_band()) + labels = sorted(el.label for el in overlay) + assert labels == sorted(["10th–90th pctl", "25th–75th pctl", "median", "samples"]) + for el in overlay: + assert [d.name for d in el.kdims] == ["size"] + outer = next(el for el in overlay if el.label == "10th–90th pctl") + assert [d.name for d in outer.vdims] == ["throughput_p10", "throughput_p90"] + median = next(el for el in overlay if el.label == "median") + assert [d.name for d in median.vdims] == ["throughput"] + + def test_band_title_and_ylabel(self, res_1d): + """Default title names var vs x-axis; ylabel includes the units.""" + overlay = unwrap_hv(res_1d.to_band()) + opts = plot_opts(overlay) + assert opts["title"] == "throughput vs size (aggregated over repeat)" + assert opts["ylabel"] == "throughput [MB/s]" + + def test_band_explicit_title_preserved(self, res_1d): + ds = res_1d.to_dataset(reduce=ReduceType.NONE) + rv = res_1d.bench_cfg.result_vars[0] + overlay = res_1d.to_band_ds(ds, rv, title="my custom title") + assert plot_opts(overlay)["title"] == "my custom title" + + def test_band_enable_scatter_false(self, res_1d): + """enable_scatter=False drops the samples Scatter layer.""" + ds = res_1d.to_dataset(reduce=ReduceType.NONE) + rv = res_1d.bench_cfg.result_vars[0] + overlay = res_1d.to_band_ds(ds, rv, enable_scatter=False) + assert not any(isinstance(el, hv.Scatter) for el in overlay) + assert any(isinstance(el, hv.Curve) for el in overlay) + + def test_band_categorical_flattened_into_samples(self, res_cat): + """A categorical dim becomes part of the sample pool; the float stays on x.""" + overlay = unwrap_hv(res_cat.to_band()) + assert isinstance(overlay, hv.Overlay) + for el in overlay: + assert [d.name for d in el.kdims] == ["size"] + assert plot_opts(overlay)["title"] == "throughput vs size (aggregated over backend)" + + def test_band_over_time_uses_time_axis(self, res_time): + """With over_time history, the band x-axis is the over_time dimension.""" + ds = res_time.to_dataset(reduce=ReduceType.NONE) + rv = res_time.bench_cfg.result_vars[0] + overlay = res_time.to_band_ds(ds, rv) + assert isinstance(overlay, hv.Overlay) + for el in overlay: + assert [d.name for d in el.kdims] == ["over_time"] + assert plot_opts(overlay)["title"] == "throughput vs over_time (aggregated over size)" + + def test_band_suppressed_when_regression_overlay_exists(self, res_1d): + """to_band_ds returns None when the regression overlay already shows the history.""" + ds = res_1d.to_dataset(reduce=ReduceType.NONE) + rv = res_1d.bench_cfg.result_vars[0] + original = res_1d.regression_report + res_1d.regression_report = SimpleNamespace( + results=[SimpleNamespace(variable="throughput", historical=[1.0, 2.0])] + ) + try: + assert res_1d.to_band_ds(ds, rv) is None + finally: + res_1d.regression_report = original + + def test_to_band_rejects_non_scalar_result(self): + """A non-scalar (vector) result is outside SCALAR_RESULT_TYPES, so no band is drawn. + + BandResult's filter accepts any float/cat/repeat shape (repeats>=1 included), + so the meaningful rejection path is the result type — a vector sweep must not + silently produce a misleading band overlay. + """ + run_cfg = run_cfg_with(repeats=3) + bench = BandVecBench().to_bench(run_cfg) + res = bench.plot_sweep( + "band_vec", input_vars=["size"], result_vars=["vec"], run_cfg=run_cfg + ) + result = res.to(BandResult, override=False) + assert not isinstance(unwrap_hv(result), hv.Overlay) + + def test_band_nan_input_does_not_crash(self): + """NaN results survive percentile computation and are masked out of the scatter.""" + run_cfg = run_cfg_with(repeats=3) + bench = BandNanBench().to_bench(run_cfg) + res = bench.plot_sweep( + "band_nan", input_vars=["size"], result_vars=["throughput"], run_cfg=run_cfg + ) + overlay = unwrap_hv(res.to_band()) + assert isinstance(overlay, hv.Overlay) + scatter = next(el for el in overlay if isinstance(el, hv.Scatter)) + assert not np.isnan(scatter.dimension_values("throughput")).any() diff --git a/test/test_bar_result.py b/test/test_bar_result.py new file mode 100644 index 000000000..77dfc8ccb --- /dev/null +++ b/test/test_bar_result.py @@ -0,0 +1,133 @@ +"""Tests for bencher/results/holoview_results/bar_result.py""" + +import math +import unittest + +import holoviews as hv +import panel as pn + +import bencher as bn +from bencher.results.holoview_results.bar_result import BarResult +from test.helpers import run_named_sweep as _run_sweep + + +class Cat1DBench(bn.ParametrizedSweep): + """Minimal 1-categorical sweep accepted by the bar filter (0 floats, 1 cat).""" + + method = bn.StringSweep(["alpha", "beta", "gamma"]) + score = bn.ResultFloat(units="m") + + def benchmark(self): + self.score = len(self.method) * 1.5 + + +class Cat1DNanBench(bn.ParametrizedSweep): + """Sweep where the worker returns NaN for one point (missing-value default).""" + + method = bn.StringSweep(["alpha", "beta", "gamma"]) + score = bn.ResultFloat(units="m") + + def benchmark(self): + self.score = float("nan") if self.method == "beta" else len(self.method) * 1.5 + + +class TwoCatBench(bn.ParametrizedSweep): + """Two categorical inputs so the bar chart groups by the second cat.""" + + method = bn.StringSweep(["alpha", "beta"]) + backend = bn.StringSweep(["cpu", "gpu"]) + score = bn.ResultFloat(units="m") + + def benchmark(self): + self.score = len(self.method) + len(self.backend) * 0.5 + + +class BoolBench(bn.ParametrizedSweep): + """ResultBool sweep for the repeats>=2 REDUCE scenario of to_bar.""" + + method = bn.StringSweep(["alpha", "beta"]) + passed = bn.ResultBool() + + def benchmark(self): + self.passed = self.method == "alpha" + + +class Float1DBench(bn.ParametrizedSweep): + """Float-input sweep that the bar filter (float_range 0..0) must reject.""" + + x = bn.FloatSweep(bounds=(0, 1)) + score = bn.ResultFloat(units="m") + + def benchmark(self): + self.score = self.x * 2 + + +class TestBarResult(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.res_cat = _run_sweep(Cat1DBench, "bar_cat", ["method"], ["score"]) + cls.res_nan = _run_sweep(Cat1DNanBench, "bar_nan", ["method"], ["score"]) + cls.res_2cat = _run_sweep(TwoCatBench, "bar_2cat", ["method", "backend"], ["score"]) + cls.res_bool = _run_sweep(BoolBench, "bar_bool", ["method"], ["passed"], repeats=2) + cls.res_float = _run_sweep(Float1DBench, "bar_float", ["x"], ["score"]) + + def test_to_bar_returns_row_with_bars(self): + result = self.res_cat.to_bar() + self.assertIsInstance(result, pn.Row) + self.assertGreater(len(result), 0) + self.assertIsInstance(result[0], pn.pane.HoloViews) + self.assertIsInstance(result[0].object, hv.Bars) + + def test_to_plot_delegates_to_bar(self): + result = BarResult.to_plot(self.res_cat) + self.assertIsInstance(result, pn.Row) + self.assertIsInstance(result[0].object, hv.Bars) + + def test_to_bar_ds_dims_and_labels(self): + """Input var on kdims, result var on vdims, ylabel includes units.""" + ds = self.res_cat.to_dataset() + rv = self.res_cat.bench_cfg.result_vars[0] + result = self.res_cat.to_bar_ds(ds, rv) + self.assertIsInstance(result, pn.pane.HoloViews) + element = result.object + self.assertIsInstance(element, hv.Bars) + self.assertEqual(element.kdims[0].name, "method") + self.assertEqual(element.vdims[0].name, "score") + opts = element.opts.get().kwargs + self.assertEqual(opts["title"], "score vs method") + self.assertEqual(opts["ylabel"], "score [m]") + + def test_to_bar_bool_with_repeats(self): + """ResultBool with repeats>=2 matches the REDUCE scenario and still plots.""" + result = self.res_bool.to_bar() + self.assertIsInstance(result, pn.Row) + self.assertGreater(len(result), 0) + self.assertIsInstance(result[0].object, hv.Bars) + + def test_to_bar_groups_by_extra_cats(self): + """With two categorical inputs the second cat becomes the by grouping.""" + ds = self.res_2cat.to_dataset() + rv = self.res_2cat.bench_cfg.result_vars[0] + result = self.res_2cat.to_bar_ds(ds, rv) + self.assertIsInstance(result, pn.pane.HoloViews) + element = result.object + kdim_names = [d.name for d in element.kdims] + self.assertIn("method", kdim_names) + self.assertIn("backend", kdim_names) + + def test_to_bar_nan_does_not_crash(self): + ds = self.res_nan.to_dataset() + self.assertTrue(any(math.isnan(v) for v in ds["score"].values.ravel())) + result = self.res_nan.to_bar() + self.assertIsInstance(result, pn.Row) + self.assertIsInstance(result[0].object, hv.Bars) + + def test_to_bar_rejects_float_sweep(self): + """A float input sweep fails the float_range=(0,0) filter when override=False. + + The filter returns None (or a Markdown debug panel), never a bar pane. + """ + result = self.res_float.to_bar(override=False) + self.assertNotIsInstance(result, (pn.Row, pn.pane.HoloViews)) + if result is not None: + self.assertIsInstance(result, pn.pane.Markdown) diff --git a/test/test_bench_cfg.py b/test/test_bench_cfg.py new file mode 100644 index 000000000..2874c39a1 --- /dev/null +++ b/test/test_bench_cfg.py @@ -0,0 +1,360 @@ +"""Tests for bencher/bench_cfg.py — BenchPlotSrvCfg, BenchRunCfg and BenchCfg. + +Subsampling helpers (subsampling_divisions_to_samples, samples_per_var) are +covered in test/test_usability.py, hash stability/golden hashes in +test/test_hash_persistent.py and normalize_show in test/test_run.py, so they +are not duplicated here. +""" + +import math +from datetime import datetime +from types import SimpleNamespace + +import panel as pn +import pytest + +import bencher as bn +from bencher.bench_cfg import BenchCfg, BenchPlotSrvCfg, BenchRunCfg, DimsCfg +from bencher.job import Executors +from bencher.variables.results import OptDir + + +class SweepCfg(bn.ParametrizedSweep): + """Small sweep used to populate BenchCfg input/result/const vars.""" + + theta = bn.FloatSweep(default=0, bounds=[0, math.pi], samples=4) + offset = bn.FloatSweep(default=0, bounds=[0, 1], samples=3) + out_sin = bn.ResultFloat(units="v") + + +class SweepCfgNoOptDir(bn.ParametrizedSweep): + """Result variable that is not an optimization target.""" + + out_fixed = bn.ResultFloat(units="v", direction=OptDir.none) + + +def make_bench_cfg(**overrides) -> BenchCfg: + """Build a fully-populated BenchCfg for describe/hash tests.""" + params = dict( + input_vars=[SweepCfg.param.theta], + result_vars=[SweepCfg.param.out_sin], + const_vars=[(SweepCfg.param.offset, 0.5)], + meta_vars=[], + all_vars=[SweepCfg.param.theta], + bench_name="bench_cfg_test", + title="My Title", + description="A longer description of the benchmark", + post_description="Comments on the output", + ) + params.update(overrides) + return BenchCfg(**params) + + +# ── BenchPlotSrvCfg defaults ──────────────────────────────────────────────── + + +class TestBenchPlotSrvCfgDefaults: + def test_defaults(self): + cfg = BenchPlotSrvCfg() + assert cfg.port is None + assert cfg.allow_ws_origin is False + assert cfg.show is True + + +# ── BenchRunCfg defaults ──────────────────────────────────────────────────── + + +class TestBenchRunCfgDefaults: + def test_execution_defaults(self): + cfg = BenchRunCfg() + assert cfg.repeats == 1 + assert cfg.subsampling_divisions == 0 + assert cfg.samples_per_var is None + assert cfg.executor == Executors.SERIAL + assert cfg.nightly is False + assert cfg.headless is False + assert cfg.dry_run is False + + def test_cache_defaults_all_false(self): + cfg = BenchRunCfg() + assert cfg.cache_results is False + assert cfg.cache_samples is False + assert cfg.clear_cache is False + assert cfg.clear_sample_cache is False + assert cfg.overwrite_sample_cache is False + assert cfg.only_hash_tag is False + assert cfg.only_plot is False + assert cfg.cache_size is None + + def test_display_defaults(self): + cfg = BenchRunCfg() + assert cfg.print_bench_inputs is True + assert cfg.print_bench_results is True + assert cfg.summarise_constant_inputs is True + assert cfg.print_pandas is False + assert cfg.print_xarray is False + assert cfg.serve_pandas is False + assert cfg.serve_pandas_flat is True + assert cfg.serve_xarray is False + + def test_visualization_defaults(self): + cfg = BenchRunCfg() + assert cfg.auto_plot is True + assert cfg.use_holoview is False + assert cfg.use_optuna is False + assert cfg.plot_size is None + assert cfg.plot_width is None + assert cfg.plot_height is None + assert cfg.backend == "panel" + + def test_time_defaults(self): + cfg = BenchRunCfg() + assert cfg.over_time is False + assert cfg.clear_history is False + assert cfg.max_time_events is None + assert cfg.max_slider_points == 10 + assert cfg.show_aggregated_time_tab is False + assert cfg.show_aggregate_plots is True + assert cfg.time_event is None + assert cfg.run_tag == "" + + def test_run_date_autopopulated(self): + before = datetime.now() + cfg = BenchRunCfg() + after = datetime.now() + assert isinstance(cfg.run_date, datetime) + assert before <= cfg.run_date <= after + + def test_regression_defaults(self): + cfg = BenchRunCfg() + assert cfg.regression_detection is False + assert cfg.regression_method == "adaptive" + assert cfg.regression_fail is False + + +# ── BenchRunCfg construction round-trips ──────────────────────────────────── + + +class TestBenchRunCfgRoundTrip: + def test_values_round_trip_through_construction(self): + cfg = BenchRunCfg(repeats=5, over_time=True, cache_results=True, cache_samples=True) + assert cfg.repeats == 5 + assert cfg.over_time is True + assert cfg.cache_results is True + assert cfg.cache_samples is True + + def test_explicit_run_date_preserved(self): + stamp = datetime(2024, 1, 2, 3, 4, 5) + cfg = BenchRunCfg(run_date=stamp) + assert cfg.run_date == stamp + + def test_deprecated_level_kwarg_maps_to_subsampling_divisions(self): + with pytest.warns(DeprecationWarning): + cfg = BenchRunCfg(level=3) + assert cfg.subsampling_divisions == 3 + + def test_deep_returns_independent_copy(self): + cfg = BenchRunCfg(repeats=4) + copy = cfg.deep() + assert copy is not cfg + assert copy.repeats == 4 + copy.repeats = 9 + assert cfg.repeats == 4 + + +# ── BenchRunCfg.with_defaults ─────────────────────────────────────────────── + + +class TestWithDefaults: + def test_none_run_cfg_creates_new_instance(self): + cfg = BenchRunCfg.with_defaults(None, repeats=7, over_time=True) + assert isinstance(cfg, BenchRunCfg) + assert cfg.repeats == 7 + assert cfg.over_time is True + + def test_explicit_caller_value_not_overridden(self): + base = BenchRunCfg(repeats=3) + merged = BenchRunCfg.with_defaults(base, repeats=7) + assert merged.repeats == 3 + + def test_default_value_is_overridden(self): + base = BenchRunCfg() # repeats still at its param default of 1 + merged = BenchRunCfg.with_defaults(base, repeats=7) + assert merged.repeats == 7 + + def test_original_cfg_not_mutated(self): + base = BenchRunCfg() + BenchRunCfg.with_defaults(base, repeats=7) + assert base.repeats == 1 + + def test_unknown_key_raises_value_error(self): + with pytest.raises(ValueError, match="not_a_real_param"): + BenchRunCfg.with_defaults(None, not_a_real_param=1) + + def test_deprecated_level_key_warns_and_maps(self): + with pytest.warns(DeprecationWarning): + cfg = BenchRunCfg.with_defaults(None, level=4) + assert cfg.subsampling_divisions == 4 + + +# ── BenchCfg.hash_persistent ──────────────────────────────────────────────── + + +class TestBenchCfgHashPersistent: + def test_same_config_same_hash(self): + assert make_bench_cfg().hash_persistent( + include_repeats=True + ) == make_bench_cfg().hash_persistent(include_repeats=True) + + def test_different_repeats_different_hash(self): + h1 = make_bench_cfg(repeats=1).hash_persistent(include_repeats=True) + h2 = make_bench_cfg(repeats=2).hash_persistent(include_repeats=True) + assert h1 != h2 + + def test_repeats_ignored_when_include_repeats_false(self): + h1 = make_bench_cfg(repeats=1).hash_persistent(include_repeats=False) + h2 = make_bench_cfg(repeats=2).hash_persistent(include_repeats=False) + assert h1 == h2 + + def test_different_tag_different_hash(self): + h1 = make_bench_cfg(tag="a").hash_persistent(include_repeats=True) + h2 = make_bench_cfg(tag="b").hash_persistent(include_repeats=True) + assert h1 != h2 + + def test_different_bench_name_different_hash(self): + h1 = make_bench_cfg(bench_name="bench_a").hash_persistent(include_repeats=True) + h2 = make_bench_cfg(bench_name="bench_b").hash_persistent(include_repeats=True) + assert h1 != h2 + + def test_const_var_value_changes_hash(self): + h1 = make_bench_cfg( + const_vars=[(SweepCfg.param.offset, 0.5)], + ).hash_persistent(include_repeats=True) + h2 = make_bench_cfg( + const_vars=[(SweepCfg.param.offset, 0.9)], + ).hash_persistent(include_repeats=True) + assert h1 != h2 + + +# ── BenchCfg describe/summary helpers ─────────────────────────────────────── + + +class TestDescribeBenchmark: + def test_mentions_input_and_result_vars(self): + desc = make_bench_cfg().describe_benchmark() + assert "Input Variables:" in desc + assert "theta" in desc + assert "Result Variables:" in desc + assert "out_sin" in desc + + def test_mentions_constants_with_value(self): + desc = make_bench_cfg().describe_benchmark() + assert "Constants:" in desc + assert "offset" in desc + assert "value: 0.5" in desc + + def test_constants_hidden_when_summarise_disabled(self): + desc = make_bench_cfg(summarise_constant_inputs=False).describe_benchmark() + assert "Constants:" not in desc + assert "offset" not in desc + + def test_includes_meta_information(self): + cfg = make_bench_cfg(run_tag="my_run_tag") + desc = cfg.describe_benchmark() + assert f"run date: {cfg.run_date}" in desc + assert "run tag: my_run_tag" in desc + assert "cache_results: False" in desc + + def test_reports_sample_counts(self): + desc = make_bench_cfg().describe_benchmark() + assert "number of samples: 4" in desc + + +class TestSweepSentence: + def test_sentence_mentions_vars_and_shape(self): + sentence = make_bench_cfg().sweep_sentence() + assert isinstance(sentence, pn.pane.Markdown) + text = sentence.object + assert "theta" in text + assert "out_sin" in text + # theta has 4 samples; a second dimension of 1 is appended + assert "4x1" in text + + def test_sentence_two_dims(self): + cfg = make_bench_cfg(all_vars=[SweepCfg.param.theta, SweepCfg.param.offset]) + text = cfg.sweep_sentence().object + assert "theta by offset" in text + # reversed order of all_vars: offset (3 samples) x theta (4 samples) + assert "3x4" in text + + +class TestPanelHelpers: + def test_to_title(self): + title = make_bench_cfg().to_title() + assert isinstance(title, pn.pane.Markdown) + assert title.object == "# My Title" + assert title.name == "My Title" + + def test_to_description(self): + desc = make_bench_cfg().to_description(width=600) + assert isinstance(desc, pn.pane.Markdown) + assert desc.object == "A longer description of the benchmark" + assert desc.width == 600 + + def test_to_post_description(self): + post = make_bench_cfg().to_post_description() + assert post.object == "Comments on the output" + + def test_to_description_empty_when_none(self): + assert make_bench_cfg(description=None).to_description().object == "" + + def test_inputs_as_str(self): + assert make_bench_cfg().inputs_as_str() == ["theta"] + + +# ── input var partitioning and optuna targets ─────────────────────────────── + + +class TestPartitionInputVars: + def test_partition_by_optimize_flag(self): + opt_var = SimpleNamespace(optimize=True) + non_opt_var = SimpleNamespace(optimize=False) + no_flag_var = SimpleNamespace() + opt, non_opt = BenchCfg.partition_input_vars([opt_var, non_opt_var, no_flag_var]) + assert opt == [opt_var, no_flag_var] # missing flag defaults to optimized + assert non_opt == [non_opt_var] + + def test_optimized_input_vars_properties(self): + non_opt_var = SimpleNamespace(optimize=False) + cfg = make_bench_cfg(input_vars=[SweepCfg.param.theta, non_opt_var]) + assert cfg.optimized_input_vars == [SweepCfg.param.theta] + assert cfg.non_optimized_input_vars == [non_opt_var] + + def test_properties_handle_none_input_vars(self): + cfg = make_bench_cfg(input_vars=None) + assert cfg.optimized_input_vars == [] + assert cfg.non_optimized_input_vars == [] + + +class TestOptunaTargets: + def test_targets_exclude_direction_none(self): + cfg = make_bench_cfg(result_vars=[SweepCfg.param.out_sin, SweepCfgNoOptDir.param.out_fixed]) + assert cfg.optuna_targets() == ["out_sin"] + + def test_targets_as_var_returns_objects(self): + cfg = make_bench_cfg(result_vars=[SweepCfg.param.out_sin]) + assert cfg.optuna_targets(as_var=True) == [SweepCfg.param.out_sin] + + +# ── DimsCfg ───────────────────────────────────────────────────────────────── + + +class TestDimsCfg: + def test_dims_extracted_from_bench_cfg(self): + cfg = make_bench_cfg(all_vars=[SweepCfg.param.theta, SweepCfg.param.offset]) + dims = DimsCfg(cfg) + assert dims.dims_name == ["theta", "offset"] + assert dims.dims_size == [4, 3] + assert dims.dim_ranges_index == [[0, 1, 2, 3], [0, 1, 2]] + assert list(dims.coords.keys()) == ["theta", "offset"] + assert len(dims.coords["theta"]) == 4 diff --git a/test/test_bench_result.py b/test/test_bench_result.py new file mode 100644 index 000000000..6b4eb481d --- /dev/null +++ b/test/test_bench_result.py @@ -0,0 +1,227 @@ +"""Tests for BenchResult container behavior (bencher/results/bench_result.py).""" + +import unittest + +import numpy as np +import panel as pn + +import bencher as bn +from bencher.results.bench_result import BenchResult +from bencher.results.holoview_results.line_result import LineResult + + +class Linear(bn.ParametrizedSweep): + """Minimal 1-float-input sweep with a deterministic worker (value = 2 * x).""" + + x = bn.FloatSweep(default=0, bounds=[0, 2], samples=3) + value = bn.ResultFloat(units="m") + + def benchmark(self): + self.value = self.x * 2.0 + + +class LinearWithNan(bn.ParametrizedSweep): + """Same as Linear but returns NaN for the midpoint (x == 1).""" + + x = bn.FloatSweep(default=0, bounds=[0, 2], samples=3) + value = bn.ResultFloat(units="m") + + def benchmark(self): + self.value = float("nan") if self.x == 1.0 else self.x * 2.0 + + +def run_sweep(sweep_cls=Linear) -> BenchResult: + """Run the smallest possible sweep (1 input var, 3 samples, repeats=1, no plots).""" + bench = bn.Bench("test_bench_result", sweep_cls()) + return bench.plot_sweep( + "sweep", + input_vars=["x"], + result_vars=["value"], + run_cfg=bn.BenchRunCfg(repeats=1, cache_results=False, cache_samples=False), + auto_plot=False, + ) + + +def collect_hv_elements(panel_obj) -> list: + """Recursively collect holoviews elements from a Panel layout.""" + elements = [] + if hasattr(panel_obj, "opts") and hasattr(panel_obj, "kdims"): + elements.append(panel_obj) + elif hasattr(panel_obj, "object") and hasattr(panel_obj.object, "opts"): + elements.append(panel_obj.object) + elif hasattr(panel_obj, "__iter__"): + for child in panel_obj: + elements.extend(collect_hv_elements(child)) + return elements + + +def _failing_cb(self, **kwargs): # pylint: disable=unused-argument + raise RuntimeError("intentional test failure") + + +def _marker_cb_a(self, **kwargs): # pylint: disable=unused-argument + return pn.pane.Markdown("marker_a") + + +def _marker_cb_b(self, **kwargs): # pylint: disable=unused-argument + return pn.pane.Markdown("marker_b") + + +class TestBenchResultTo(unittest.TestCase): + """Tests for the BenchResult.to(result_type) conversion path.""" + + @classmethod + def setUpClass(cls): + cls.res = run_sweep() + + def test_to_line_result_returns_viewable(self): + plot = self.res.to(LineResult) + self.assertIsNotNone(plot) + self.assertIsInstance(plot, pn.viewable.Viewable) + + def test_to_line_result_plots_worker_values(self): + plot = self.res.to(LineResult) + elements = collect_hv_elements(plot) + self.assertGreater(len(elements), 0, "Expected at least one holoviews element") + df = elements[0].dframe() + self.assertIn("x", df.columns) + self.assertIn("value", df.columns) + df = df.sort_values("x") + np.testing.assert_allclose(df["x"].to_numpy(), [0.0, 1.0, 2.0]) + np.testing.assert_allclose(df["value"].to_numpy(), [0.0, 2.0, 4.0]) + + def test_to_does_not_mutate_source(self): + ds_before = self.res.ds + self.res.to(LineResult) + self.assertIs(self.res.ds, ds_before) + + +class TestBenchResultToAuto(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.res = run_sweep() + + def test_to_auto_explicit_plot_list(self): + panes = self.res.to_auto(plot_list=[LineResult.to_plot]) + self.assertIsInstance(panes, pn.Column) + self.assertEqual(len(panes), 1) + self.assertGreater(len(collect_hv_elements(panes)), 0) + + def test_to_auto_remove_plots(self): + both = self.res.to_auto(plot_list=[_marker_cb_a, _marker_cb_b]) + self.assertEqual([p.object for p in both], ["marker_a", "marker_b"]) + removed = self.res.to_auto( + plot_list=[_marker_cb_a, _marker_cb_b], + remove_plots=[_marker_cb_b], + ) + self.assertEqual([p.object for p in removed], ["marker_a"]) + + def test_to_auto_all_removed_returns_placeholder(self): + panes = self.res.to_auto(plot_list=[LineResult.to_plot], remove_plots=[LineResult.to_plot]) + self.assertEqual(len(panes), 1) + self.assertIsInstance(panes[0], pn.pane.Markdown) + self.assertIn("No Plotters are able to represent these results", panes[0].object) + + def test_to_auto_failing_callback_logged_not_raised(self): + with self.assertLogs(level="ERROR") as captured: + panes = self.res.to_auto(plot_list=[_failing_cb, LineResult.to_plot]) + self.assertTrue(any("_failing_cb" in msg for msg in captured.output)) + # The failing callback is skipped but the working one still renders. + self.assertEqual(len(panes), 1) + self.assertGreater(len(collect_hv_elements(panes)), 0) + + +class TestBenchResultToAutoPlots(unittest.TestCase): + def test_to_auto_plots_first_entry_is_sweep_summary(self): + res = run_sweep() + col = res.to_auto_plots() + self.assertIsInstance(col, pn.Column) + self.assertGreaterEqual(len(col), 2) + self.assertEqual(col[0].name, "Plots View") + + +class TestBenchResultPlot(unittest.TestCase): + def test_plot_none_callbacks_returns_none(self): + res = run_sweep() + res.bench_cfg.plot_callbacks = None + self.assertIsNone(res.plot()) + + def test_plot_empty_callbacks_returns_empty_column(self): + res = run_sweep() + res.bench_cfg.plot_callbacks = [] + out = res.plot() + self.assertIsInstance(out, pn.Column) + self.assertEqual(len(out), 0) + + def test_plot_list_callbacks_one_entry_each(self): + res = run_sweep() + res.bench_cfg.plot_callbacks = [ + lambda r: pn.pane.Markdown("first"), + lambda r: pn.pane.Markdown("second"), + ] + out = res.plot() + self.assertIsInstance(out, pn.Column) + self.assertEqual(len(out), 2) + self.assertEqual(out[0].object, "first") + self.assertEqual(out[1].object, "second") + + def test_plot_callbacks_receive_result_instance(self): + res = run_sweep() + seen = [] + res.bench_cfg.plot_callbacks = [lambda r: seen.append(r) or pn.pane.Markdown("cb")] + res.plot() + self.assertEqual(seen, [res]) + + +class TestDefaultPlotCallbacks(unittest.TestCase): + def test_default_plot_callbacks_non_empty(self): + callbacks = BenchResult.default_plot_callbacks() + self.assertIsInstance(callbacks, list) + self.assertGreater(len(callbacks), 0) + self.assertTrue(all(callable(cb) for cb in callbacks)) + self.assertIn(LineResult.to_plot, callbacks) + + +class TestFromExisting(unittest.TestCase): + def test_from_existing_copies_state(self): + res = run_sweep() + clone = BenchResult.from_existing(res) + self.assertIsNot(clone, res) + self.assertIsInstance(clone, BenchResult) + self.assertIs(clone.ds, res.ds) + self.assertIs(clone.bench_cfg, res.bench_cfg) + self.assertIs(clone.plt_cnt_cfg, res.plt_cnt_cfg) + self.assertIs(clone.regression_report, res.regression_report) + + def test_from_existing_produces_same_dataset(self): + res = run_sweep() + clone = BenchResult.from_existing(res) + np.testing.assert_allclose( + clone.to_dataset()["value"].values, res.to_dataset()["value"].values + ) + + +class TestNanRobustness(unittest.TestCase): + """A worker returning NaN for one point must not crash plotting paths.""" + + @classmethod + def setUpClass(cls): + cls.res = run_sweep(LinearWithNan) + + def test_nan_present_in_dataset(self): + vals = self.res.to_dataset()["value"].values.flatten() + self.assertEqual(int(np.isnan(vals).sum()), 1) + np.testing.assert_allclose(np.sort(vals[~np.isnan(vals)]), [0.0, 4.0]) + + def test_to_line_with_nan_does_not_crash(self): + plot = self.res.to(LineResult) + self.assertIsNotNone(plot) + + def test_to_auto_plots_with_nan_does_not_crash(self): + col = self.res.to_auto_plots() + self.assertIsInstance(col, pn.Column) + self.assertEqual(col[0].name, "Plots View") + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_bencher.py b/test/test_bencher.py index a2bbd9d9d..d57a1e179 100644 --- a/test/test_bencher.py +++ b/test/test_bencher.py @@ -201,8 +201,9 @@ def test_pareto(self, input_vars, result_vars, repeats) -> None: ), ) - # TODO There are still name collisions when run on all possible inputs, but at the moment the name collisions end up plotting an identical graph anyway so it doesn't matter that much. Future work is to enable this test to confirm that all graph names are fully unique even if they have the same pixels. - @pytest.mark.skip() + @pytest.mark.skip( + reason="name collisions across input permutations; see plans/05-test-coverage.md task 4" + ) @settings(deadline=10000) @given( input_vars=st.sampled_from(input_var_cat_permutations), diff --git a/test/test_box_whisker_result.py b/test/test_box_whisker_result.py new file mode 100644 index 000000000..9a198471b --- /dev/null +++ b/test/test_box_whisker_result.py @@ -0,0 +1,124 @@ +"""Tests for bencher/results/holoview_results/distribution_result/box_whisker_result.py + +Also covers the shared DistributionResult base behavior (filtering, kdim/vdim +setup, title/ylabel labelling) through the BoxWhisker subclass. +""" + +import math +import unittest + +import holoviews as hv +import panel as pn + +import bencher as bn +from bencher.results.bench_result_base import ReduceType +from bencher.results.holoview_results.distribution_result.box_whisker_result import ( + BoxWhiskerResult, +) +from test.helpers import inner_element as _inner_element, run_dist_sweep + + +class DistBench(bn.ParametrizedSweep): + """Deterministic 1-categorical benchmark with per-repeat variation.""" + + _call_count = 0 + + category = bn.StringSweep(["alpha", "beta"]) + value = bn.ResultFloat(units="ms") + + def benchmark(self): + DistBench._call_count += 1 + base = 1.0 if self.category == "alpha" else 2.0 + self.value = base + 0.01 * DistBench._call_count + + +class TwoCatBench(bn.ParametrizedSweep): + category = bn.StringSweep(["alpha", "beta"]) + backend = bn.StringSweep(["cpu", "gpu"]) + value = bn.ResultFloat(units="ms") + + def benchmark(self): + self.value = 1.0 if self.category == "alpha" else 2.0 + + +class NanBench(bn.ParametrizedSweep): + """One category always returns NaN (the missing-value default).""" + + category = bn.StringSweep(["ok", "broken"]) + value = bn.ResultFloat(units="ms") + + def benchmark(self): + self.value = float("nan") if self.category == "broken" else 1.0 + + +def _run_sweep(worker_cls, input_vars, repeats): + return run_dist_sweep(worker_cls, input_vars, repeats, "test_box_whisker") + + +class TestBoxWhiskerResult(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.res = _run_sweep(DistBench, ["category"], repeats=3) + # store the list, not the Parameter itself: param Parameters are + # descriptors, so a class-attribute Parameter would resolve to its + # default value on attribute access. + cls.result_vars = cls.res.bench_cfg.result_vars + cls.ds = cls.res.to_dataset(ReduceType.NONE) + + def test_to_boxplot_ds_returns_boxwhisker_element(self): + overlay = self.res.to_boxplot_ds(self.ds, self.result_vars[0]) + self.assertIsInstance(overlay, hv.Overlay) + self.assertIsInstance(_inner_element(overlay), hv.BoxWhisker) + + def test_kdims_vdims_match_input_and_result_vars(self): + el = _inner_element(self.res.to_boxplot_ds(self.ds, self.result_vars[0])) + self.assertEqual([d.name for d in el.kdims], ["category"]) + self.assertEqual([d.name for d in el.vdims], ["value"]) + + def test_title_and_ylabel_contain_result_var_and_units(self): + el = _inner_element(self.res.to_boxplot_ds(self.ds, self.result_vars[0])) + opts = hv.Store.lookup_options("bokeh", el, "plot").kwargs + self.assertEqual(opts["ylabel"], "value [ms]") + self.assertEqual(opts["title"], "value vs category vs repeat") + + def test_distribution_contains_all_repeat_samples(self): + """With repeats=3 each x position must hold 3 individual samples.""" + el = _inner_element(self.res.to_boxplot_ds(self.ds, self.result_vars[0])) + counts = el.dframe().groupby("category").size().to_dict() + self.assertEqual(counts, {"alpha": 3, "beta": 3}) + + def test_to_plot_returns_panel_row_with_holoviews_pane(self): + plot = BoxWhiskerResult.to_plot(self.res) + self.assertIsInstance(plot, pn.Row) + self.assertGreater(len(plot), 0) + + def test_to_plot_rejected_for_single_repeat(self): + """Distribution plots need repeats>=2; with override=False the filter rejects.""" + res_1rep = _run_sweep(DistBench, ["category"], repeats=1) + plot = BoxWhiskerResult.to_plot(res_1rep, override=False) + self.assertNotIsInstance(plot, pn.Row) + self.assertTrue(plot is None or isinstance(plot, pn.pane.Markdown)) + + def test_two_categorical_inputs_grouped_kdims(self): + """The base class uses every categorical input var as a kdim.""" + res2 = _run_sweep(TwoCatBench, ["category", "backend"], repeats=3) + ds2 = res2.to_dataset(ReduceType.NONE) + el = _inner_element(res2.to_boxplot_ds(ds2, res2.bench_cfg.result_vars[0])) + self.assertEqual([d.name for d in el.kdims], ["category", "backend"]) + # 2 cats x 2 backends x 3 repeats = 12 samples + self.assertEqual(len(el.dframe()), 12) + + def test_nan_results_do_not_crash(self): + res_nan = _run_sweep(NanBench, ["category"], repeats=3) + plot = BoxWhiskerResult.to_plot(res_nan) + self.assertIsInstance(plot, pn.Row) + ds_nan = res_nan.to_dataset(ReduceType.NONE) + el = _inner_element(res_nan.to_boxplot_ds(ds_nan, res_nan.bench_cfg.result_vars[0])) + df = el.dframe() + broken = df[df["category"] == "broken"]["value"] + self.assertEqual(len(broken), 3) + self.assertTrue(all(math.isnan(v) for v in broken)) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_combinations.py b/test/test_combinations.py deleted file mode 100644 index 443ae906e..000000000 --- a/test/test_combinations.py +++ /dev/null @@ -1,183 +0,0 @@ -from __future__ import annotations - -import pytest -import unittest -from hypothesis import given, settings, strategies as st -import bencher as bn -from datetime import datetime - -from strenum import StrEnum -from enum import auto -from param import Parameter -from itertools import combinations - - -class Enum1(StrEnum): - """A generic enum""" - - enum1_val1 = auto() - enum1_val2 = auto() - - -class Enum2(StrEnum): - """Another generic enum""" - - enum2_val1 = auto() - enum2_val2 = auto() - - -class BenchCfgTest(bn.ParametrizedSweep): - """A class for representing all types of input""" - - float1 = bn.FloatSweep(default=0, bounds=[0, 1], doc="generic float 1", samples=2) - float2 = bn.FloatSweep(default=0, bounds=[0, 1], doc="generic float 2", samples=2) - int1 = bn.IntSweep(default=0, bounds=[0, 2], doc="generic int 1") - int2 = bn.IntSweep(default=0, bounds=[0, 2], doc="generic int 2") - bool1 = bn.BoolSweep(doc="generic bool 1") - bool2 = bn.BoolSweep(doc="generic bool 2") - enum1 = bn.EnumSweep(Enum1) - enum2 = bn.EnumSweep(Enum2) - - -class BenchCfgTestOut(bn.ParametrizedSweep): - """A class for representing all types of result""" - - out1 = bn.ResultFloat(doc="generic result variable 1") - out2 = bn.ResultFloat(doc="generic result variable 2") - outvec2 = bn.ResultVec(2, doc="A generic 2D vector") - outvec3 = bn.ResultVec(3, doc="A generic 3D vector") - - -def bench_func(cfg: BenchCfgTest) -> BenchCfgTestOut: - """A generic benchmark function""" - output = BenchCfgTestOut() - output.out1 = cfg.float1 - output.out2 = 2.0 - output.outvec2 = [0, 1] - output.outvec3 = [0, 1, 2] - return output - - -# all possible types of input -input_types = [ - BenchCfgTest.param.float1, - BenchCfgTest.param.float2, - BenchCfgTest.param.int1, - BenchCfgTest.param.int2, - BenchCfgTest.param.bool1, - BenchCfgTest.param.bool2, - BenchCfgTest.param.enum1, - BenchCfgTest.param.enum2, -] - -# all possible types of result -result_var_permutations = [ - [BenchCfgTestOut.param.out1], - [BenchCfgTestOut.param.out1, BenchCfgTestOut.param.out2], - # [BenchCfgTestOut.param.outvec2], - # [BenchCfgTestOut.param.outvec3], -] - - -# the function used to generate all possible combination or permutations of input -generator_func = combinations - -input_var_permutations = [] -all_inputs = [] - -# all possible permutations of the input for a given number of inputs -for num_inputs in range(1, 3): - input_var_permutations.extend([list(c) for c in generator_func(input_types, num_inputs)]) - - -for p in input_var_permutations: - print(",".join([pa.name for pa in p])) - - -@pytest.mark.skip -class TestAllCombinations(unittest.TestCase): - """This class uses hypothesis to test as large a range as possible of input parameter combinations to make sure bencher always returns an error message rather than crashing. After a long running parameter sweep the highest priority is to show as much data as possible even if some of the data processing or visualisations are not possible to calculate. (and result in an exception)""" - - def run_bencher_over_time( - self, - input_vars: list[Parameter], - result_vars: list[bn.ResultFloat], - repeats: int, - ): - """Base function used to run benchers with a set of inputs,results and repeats over time""" - bench = bn.Bench("test_bencher", bench_func, BenchCfgTest) - - for i in range(2): - bench.plot_sweep( - title="test_unique_filenames", - input_vars=input_vars, - result_vars=result_vars, - run_cfg=bn.BenchRunCfg( - repeats=repeats, - over_time=True, - clear_history=i == 0, # clear the history on the first iteration - ), - time_src=datetime( - 1970, 1, i + 1 - ), # repeatable time so outputs are same at the pixel level - ) - - @settings(deadline=10000, max_examples=50) - @given( - input_vars=st.sampled_from(input_var_permutations), - result_vars=st.sampled_from(result_var_permutations), - repeats=st.sampled_from([1, 2]), - ) - def test_all_input_combinations_over_time_hyp( - self, - input_vars: list[Parameter], - result_vars: list[bn.ResultFloat], - repeats: int, - ): - """Use hypothesis to enumerate combinations of inputs to bencher - - Args: - input_vars (list[Parameter]): all possible sets of inputs - result_vars (list[bn.ResultFloat]): all possible sets of results - repeats (int): 1 or 2 repeats (more than 2 repeats hits the same code as 2 repeats) - """ - self.run_bencher_over_time(input_vars, result_vars, repeats) - - def test_falsifying_examples(self): - """This test runs all the falsifying examples that were caught by hypothesis""" - - # TODO this has been been "fixed" by catching the pandas keyerrors for plot_surface_holo(). It needs to be fixed properly by investigating aggregation of bool datatypes. At the moment bool variables can cause agreggation errors. Possibly convert the bool to an enum type?? - self.run_bencher_over_time( - [ - BenchCfgTest.param.float1, - BenchCfgTest.param.enum1, - BenchCfgTest.param.bool1, - ], - [BenchCfgTestOut.param.out1], - 1, - ) - - # Properly fixed - self.run_bencher_over_time( - [ - BenchCfgTest.param.bool1, - BenchCfgTest.param.bool2, - BenchCfgTest.param.enum1, - BenchCfgTest.param.enum2, - ], - [BenchCfgTestOut.param.out1], - 1, - ) - - # TODO, These inputs need to be fixed. - # self.run_bencher_over_time( - # [BenchCfgTest.param.float1], - # [BenchCfgTestOut.param.outvec3], - # 1, - # ) - - # self.run_bencher_over_time( - # [BenchCfgTest.param.float1, BenchCfgTest.param.float2], - # [BenchCfgTestOut.param.outvec2], - # 1, - # ) diff --git a/test/test_composable_container_dataframe.py b/test/test_composable_container_dataframe.py new file mode 100644 index 000000000..ae0447cf5 --- /dev/null +++ b/test/test_composable_container_dataframe.py @@ -0,0 +1,94 @@ +"""Behavioral tests for ComposableContainerDataset composition from pandas DataFrames. + +Complements test_composable_container_dataset.py (which covers dims/sizes per +ComposeType on raw DataArrays) by asserting that the *data* survives composition: +values, variable names, coordinates, and append order all stay intact. +""" + +import numpy as np +import pandas as pd +import xarray as xr + +from bencher.results.composable_container.composable_container_base import ComposeType +from bencher.results.composable_container.composable_container_dataframe import ( + ComposableContainerDataset, +) + + +def _make_df(values) -> pd.DataFrame: + return pd.DataFrame({"metric": values}, index=pd.Index([0, 1, 2], name="step")) + + +def _make_ds(values) -> xr.Dataset: + return _make_df(values).to_xarray() + + +class TestComposableContainerDataframe: + def test_append_preserves_order_and_identity(self): + ds_a, ds_b = _make_ds([1.0, 2.0, 3.0]), _make_ds([4.0, 5.0, 6.0]) + c = ComposableContainerDataset(compose_method=ComposeType.right) + c.append(ds_a) + c.append(ds_b) + assert c.container == [ds_a, ds_b] + assert c.container[0] is ds_a and c.container[1] is ds_b + + def test_single_pandas_dataframe_passthrough(self): + df = _make_df([1.0, 2.0, 3.0]) + c = ComposableContainerDataset(compose_method=ComposeType.down) + c.append(df) + result = c.render() + assert result is df # untouched: no xarray conversion or concat for one item + pd.testing.assert_frame_equal(result, _make_df([1.0, 2.0, 3.0])) + + def test_right_concat_keeps_values_in_append_order(self): + c = ComposableContainerDataset(compose_method=ComposeType.right) + c.append(_make_ds([1.0, 2.0, 3.0])) + c.append(_make_ds([4.0, 5.0, 6.0])) + result = c.render() + assert isinstance(result, xr.Dataset) + assert list(result.data_vars) == ["metric"] + np.testing.assert_allclose(result["metric"].isel(col=0).values, [1.0, 2.0, 3.0]) + np.testing.assert_allclose(result["metric"].isel(col=1).values, [4.0, 5.0, 6.0]) + + def test_down_concat_preserves_coords_and_values(self): + c = ComposableContainerDataset(compose_method=ComposeType.down) + c.append(_make_ds([1.0, 2.0, 3.0])) + c.append(_make_ds([4.0, 5.0, 6.0])) + result = c.render() + assert result.sizes == {"row": 2, "step": 3} + np.testing.assert_array_equal(result.coords["step"].values, [0, 1, 2]) + np.testing.assert_allclose(result["metric"].isel(row=1).values, [4.0, 5.0, 6.0]) + + def test_sequence_concat_preserves_values(self): + c = ComposableContainerDataset(compose_method=ComposeType.sequence) + c.append(_make_ds([1.0, 2.0, 3.0])) + c.append(_make_ds([4.0, 5.0, 6.0])) + result = c.render() + np.testing.assert_allclose( + result["metric"].transpose("sequence", "step").values, + [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], + ) + + def test_overlay_means_elementwise(self): + c = ComposableContainerDataset(compose_method=ComposeType.overlay) + c.append(_make_ds([1.0, 2.0, 3.0])) + c.append(_make_ds([3.0, 4.0, 5.0])) + result = c.render() + assert "overlay" not in result.dims + np.testing.assert_allclose(result["metric"].values, [2.0, 3.0, 4.0]) + + def test_overlay_skips_nan_values(self): + """NaN is the missing-value default; overlay mean must skip it per element.""" + c = ComposableContainerDataset(compose_method=ComposeType.overlay) + c.append(_make_ds([1.0, float("nan"), 3.0])) + c.append(_make_ds([3.0, 4.0, float("nan")])) + result = c.render() + np.testing.assert_allclose(result["metric"].values, [2.0, 4.0, 3.0]) + + def test_var_name_and_value_fields_stored(self): + c = ComposableContainerDataset( + compose_method=ComposeType.right, var_name="size", var_value="10" + ) + assert c.var_name == "size" + assert c.var_value == "10" + assert c.label_formatter(c.var_name, c.var_value) == "size=10" diff --git a/test/test_curve_result.py b/test/test_curve_result.py new file mode 100644 index 000000000..5d1bdac86 --- /dev/null +++ b/test/test_curve_result.py @@ -0,0 +1,129 @@ +"""Tests for bencher/results/holoview_results/curve_result.py (CurveResult).""" + +import math + +import holoviews as hv +import panel as pn +import pytest + +import bencher as bn +from bencher.results.bench_result_base import ReduceType +from bencher.results.holoview_results.curve_result import CurveResult +from test.helpers import run_cfg_with, unwrap_hv + + +class CurveBench(bn.ParametrizedSweep): + """Minimal 1-float sweep for curve plots (repeats provide the spread).""" + + size = bn.FloatSweep(default=50, bounds=[10, 100], samples=3, doc="Size") + throughput = bn.ResultFloat(units="MB/s", doc="Throughput") + + def benchmark(self): + self.throughput = self.size * 0.5 + math.sin(self.size) + + +class CurveCatBench(bn.ParametrizedSweep): + """1 float + 1 categorical sweep to exercise the groupby overlay path.""" + + size = bn.FloatSweep(default=50, bounds=[10, 100], samples=3, doc="Size") + backend = bn.StringSweep(["redis", "local"], doc="Backend") + throughput = bn.ResultFloat(units="MB/s", doc="Throughput") + + def benchmark(self): + base = {"redis": 1.0, "local": 2.0}[self.backend] + self.throughput = self.size * base + + +class CurveNanBench(bn.ParametrizedSweep): + """Sweep whose worker returns NaN for one input point.""" + + size = bn.FloatSweep(default=50, bounds=[10, 100], samples=3, doc="Size") + throughput = bn.ResultFloat(units="MB/s", doc="Throughput") + + def benchmark(self): + self.throughput = float("nan") if self.size < 20 else self.size * 0.5 + + +@pytest.fixture(scope="module", name="res_1d") +def fixture_res_1d(): + run_cfg = run_cfg_with(repeats=3) + bench = CurveBench().to_bench(run_cfg) + return bench.plot_sweep( + "curve_1d", input_vars=["size"], result_vars=["throughput"], run_cfg=run_cfg + ) + + +@pytest.fixture(scope="module", name="res_cat") +def fixture_res_cat(): + run_cfg = run_cfg_with(repeats=2) + bench = CurveCatBench().to_bench(run_cfg) + return bench.plot_sweep( + "curve_cat", + input_vars=["size", "backend"], + result_vars=["throughput"], + run_cfg=run_cfg, + ) + + +class TestCurveResult: + def test_to_curve_returns_curve_and_spread(self, res_1d): + """With repeats>1, to_curve yields an Overlay holding a Curve plus a Spread band.""" + plot = res_1d.to_curve() + assert plot is not None + overlay = unwrap_hv(plot) + assert isinstance(overlay, hv.Overlay) + curves = [el for el in overlay if isinstance(el, hv.Curve)] + spreads = [el for el in overlay if isinstance(el, hv.Spread)] + assert len(curves) == 1 + assert len(spreads) == 1 + + def test_curve_dims_and_label(self, res_1d): + """The Curve uses the input var as kdim, the result var as vdim and label.""" + overlay = unwrap_hv(res_1d.to_curve()) + curve = next(el for el in overlay if isinstance(el, hv.Curve)) + assert [d.name for d in curve.kdims] == ["size"] + assert [d.name for d in curve.vdims] == ["throughput"] + assert curve.label == "throughput" + spread = next(el for el in overlay if isinstance(el, hv.Spread)) + assert [d.name for d in spread.vdims] == ["throughput", "throughput_std"] + + def test_to_plot_delegates_to_curve(self, res_1d): + result = CurveResult.to_plot(res_1d) + assert result is not None + assert isinstance(unwrap_hv(result), hv.Overlay) + + def test_to_curve_ds_with_categorical_groupby(self, res_cat): + """One Curve per category, labelled with the categorical value.""" + ds = res_cat.to_dataset(reduce=ReduceType.REDUCE) + rv = res_cat.bench_cfg.result_vars[0] + overlay = res_cat.to_curve_ds(ds, rv) + assert isinstance(overlay, hv.Overlay) + labels = sorted(el.label for el in overlay if isinstance(el, hv.Curve)) + assert labels == ["local", "redis"] + # each category curve keeps the float input on the x-axis + for el in overlay: + if isinstance(el, hv.Curve): + assert [d.name for d in el.kdims] == ["size"] + + def test_to_curve_rejected_without_repeats(self): + """repeats=1 fails the repeats_range(2, None) filter when override=False.""" + run_cfg = run_cfg_with(repeats=1) + bench = CurveBench().to_bench(run_cfg) + res = bench.plot_sweep( + "curve_r1", input_vars=["size"], result_vars=["throughput"], run_cfg=run_cfg + ) + result = res.to_curve(override=False) + assert isinstance(result, pn.pane.Markdown) + + def test_to_curve_nan_input_does_not_crash(self): + """A NaN result for one sweep point still produces a Curve overlay.""" + run_cfg = run_cfg_with(repeats=2) + bench = CurveNanBench().to_bench(run_cfg) + res = bench.plot_sweep( + "curve_nan", input_vars=["size"], result_vars=["throughput"], run_cfg=run_cfg + ) + plot = res.to_curve() + assert plot is not None + overlay = unwrap_hv(plot) + assert isinstance(overlay, hv.Overlay) + assert any(isinstance(el, hv.Curve) for el in overlay) diff --git a/test/test_dataset_result.py b/test/test_dataset_result.py new file mode 100644 index 000000000..4cb1f8aaa --- /dev/null +++ b/test/test_dataset_result.py @@ -0,0 +1,77 @@ +"""Tests for DataSetResult (bencher/results/dataset_result.py).""" + +import unittest + +import numpy as np +import pandas as pd +import panel as pn + +import bencher as bn +from bencher.results.dataset_result import DataSetResult + + +SCALES = [1.0, 2.0] + + +def expected_frame(scale: float) -> pd.DataFrame: + return pd.DataFrame({"y": [scale * 1.0, scale * 2.0, scale * 3.0]}) + + +class DataFrameSweep(bn.ParametrizedSweep): + """1-input sweep whose worker returns a small, scale-dependent DataFrame.""" + + scale = bn.FloatSweep(default=1.0, bounds=[1.0, 2.0], samples=2) + table = bn.ResultDataSet(doc="small dataframe result") + + def benchmark(self): + self.table = bn.ResultDataSet(expected_frame(self.scale)) + + +def run_sweep(): + bench = bn.Bench("test_dataset_result", DataFrameSweep()) + return bench.plot_sweep( + "dataset_sweep", + input_vars=["scale"], + result_vars=["table"], + run_cfg=bn.BenchRunCfg(repeats=1, cache_results=False, cache_samples=False), + auto_plot=False, + ) + + +class TestDataSetResult(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.res = run_sweep() + + def test_to_plot_returns_viewable(self): + viewer = self.res.to(DataSetResult) + self.assertIsNotNone(viewer) + self.assertIsInstance(viewer, pn.viewable.Viewable) + self.assertGreater(len(viewer), 0) + + def test_dataset_list_round_trips_worker_frames(self): + """Every worker-produced DataFrame is stored and recoverable unchanged.""" + self.assertEqual(len(self.res.dataset_list), len(SCALES)) + for ref, scale in zip(self.res.dataset_list, SCALES): + pd.testing.assert_frame_equal(ref.obj, expected_frame(scale)) + + def test_ds_indices_map_to_correct_frames(self): + """The xarray dataset stores indices into dataset_list, keyed by input value.""" + ds = self.res.to_dataset() + for scale in SCALES: + idx = int(ds["table"].sel(scale=scale).values) + frame = self.res.dataset_list[idx].obj + pd.testing.assert_frame_equal(frame, expected_frame(scale)) + + def test_ds_to_container_returns_underlying_frame(self): + """ds_to_container (used by the viewer) unwraps the stored DataFrame.""" + ds = self.res.to_dataset() + rv = self.res.bench_cfg.result_vars[0] + point = ds.sel(scale=SCALES[1]) + frame = self.res.ds_to_container(point, rv, container=None) + pd.testing.assert_frame_equal(frame, expected_frame(SCALES[1])) + np.testing.assert_allclose(frame["y"].to_numpy(), [2.0, 4.0, 6.0]) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_file_server.py b/test/test_file_server.py index f45c90b2d..97e858af4 100644 --- a/test/test_file_server.py +++ b/test/test_file_server.py @@ -1,5 +1,6 @@ """Tests for bencher/file_server.py""" +import socket import threading import time import tempfile @@ -11,6 +12,18 @@ from bencher.file_server import create_server, run_file_server +def wait_for_port(port: int, timeout: float = 5.0, step: float = 0.1) -> None: + """Poll until the server accepts TCP connections, instead of a fixed sleep.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + with socket.create_connection(("127.0.0.1", port), timeout=step): + return + except OSError: + time.sleep(step) + raise TimeoutError(f"Server on port {port} did not accept connections within {timeout}s") + + class TestFileServer(unittest.TestCase): def test_create_server(self): with tempfile.TemporaryDirectory() as tmpdir: @@ -20,7 +33,7 @@ def test_create_server(self): server = create_server(tmpdir, port=0) threading.Thread(target=server.serve_forever, daemon=True).start() port = server.server_address[1] - time.sleep(0.3) + wait_for_port(port) try: with urllib.request.urlopen(f"http://127.0.0.1:{port}/test.txt") as resp: @@ -35,7 +48,7 @@ def test_create_server_missing_file(self): server = create_server(tmpdir, port=0) threading.Thread(target=server.serve_forever, daemon=True).start() port = server.server_address[1] - time.sleep(0.3) + wait_for_port(port) try: with self.assertRaises(urllib.error.HTTPError) as ctx: @@ -54,7 +67,7 @@ def test_run_file_server(self): server = run_file_server(directory=tmpdir, port=0) port = server.server_address[1] - time.sleep(0.3) + wait_for_port(port) try: with urllib.request.urlopen(f"http://127.0.0.1:{port}/health.txt") as resp: diff --git a/test/test_histogram_result.py b/test/test_histogram_result.py new file mode 100644 index 000000000..3ebdbfe45 --- /dev/null +++ b/test/test_histogram_result.py @@ -0,0 +1,176 @@ +"""Tests for bencher/results/histogram_result.py""" + +import unittest + +import holoviews as hv +import numpy as np + +import bencher as bn +from bencher.results.histogram_result import HistogramResult + +N_REPEATS = 10 + + +class DeterministicWorker(bn.ParametrizedSweep): + """No-input worker producing values 0..N-1 across repeats (one value per call).""" + + value = bn.ResultFloat(units="m") + _counter = [0] + + def benchmark(self): + self.value = float(self._counter[0]) + self._counter[0] += 1 + + +class NanWorker(bn.ParametrizedSweep): + """No-input worker that returns NaN for exactly one repeat.""" + + value = bn.ResultFloat(units="m") + _counter = [0] + + def benchmark(self): + i = self._counter[0] + self._counter[0] += 1 + self.value = float("nan") if i == 3 else float(i) + + +class FloatInputWorker(bn.ParametrizedSweep): + """Worker with a float input — outside the histogram filter's native signature.""" + + x = bn.FloatSweep(bounds=[0, 1], samples=3) + value = bn.ResultFloat(units="m") + + def benchmark(self): + self.value = self.x * 2.0 + + +def _repeats_run_cfg() -> bn.BenchRunCfg: + return bn.BenchRunCfg(repeats=N_REPEATS, cache_results=False, cache_samples=False) + + +def _collect_histograms(panel_obj) -> list[hv.Histogram]: + """Recursively collect hv.Histogram elements from a panel/holoviews tree.""" + found = [] + if panel_obj is None: + return found + inner = getattr(panel_obj, "object", None) + if hasattr(inner, "traverse"): + found.extend(inner.traverse(lambda x: x, [hv.Histogram])) + for child in getattr(panel_obj, "objects", []): + found.extend(_collect_histograms(child)) + return found + + +class TestHistogramResult(unittest.TestCase): + @classmethod + def setUpClass(cls): + DeterministicWorker._counter[0] = 0 # pylint: disable=protected-access + bench = DeterministicWorker().to_bench(_repeats_run_cfg()) + cls.res = bench.plot_sweep( + "test_hist", + input_vars=[], + result_vars=["value"], + run_cfg=_repeats_run_cfg(), + plot_callbacks=False, + ) + cls.raw_ds = cls.res.to_dataset(reduce=bn.ReduceType.NONE) + + rc_float = bn.BenchRunCfg(repeats=1, cache_results=False, cache_samples=False) + bench_float = FloatInputWorker().to_bench(rc_float) + cls.res_float = bench_float.plot_sweep( + "test_hist_float_input", + input_vars=["x"], + result_vars=["value"], + run_cfg=rc_float, + plot_callbacks=False, + ) + + def _single_histogram(self, plot) -> hv.Histogram: + hists = plot.traverse(lambda x: x, [hv.Histogram]) + self.assertEqual(len(hists), 1) + return hists[0] + + def test_to_histogram_ds_dimension_names(self): + """The histogram x-dimension is the result var name; counts go on y.""" + rv = self.res.bench_cfg.result_vars[0] + plot = self.res.to_histogram_ds(self.raw_ds, rv) + hist = self._single_histogram(plot) + self.assertEqual(hist.kdims[0].name, "value") + self.assertEqual(hist.vdims[0].name, "value_count") + + def test_binning_counts_and_edges(self): + """All N samples are binned and the bin edges span the data range [0, N-1].""" + rv = self.res.bench_cfg.result_vars[0] + plot = self.res.to_histogram_ds(self.raw_ds, rv) + hist = self._single_histogram(plot) + frequencies = hist.dimension_values(1) + self.assertEqual(frequencies.sum(), N_REPEATS) + self.assertEqual(hist.edges[0], 0.0) + self.assertEqual(hist.edges[-1], float(N_REPEATS - 1)) + + def test_binning_respects_bins_kwarg(self): + """A bins= kwarg is forwarded to hvplot and controls the bin count.""" + rv = self.res.bench_cfg.result_vars[0] + plot = self.res.to_histogram_ds(self.raw_ds, rv, bins=5) + hist = self._single_histogram(plot) + frequencies = hist.dimension_values(1) + self.assertEqual(len(frequencies), 5) + self.assertEqual(frequencies.sum(), N_REPEATS) + + def test_axis_labels_and_title(self): + """Title contains the result var name; x axis shows units, y axis is 'count'.""" + rv = self.res.bench_cfg.result_vars[0] + plot = self.res.to_histogram_ds(self.raw_ds, rv) + opts = plot.opts.get().kwargs + self.assertEqual(opts["title"], "value vs Count") + self.assertEqual(opts["xlabel"], "value [m]") + self.assertEqual(opts["ylabel"], "count") + self.assertEqual(opts["xrotation"], 30) + + def test_to_plot_repeats_only_sweep(self): + """to_plot natively matches a 0-input repeats sweep (no override needed).""" + pane = self.res.to(HistogramResult, override=False) + hists = _collect_histograms(pane) + self.assertEqual(len(hists), 1) + self.assertEqual(hists[0].kdims[0].name, "value") + self.assertEqual(hists[0].dimension_values(1).sum(), N_REPEATS) + + def test_to_plot_rejects_float_input_sweep(self): + """The filter (0 floats, 0 inputs) rejects a float-input sweep without override.""" + pane = self.res_float.to(HistogramResult, override=False) + self.assertEqual(_collect_histograms(pane), []) + + def test_to_plot_override_float_input_sweep(self): + """With override the histogram renders, binning one sample per input point.""" + pane = self.res_float.to(HistogramResult) + hists = _collect_histograms(pane) + self.assertEqual(len(hists), 1) + self.assertEqual(hists[0].kdims[0].name, "value") + self.assertEqual(hists[0].dimension_values(1).sum(), 3) + + def test_nan_values_are_dropped_not_fatal(self): + """A NaN sample must not crash rendering; it is excluded from the bin counts.""" + NanWorker._counter[0] = 0 # pylint: disable=protected-access + bench = NanWorker().to_bench(_repeats_run_cfg()) + res = bench.plot_sweep( + "test_hist_nan", + input_vars=[], + result_vars=["value"], + run_cfg=_repeats_run_cfg(), + plot_callbacks=False, + ) + raw_ds = res.to_dataset(reduce=bn.ReduceType.NONE) + rv = res.bench_cfg.result_vars[0] + + plot = res.to_histogram_ds(raw_ds, rv) + hist = self._single_histogram(plot) + frequencies = hist.dimension_values(1) + self.assertTrue(np.isfinite(frequencies).all()) + self.assertEqual(frequencies.sum(), N_REPEATS - 1) + + pane = res.to(HistogramResult, override=False) + self.assertEqual(len(_collect_histograms(pane)), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_optimize_result.py b/test/test_optimize_result.py new file mode 100644 index 000000000..022138da4 --- /dev/null +++ b/test/test_optimize_result.py @@ -0,0 +1,189 @@ +"""Tests for bencher/results/optimize_result.py — the OptimizeResult dataclass surface. + +Complements test/test_optimize.py (which exercises Bench.optimize end-to-end) by testing +the OptimizeResult accessors directly against deterministic, hand-built optuna studies, +plus minimal sweep-driven structural checks not covered there. +""" + +from __future__ import annotations + +import math + +import optuna +import pytest +from optuna.distributions import FloatDistribution + +import bencher as bn +from bencher.results.optimize_result import OptimizeResult + +# --------------------------------------------------------------------------- +# Deterministic study builders +# --------------------------------------------------------------------------- + + +def _make_single_objective_study() -> optuna.Study: + study = optuna.create_study(direction="minimize", study_name="single_study") + for x, val in [(1.0, 1.0), (0.5, 0.25), (2.0, 4.0)]: + study.add_trial( + optuna.trial.create_trial( + params={"x": x}, + distributions={"x": FloatDistribution(-5, 5)}, + value=val, + ) + ) + return study + + +def _make_multi_objective_study() -> optuna.Study: + study = optuna.create_study(directions=["minimize", "maximize"], study_name="multi_study") + # (obj1=minimize, obj2=maximize): (3.0, 0.0) is dominated by (1.0, 1.0); + # the other three trials form the Pareto front. + for x, values in [ + (1.0, (1.0, 1.0)), + (2.0, (2.0, 3.0)), + (0.5, (0.5, 0.5)), + (3.0, (3.0, 0.0)), + ]: + study.add_trial( + optuna.trial.create_trial( + params={"x": x}, + distributions={"x": FloatDistribution(0, 5)}, + values=list(values), + ) + ) + return study + + +# --------------------------------------------------------------------------- +# Direct dataclass-surface tests +# --------------------------------------------------------------------------- + + +class TestSingleObjectiveSurface: + def test_best_params_and_value(self): + res = OptimizeResult(study=_make_single_objective_study()) + assert res.best_value == 0.25 + assert res.best_params == {"x": 0.5} + + def test_field_defaults(self): + res = OptimizeResult(study=_make_single_objective_study()) + assert res.n_warm_start_trials == 0 + assert res.n_new_trials == 0 + assert res.target_names == [] + assert res.bench_cfg is None + + def test_best_trials_returns_single_best(self): + res = OptimizeResult(study=_make_single_objective_study()) + trials = res.best_trials + assert len(trials) == 1 + assert trials[0].params == {"x": 0.5} + assert trials[0].values == [0.25] + + def test_summary_contents(self): + res = OptimizeResult( + study=_make_single_objective_study(), + n_warm_start_trials=2, + n_new_trials=1, + target_names=["loss"], + ) + text = res.summary() + assert "Study: single_study" in text + assert "warm-start trials: 2" in text + assert "new trials: 1" in text + assert "total trials: 3" in text + assert "best value: 0.25" in text + assert "'x': 0.5" in text + + +class TestMultiObjectiveSurface: + def test_pareto_front_membership(self): + res = OptimizeResult(study=_make_multi_objective_study()) + pareto_xs = sorted(t.params["x"] for t in res.best_trials) + assert pareto_xs == [0.5, 1.0, 2.0] + + def test_single_objective_accessors_raise(self): + res = OptimizeResult(study=_make_multi_objective_study()) + with pytest.raises(RuntimeError, match="single-objective"): + _ = res.best_value + with pytest.raises(RuntimeError, match="single-objective"): + _ = res.best_params + + def test_summary_reports_pareto_size(self): + res = OptimizeResult(study=_make_multi_objective_study()) + text = res.summary() + assert "Pareto-front size: 3" in text + assert "best params" not in text + + +# --------------------------------------------------------------------------- +# Sweep-driven structural checks (minimal; behavior of optimize() itself is +# already covered by test_optimize.py) +# --------------------------------------------------------------------------- + + +class SingleObjectiveSphere(bn.ParametrizedSweep): + x = bn.FloatSweep(default=0, bounds=[-5, 5], samples=5) + loss = bn.ResultFloat("ul", bn.OptDir.minimize) + + def benchmark(self): + self.loss = float(self.x**2) + + +class TwoObjectives(bn.ParametrizedSweep): + x = bn.FloatSweep(default=0, bounds=[0, 5], samples=5) + obj1 = bn.ResultFloat("ul", bn.OptDir.minimize) + obj2 = bn.ResultFloat("ul", bn.OptDir.maximize) + + def benchmark(self): + self.obj1 = float(self.x**2) + self.obj2 = float(-((self.x - 3) ** 2)) + + +class NanSphere(bn.ParametrizedSweep): + """Sphere whose worker returns NaN for exactly one evaluation.""" + + x = bn.FloatSweep(default=0, bounds=[-5, 5], samples=5) + loss = bn.ResultFloat("ul", bn.OptDir.minimize) + _counter = [0] + + def benchmark(self): + i = self._counter[0] + self._counter[0] += 1 + self.loss = float("nan") if i == 2 else float(self.x**2) + + +def _run_cfg() -> bn.BenchRunCfg: + return bn.BenchRunCfg(repeats=1, cache_results=False, cache_samples=False) + + +class TestSweepStructure: + def test_single_objective_sweep_structure(self): + bench = bn.Bench("opt_res_single", SingleObjectiveSphere(), run_cfg=_run_cfg()) + res = bench.optimize(n_trials=5, plot=False) + assert isinstance(res, OptimizeResult) + assert isinstance(res.study, optuna.Study) + assert res.bench_cfg is not None + assert len(res.study.directions) == 1 + assert len(res.study.trials) == res.n_warm_start_trials + res.n_new_trials + assert set(res.best_params) == {"x"} + + def test_multi_objective_sweep_structure(self): + bench = bn.Bench("opt_res_multi", TwoObjectives(), run_cfg=_run_cfg()) + res = bench.optimize(n_trials=5, plot=False) + assert isinstance(res, OptimizeResult) + assert res.target_names == ["obj1", "obj2"] + assert len(res.study.directions) == 2 + assert len(res.best_trials) >= 1 + for trial in res.best_trials: + assert len(trial.values) == 2 + assert set(trial.params) == {"x"} + + def test_nan_worker_does_not_crash(self): + """A NaN objective fails that trial but the study and summary still work.""" + NanSphere._counter[0] = 0 # pylint: disable=protected-access + bench = bn.Bench("opt_res_nan", NanSphere(), run_cfg=_run_cfg()) + res = bench.optimize(n_trials=6, plot=False) + states = [t.state for t in res.study.trials] + assert optuna.trial.TrialState.FAIL in states + assert math.isfinite(res.best_value) + assert "best value" in res.summary() diff --git a/test/test_scatter_jitter_result.py b/test/test_scatter_jitter_result.py new file mode 100644 index 000000000..8081ad188 --- /dev/null +++ b/test/test_scatter_jitter_result.py @@ -0,0 +1,134 @@ +"""Tests for bencher/results/holoview_results/distribution_result/scatter_jitter_result.py""" + +import math +import unittest + +import holoviews as hv +import panel as pn + +import bencher as bn +from bencher.results.bench_result_base import ReduceType +from bencher.results.holoview_results.distribution_result.scatter_jitter_result import ( + ScatterJitterResult, +) +from test.helpers import inner_element as _inner_element, run_dist_sweep + + +class JitterBench(bn.ParametrizedSweep): + """Deterministic 1-categorical benchmark with per-repeat variation.""" + + _call_count = 0 + + category = bn.StringSweep(["alpha", "beta"]) + value = bn.ResultFloat(units="ms") + + def benchmark(self): + JitterBench._call_count += 1 + base = 1.0 if self.category == "alpha" else 2.0 + self.value = base + 0.01 * JitterBench._call_count + + +class TwoCatBench(bn.ParametrizedSweep): + category = bn.StringSweep(["alpha", "beta"]) + backend = bn.StringSweep(["cpu", "gpu"]) + value = bn.ResultFloat(units="ms") + + def benchmark(self): + self.value = 1.0 if self.category == "alpha" else 2.0 + + +class NanBench(bn.ParametrizedSweep): + """One category always returns NaN (the missing-value default).""" + + category = bn.StringSweep(["ok", "broken"]) + value = bn.ResultFloat(units="ms") + + def benchmark(self): + self.value = float("nan") if self.category == "broken" else 1.0 + + +def _run_sweep(worker_cls, input_vars, repeats): + return run_dist_sweep(worker_cls, input_vars, repeats, "test_scatter_jitter") + + +class TestScatterJitterResult(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.res = _run_sweep(JitterBench, ["category"], repeats=3) + # store the list, not the Parameter itself: param Parameters are + # descriptors, so a class-attribute Parameter would resolve to its + # default value on attribute access. + cls.result_vars = cls.res.bench_cfg.result_vars + cls.ds = cls.res.to_dataset(ReduceType.NONE) + + def test_to_scatter_jitter_ds_returns_scatter_element(self): + overlay = self.res.to_scatter_jitter_ds(self.ds, self.result_vars[0]) + self.assertIsInstance(overlay, hv.Overlay) + self.assertIsInstance(_inner_element(overlay), hv.Scatter) + + def test_kdims_vdims_match_input_and_result_vars(self): + el = _inner_element(self.res.to_scatter_jitter_ds(self.ds, self.result_vars[0])) + self.assertEqual([d.name for d in el.kdims], ["category"]) + self.assertEqual([d.name for d in el.vdims], ["value"]) + + def test_title_and_ylabel_contain_result_var_and_units(self): + el = _inner_element(self.res.to_scatter_jitter_ds(self.ds, self.result_vars[0])) + opts = hv.Store.lookup_options("bokeh", el, "plot").kwargs + self.assertEqual(opts["ylabel"], "value [ms]") + self.assertEqual(opts["title"], "value vs category vs repeat") + + def test_default_jitter_opt_applied(self): + el = _inner_element(self.res.to_scatter_jitter_ds(self.ds, self.result_vars[0])) + opts = hv.Store.lookup_options("bokeh", el, "plot").kwargs + self.assertEqual(opts["jitter"], 0.1) + + def test_custom_jitter_opt_propagated(self): + el = _inner_element( + self.res.to_scatter_jitter_ds(self.ds, self.result_vars[0], jitter=0.25) + ) + opts = hv.Store.lookup_options("bokeh", el, "plot").kwargs + self.assertEqual(opts["jitter"], 0.25) + + def test_scatter_shows_every_individual_sample(self): + """Scatter jitter plots raw points: repeats x categories rows, values intact.""" + el = _inner_element(self.res.to_scatter_jitter_ds(self.ds, self.result_vars[0])) + df = el.dframe() + counts = df.groupby("category").size().to_dict() + self.assertEqual(counts, {"alpha": 3, "beta": 3}) + # all alpha samples stay near 1, all beta samples near 2 (no aggregation) + self.assertTrue((df[df["category"] == "alpha"]["value"] < 1.5).all()) + self.assertTrue((df[df["category"] == "beta"]["value"] > 1.5).all()) + + def test_to_plot_returns_panel_row_with_holoviews_pane(self): + plot = ScatterJitterResult.to_plot(self.res) + self.assertIsInstance(plot, pn.Row) + self.assertGreater(len(plot), 0) + + def test_to_plot_rejected_for_single_repeat(self): + """Scatter jitter needs repeats>=2; with override=False the filter rejects.""" + res_1rep = _run_sweep(JitterBench, ["category"], repeats=1) + plot = ScatterJitterResult.to_plot(res_1rep, override=False) + self.assertNotIsInstance(plot, pn.Row) + self.assertTrue(plot is None or isinstance(plot, pn.pane.Markdown)) + + def test_to_plot_rejected_for_two_categorical_inputs(self): + """Unlike box/violin, scatter jitter accepts at most 1 categorical input.""" + res_2cat = _run_sweep(TwoCatBench, ["category", "backend"], repeats=3) + plot = ScatterJitterResult.to_plot(res_2cat, override=False) + self.assertNotIsInstance(plot, pn.Row) + self.assertTrue(plot is None or isinstance(plot, pn.pane.Markdown)) + + def test_nan_results_do_not_crash(self): + res_nan = _run_sweep(NanBench, ["category"], repeats=3) + plot = ScatterJitterResult.to_plot(res_nan) + self.assertIsInstance(plot, pn.Row) + ds_nan = res_nan.to_dataset(ReduceType.NONE) + el = _inner_element(res_nan.to_scatter_jitter_ds(ds_nan, res_nan.bench_cfg.result_vars[0])) + df = el.dframe() + broken = df[df["category"] == "broken"]["value"] + self.assertEqual(len(broken), 3) + self.assertTrue(all(math.isnan(v) for v in broken)) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_scatter_result.py b/test/test_scatter_result.py new file mode 100644 index 000000000..1ae1c8839 --- /dev/null +++ b/test/test_scatter_result.py @@ -0,0 +1,142 @@ +"""Tests for bencher/results/holoview_results/scatter_result.py""" + +import math +import unittest +import warnings + +import holoviews as hv +import panel as pn + +import bencher as bn +from bencher.results.holoview_results.scatter_result import ScatterResult +from test.helpers import run_named_sweep + + +class Cat1DBench(bn.ParametrizedSweep): + """Minimal 1-categorical sweep accepted by the scatter filter (0 floats, 1 cat).""" + + method = bn.StringSweep(["alpha", "beta", "gamma"]) + score = bn.ResultFloat(units="m") + + def benchmark(self): + self.score = len(self.method) * 1.5 + + +class Cat1DNanBench(bn.ParametrizedSweep): + """Sweep where the worker returns NaN for one point (missing-value default).""" + + method = bn.StringSweep(["alpha", "beta", "gamma"]) + score = bn.ResultFloat(units="m") + + def benchmark(self): + self.score = float("nan") if self.method == "beta" else len(self.method) * 1.5 + + +class TwoCatBench(bn.ParametrizedSweep): + """Two categorical inputs so the scatter groups by the second cat.""" + + method = bn.StringSweep(["alpha", "beta"]) + backend = bn.StringSweep(["cpu", "gpu"]) + score = bn.ResultFloat(units="m") + + def benchmark(self): + self.score = len(self.method) + len(self.backend) * 0.5 + + +class Float1DBench(bn.ParametrizedSweep): + """Float-input sweep that the scatter filter (float_range 0..0) must reject.""" + + x = bn.FloatSweep(bounds=(0, 1)) + score = bn.ResultFloat(units="m") + + def benchmark(self): + self.score = self.x * 2 + + +with warnings.catch_warnings(): + # ResultVar is deprecated, but to_scatter's result_types filter only accepts it, + # so the full public path is exercised with a ResultVar result. + warnings.simplefilter("ignore", DeprecationWarning) + + class LegacyScatterBench(bn.ParametrizedSweep): + method = bn.StringSweep(["alpha", "beta"]) + score = bn.ResultVar(units="m") + + def benchmark(self): + self.score = len(self.method) * 1.0 + + +def _run_sweep(bench_class, name, input_vars, repeats=1): + return run_named_sweep(bench_class, name, input_vars, ["score"], repeats) + + +class TestScatterResult(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.res_cat = _run_sweep(Cat1DBench, "scatter_cat", ["method"]) + cls.res_nan = _run_sweep(Cat1DNanBench, "scatter_nan", ["method"]) + cls.res_2cat = _run_sweep(TwoCatBench, "scatter_2cat", ["method", "backend"]) + cls.res_float = _run_sweep(Float1DBench, "scatter_float", ["x"]) + cls.res_legacy = _run_sweep(LegacyScatterBench, "scatter_legacy", ["method"]) + + def test_to_scatter_ds_returns_scatter_pane(self): + ds = self.res_cat.to_dataset() + rv = self.res_cat.bench_cfg.result_vars[0] + result = self.res_cat._to_scatter_ds(ds, rv) # pylint: disable=protected-access + self.assertIsInstance(result, pn.pane.HoloViews) + self.assertIsInstance(result.object, hv.Scatter) + + def test_to_scatter_ds_dims_and_title(self): + """Input var on kdims, result var on vdims, title from to_plot_title.""" + ds = self.res_cat.to_dataset() + rv = self.res_cat.bench_cfg.result_vars[0] + result = self.res_cat._to_scatter_ds(ds, rv) # pylint: disable=protected-access + element = result.object + self.assertEqual(element.kdims[0].name, "method") + self.assertEqual(element.vdims[0].name, "score") + self.assertEqual(element.opts.get().kwargs["title"], "score vs method") + + def test_to_scatter_full_path_with_result_var(self): + """The public to_scatter path produces a Row of Scatter panes for ResultVar.""" + result = self.res_legacy.to_scatter() + self.assertIsInstance(result, pn.Row) + self.assertGreater(len(result), 0) + self.assertIsInstance(result[0], pn.pane.HoloViews) + self.assertIsInstance(result[0].object, hv.Scatter) + + def test_to_plot_delegates_to_scatter(self): + result = ScatterResult.to_plot(self.res_legacy) + self.assertIsInstance(result, pn.Row) + self.assertIsInstance(result[0].object, hv.Scatter) + + def test_to_scatter_result_float_returns_none(self): + """Documents current behavior: result_types=(ResultVar,) excludes plain + ResultFloat results, so the public to_scatter path yields no panes.""" + self.assertIsNone(self.res_cat.to_scatter()) + + def test_to_scatter_ds_nan_does_not_crash(self): + ds = self.res_nan.to_dataset() + rv = self.res_nan.bench_cfg.result_vars[0] + self.assertTrue(any(math.isnan(v) for v in ds["score"].values.ravel())) + result = self.res_nan._to_scatter_ds(ds, rv) # pylint: disable=protected-access + self.assertIsInstance(result, pn.pane.HoloViews) + self.assertIsInstance(result.object, hv.Scatter) + + def test_to_scatter_ds_groups_by_extra_cats(self): + """With >1 categorical input, the scatter groups by the remaining cats.""" + ds = self.res_2cat.to_dataset() + rv = self.res_2cat.bench_cfg.result_vars[0] + result = self.res_2cat._to_scatter_ds(ds, rv) # pylint: disable=protected-access + self.assertIsInstance(result, pn.pane.HoloViews) + self.assertIsInstance(result.object, hv.NdOverlay) + self.assertEqual(result.object.kdims[0].name, "backend") + + def test_to_scatter_rejects_float_sweep(self): + """A float input sweep fails the float_range=(0,0) filter when override=False. + + The filter returns None (or a Markdown debug panel), never a scatter pane. + """ + result = self.res_float.to_scatter(override=False) + self.assertNotIsInstance(result, (pn.Row, pn.pane.HoloViews)) + if result is not None: + self.assertIsInstance(result, pn.pane.Markdown) diff --git a/test/test_sweep_vars.py b/test/test_sweep_vars.py index 6860f69b7..05ffc5880 100644 --- a/test/test_sweep_vars.py +++ b/test/test_sweep_vars.py @@ -1,6 +1,5 @@ import unittest from hypothesis import given, strategies as st # pylint: disable=unused-import -import pytest from bencher.variables.inputs import IntSweep, EnumSweep, StringSweep, BoolSweep, FloatSweep from bencher.variables.parametrised_sweep import ParametrizedSweep from bencher.variables.results import ResultFloat @@ -199,13 +198,6 @@ def test_int_sweep_samples(self, samples): self.assertEqual(int_sweep.default, 0) self.assertEqual(len(int_sweep.values()), samples) - @pytest.mark.skip - @given(st.integers(min_value=1, max_value=10)) - def test_int_sweep_samples_all(self, samples): - int_sweep = IntSweep(bounds=[0, 10], samples=samples) - self.assertEqual(int_sweep.default, 0) - self.assertEqual(len(int_sweep.values()), samples) - def test_sweep_bounds_property(self): fs = FloatSweep(bounds=(0, 1)) self.assertEqual(fs.sweep_bounds, (0, 1)) diff --git a/test/test_table_result.py b/test/test_table_result.py new file mode 100644 index 000000000..826018d1f --- /dev/null +++ b/test/test_table_result.py @@ -0,0 +1,91 @@ +"""Tests for bencher/results/holoview_results/table_result.py (TableResult).""" + +import holoviews as hv +import numpy as np +import pytest + +import bencher as bn +from bencher.results.holoview_results.table_result import TableResult +from test.helpers import run_cfg_with + + +class TableBench(bn.ParametrizedSweep): + """Minimal 1-float sweep for table output.""" + + size = bn.FloatSweep(default=50, bounds=[10, 100], samples=3, doc="Size") + throughput = bn.ResultFloat(units="MB/s", doc="Throughput") + + def benchmark(self): + self.throughput = self.size * 0.5 + + +class TableNanBench(bn.ParametrizedSweep): + """Sweep whose worker returns NaN for one input point.""" + + size = bn.FloatSweep(default=50, bounds=[10, 100], samples=3, doc="Size") + throughput = bn.ResultFloat(units="MB/s", doc="Throughput") + + def benchmark(self): + self.throughput = float("nan") if self.size < 20 else self.size * 0.5 + + +@pytest.fixture(scope="module", name="res_repeats") +def fixture_res_repeats(): + run_cfg = run_cfg_with(repeats=3) + bench = TableBench().to_bench(run_cfg) + return bench.plot_sweep( + "table_repeats", input_vars=["size"], result_vars=["throughput"], run_cfg=run_cfg + ) + + +@pytest.fixture(scope="module", name="res_single") +def fixture_res_single(): + run_cfg = run_cfg_with(repeats=1) + bench = TableBench().to_bench(run_cfg) + return bench.plot_sweep( + "table_single", input_vars=["size"], result_vars=["throughput"], run_cfg=run_cfg + ) + + +class TestTableResult: + def test_to_plot_returns_table(self, res_repeats): + table = TableResult.to_plot(res_repeats) + assert isinstance(table, hv.Table) + + def test_table_dims(self, res_repeats): + """Input vars (plus repeat) are kdims; the result var is a vdim.""" + table = TableResult.to_plot(res_repeats) + assert [d.name for d in table.kdims] == ["size", "repeat"] + assert [d.name for d in table.vdims] == ["throughput"] + + def test_table_row_count(self, res_repeats): + """One row per sweep sample: 3 sizes x 3 repeats.""" + table = TableResult.to_plot(res_repeats) + assert len(table) == 9 + + def test_table_values_match_worker_output(self, res_repeats): + """Table rows hold the values computed by benchmark().""" + table = TableResult.to_plot(res_repeats) + sizes = table.dimension_values("size") + throughputs = table.dimension_values("throughput") + np.testing.assert_allclose(throughputs, sizes * 0.5) + + def test_table_squeezes_single_repeat(self, res_single): + """With repeats=1 the repeat dim is squeezed out of the kdims.""" + table = TableResult.to_plot(res_single) + assert isinstance(table, hv.Table) + assert [d.name for d in table.kdims] == ["size"] + assert len(table) == 3 + + def test_table_nan_input_does_not_crash(self): + """A NaN result value still appears as a row in the table.""" + run_cfg = run_cfg_with(repeats=1) + bench = TableNanBench().to_bench(run_cfg) + res = bench.plot_sweep( + "table_nan", input_vars=["size"], result_vars=["throughput"], run_cfg=run_cfg + ) + table = TableResult.to_plot(res) + assert isinstance(table, hv.Table) + assert len(table) == 3 + values = table.dimension_values("throughput") + assert np.isnan(values).sum() == 1 diff --git a/test/test_usability.py b/test/test_usability.py index fd110eb6c..63659a9f1 100644 --- a/test/test_usability.py +++ b/test/test_usability.py @@ -12,7 +12,7 @@ class BenchFloat(bn.ParametrizedSweep): """Test class using the benchmark() pattern.""" theta = bn.FloatSweep(default=0, bounds=[0, math.pi], samples=5) - out_sin = bn.ResultVar(units="v") + out_sin = bn.ResultFloat(units="v") def benchmark(self): self.out_sin = math.sin(self.theta) diff --git a/test/test_violin_result.py b/test/test_violin_result.py new file mode 100644 index 000000000..020d233ac --- /dev/null +++ b/test/test_violin_result.py @@ -0,0 +1,100 @@ +"""Tests for bencher/results/holoview_results/distribution_result/violin_result.py""" + +import math +import unittest + +import holoviews as hv +import panel as pn + +import bencher as bn +from bencher.results.bench_result_base import ReduceType +from bencher.results.holoview_results.distribution_result.violin_result import ViolinResult +from test.helpers import inner_element as _inner_element, run_dist_sweep + + +class ViolinBench(bn.ParametrizedSweep): + """Deterministic 1-categorical benchmark with per-repeat variation.""" + + _call_count = 0 + + category = bn.StringSweep(["alpha", "beta"]) + value = bn.ResultFloat(units="ms") + + def benchmark(self): + ViolinBench._call_count += 1 + base = 1.0 if self.category == "alpha" else 2.0 + self.value = base + 0.01 * ViolinBench._call_count + + +class NanBench(bn.ParametrizedSweep): + """One category always returns NaN (the missing-value default).""" + + category = bn.StringSweep(["ok", "broken"]) + value = bn.ResultFloat(units="ms") + + def benchmark(self): + self.value = float("nan") if self.category == "broken" else 1.0 + + +def _run_sweep(worker_cls, input_vars, repeats): + return run_dist_sweep(worker_cls, input_vars, repeats, "test_violin") + + +class TestViolinResult(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.res = _run_sweep(ViolinBench, ["category"], repeats=3) + # store the list, not the Parameter itself: param Parameters are + # descriptors, so a class-attribute Parameter would resolve to its + # default value on attribute access. + cls.result_vars = cls.res.bench_cfg.result_vars + cls.ds = cls.res.to_dataset(ReduceType.NONE) + + def test_to_violin_ds_returns_violin_element(self): + overlay = self.res.to_violin_ds(self.ds, self.result_vars[0]) + self.assertIsInstance(overlay, hv.Overlay) + self.assertIsInstance(_inner_element(overlay), hv.Violin) + + def test_kdims_vdims_match_input_and_result_vars(self): + el = _inner_element(self.res.to_violin_ds(self.ds, self.result_vars[0])) + self.assertEqual([d.name for d in el.kdims], ["category"]) + self.assertEqual([d.name for d in el.vdims], ["value"]) + + def test_title_and_ylabel_contain_result_var_and_units(self): + el = _inner_element(self.res.to_violin_ds(self.ds, self.result_vars[0])) + opts = hv.Store.lookup_options("bokeh", el, "plot").kwargs + self.assertEqual(opts["ylabel"], "value [ms]") + self.assertEqual(opts["title"], "value vs category vs repeat") + + def test_distribution_contains_all_repeat_samples(self): + """With repeats=3 each x position must hold 3 individual samples.""" + el = _inner_element(self.res.to_violin_ds(self.ds, self.result_vars[0])) + counts = el.dframe().groupby("category").size().to_dict() + self.assertEqual(counts, {"alpha": 3, "beta": 3}) + + def test_to_plot_returns_panel_row_with_holoviews_pane(self): + plot = ViolinResult.to_plot(self.res) + self.assertIsInstance(plot, pn.Row) + self.assertGreater(len(plot), 0) + + def test_to_plot_rejected_for_single_repeat(self): + """Distribution plots need repeats>=2; with override=False the filter rejects.""" + res_1rep = _run_sweep(ViolinBench, ["category"], repeats=1) + plot = ViolinResult.to_plot(res_1rep, override=False) + self.assertNotIsInstance(plot, pn.Row) + self.assertTrue(plot is None or isinstance(plot, pn.pane.Markdown)) + + def test_nan_results_do_not_crash(self): + res_nan = _run_sweep(NanBench, ["category"], repeats=3) + plot = ViolinResult.to_plot(res_nan) + self.assertIsInstance(plot, pn.Row) + ds_nan = res_nan.to_dataset(ReduceType.NONE) + el = _inner_element(res_nan.to_violin_ds(ds_nan, res_nan.bench_cfg.result_vars[0])) + df = el.dframe() + broken = df[df["category"] == "broken"]["value"] + self.assertEqual(len(broken), 3) + self.assertTrue(all(math.isnan(v) for v in broken)) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_volume_result.py b/test/test_volume_result.py new file mode 100644 index 000000000..6ccd75938 --- /dev/null +++ b/test/test_volume_result.py @@ -0,0 +1,140 @@ +"""Behavioral tests for bencher/results/volume_result.py (VolumeResult).""" + +# pylint: disable=redefined-outer-name # pytest fixtures are injected by name + +import math + +import pytest +import panel as pn + +import bencher as bn + + +class VolBench(bn.ParametrizedSweep): + """3-float-input benchmark with a deterministic, axis-separable result.""" + + x = bn.FloatSweep(default=0, bounds=(0.0, 1.0), samples=2, units="m") + y = bn.FloatSweep(default=0, bounds=(0.0, 1.0), samples=2, units="s") + z = bn.FloatSweep(default=0, bounds=(0.0, 1.0), samples=2, units="kg") + + value = bn.ResultFloat(units="ul") + + def benchmark(self): + self.value = self.x + 10 * self.y + 100 * self.z + + +class VolBenchNan(VolBench): + """Same sweep but the origin point returns NaN (missing-value default).""" + + def benchmark(self): + if self.x == 0.0 and self.y == 0.0 and self.z == 0.0: + self.value = float("nan") + else: + self.value = self.x + 10 * self.y + 100 * self.z + + +def _run_cfg() -> bn.BenchRunCfg: + return bn.BenchRunCfg(cache_results=False, cache_samples=False, auto_plot=False, repeats=1) + + +def _sweep(bench_class, input_vars): + bench = bn.Bench("test_volume_result", bench_class(), run_cfg=_run_cfg()) + return bench.plot_sweep("volume sweep", input_vars=input_vars, result_vars=["value"]) + + +def _find_plotly_pane(obj) -> pn.pane.Plotly: + """Extract the single Plotly pane from the (possibly nested) panel layout.""" + if isinstance(obj, pn.pane.Plotly): + return obj + assert isinstance(obj, pn.layout.ListLike), f"unexpected container type {type(obj)}" + panes = [c for c in obj if isinstance(c, pn.pane.Plotly)] + assert len(panes) == 1, f"expected exactly one Plotly pane, got {len(panes)}" + return panes[0] + + +@pytest.fixture(scope="module") +def vol_result(): + return _sweep(VolBench, ["x", "y", "z"]) + + +@pytest.fixture(scope="module") +def vol_pane(vol_result): + return _find_plotly_pane(vol_result.to_volume()) + + +class TestVolumeConstruction: + def test_to_volume_returns_plotly_backed_pane(self, vol_result): + out = vol_result.to_volume() + pane = _find_plotly_pane(out) + assert isinstance(pane, pn.pane.Plotly) + assert pane.name == "volume_plotly" + + def test_figure_contains_single_volume_trace(self, vol_pane): + fig = vol_pane.object + assert set(fig.keys()) == {"data", "layout"} + assert len(fig["data"]) == 1 + assert type(fig["data"][0]).__name__ == "Volume" + + def test_axis_titles_embed_names_and_units(self, vol_pane): + scene = vol_pane.object["layout"].scene + assert scene.xaxis.title.text == "x [m]" + assert scene.yaxis.title.text == "y [s]" + assert scene.zaxis.title.text == "z [kg]" + + def test_layout_title_names_result_and_inputs(self, vol_pane): + assert vol_pane.object["layout"].title.text == "value vs (x vs y vs z)" + + def test_trace_values_match_worker_output(self, vol_pane): + trace = vol_pane.object["data"][0] + xs, ys, zs, vals = list(trace.x), list(trace.y), list(trace.z), list(trace.value) + assert len(vals) == 8 # 2 x 2 x 2 grid + for x, y, z, v in zip(xs, ys, zs, vals): + assert v == pytest.approx(x + 10 * y + 100 * z) + assert trace.isomin == pytest.approx(0.0) + assert trace.isomax == pytest.approx(111.0) + + def test_to_plot_delegates_to_volume(self, vol_result): + pane = _find_plotly_pane(vol_result.to_plot()) + assert isinstance(pane, pn.pane.Plotly) + assert pane.object["layout"].title.text == "value vs (x vs y vs z)" + + +class TestVolumeRejection: + def test_one_float_sweep_rejected_without_override(self): + """An unsupported shape must not produce a volume plot. + + The documented fallback is the filter-match diagnostics: a Markdown pane + when print_debug is enabled (the default), otherwise None — never a Plotly pane. + """ + res = _sweep(VolBench, ["x"]) + out = res.to_volume(override=False) + assert not isinstance(out, (pn.pane.Plotly, pn.layout.ListLike)) + assert isinstance(out, pn.pane.Markdown) + assert "matches: False" in out.object + assert "float" in out.object # the float-count requirement is what failed + + def test_one_float_sweep_rejection_is_silent_without_debug(self): + res = _sweep(VolBench, ["x"]) + res.plt_cnt_cfg.print_debug = False + assert res.to_volume(override=False) is None + + def test_over_time_returns_none(self, vol_result): + prev = vol_result.bench_cfg.over_time + vol_result.bench_cfg.over_time = True + try: + assert vol_result.to_volume(override=True) is None + finally: + vol_result.bench_cfg.over_time = prev + + +class TestVolumeNanRobustness: + def test_nan_point_still_builds_volume(self): + res = _sweep(VolBenchNan, ["x", "y", "z"]) + pane = _find_plotly_pane(res.to_volume()) + trace = pane.object["data"][0] + vals = list(trace.value) + assert len(vals) == 8 + assert sum(1 for v in vals if math.isnan(v)) == 1 + # iso bounds are computed from the finite values only + assert trace.isomin == pytest.approx(1.0) + assert trace.isomax == pytest.approx(111.0) diff --git a/test/test_worker_job.py b/test/test_worker_job.py new file mode 100644 index 000000000..e1eeb6470 --- /dev/null +++ b/test/test_worker_job.py @@ -0,0 +1,132 @@ +"""Tests for bencher/worker_job.py — WorkerJob input preparation and hashing. + +WorkerJob pickling is covered in test/test_multiprocessing_executor.py and the +JobCache machinery in test/test_job.py, so they are not duplicated here. +""" + +from bencher.worker_job import WorkerJob + + +def make_job( + function_input_vars=(1, 2), + dims_name=("x", "y"), + constant_inputs=None, + bench_cfg_sample_hash="sample_hash", + tag="", +) -> WorkerJob: + """Construct a WorkerJob with setup_hashes() already applied.""" + job = WorkerJob( + function_input_vars=list(function_input_vars), + index_tuple=(0, 0), + dims_name=list(dims_name), + constant_inputs=constant_inputs, + bench_cfg_sample_hash=bench_cfg_sample_hash, + tag=tag, + ) + job.setup_hashes() + return job + + +# ── construction defaults ─────────────────────────────────────────────────── + + +class TestWorkerJobDefaults: + def test_defaults_before_setup(self): + job = WorkerJob( + function_input_vars=[1], + index_tuple=(0,), + dims_name=["x"], + constant_inputs=None, + bench_cfg_sample_hash="h", + tag="", + ) + assert job.function_input is None + assert job.canonical_input is None + assert job.fn_inputs_sorted is None + assert job.function_input_signature_pure is None + assert job.found_in_cache is False + assert job.msgs == [] + + def test_msgs_lists_are_independent(self): + job_a = make_job() + job_b = make_job() + job_a.msgs.append("only on a") + assert job_b.msgs == [] + + +# ── setup_hashes: function input construction ─────────────────────────────── + + +class TestFunctionInputConstruction: + def test_function_input_zips_dims_with_values(self): + job = make_job(function_input_vars=(1, 2), dims_name=("x", "y")) + assert job.function_input == {"x": 1, "y": 2} + + def test_canonical_input_sorted_by_dim_name(self): + # dims given in non-alphabetical order: canonical form sorts by key + job = make_job(function_input_vars=(2, 1), dims_name=("y", "x")) + assert job.function_input == {"y": 2, "x": 1} + assert job.canonical_input == (1, 2) + + def test_constant_inputs_merged_into_function_input(self): + job = make_job(constant_inputs={"c": 5}) + assert job.function_input == {"x": 1, "y": 2, "c": 5} + + def test_constant_inputs_excluded_from_canonical_input(self): + # canonical_input is computed before constants are merged + job = make_job(constant_inputs={"c": 5}) + assert job.canonical_input == (1, 2) + + def test_constant_inputs_override_dim_values(self): + job = make_job(function_input_vars=(1, 2), dims_name=("x", "y"), constant_inputs={"y": 9}) + assert job.function_input == {"x": 1, "y": 9} + + def test_fn_inputs_sorted_is_sorted_key_value_pairs(self): + job = make_job(function_input_vars=(2, 1), dims_name=("y", "x"), constant_inputs={"a": 3}) + assert job.fn_inputs_sorted == [("a", 3), ("x", 1), ("y", 2)] + + +# ── setup_hashes: hash behavior ───────────────────────────────────────────── + + +class TestFunctionInputSignature: + def test_same_inputs_same_hash(self): + assert make_job().function_input_signature_pure == make_job().function_input_signature_pure + + def test_signature_is_sha1_hex_string(self): + sig = make_job().function_input_signature_pure + assert isinstance(sig, str) + assert len(sig) == 40 + int(sig, 16) # valid hexadecimal + + def test_different_values_different_hash(self): + job_a = make_job(function_input_vars=(1, 2)) + job_b = make_job(function_input_vars=(1, 3)) + assert job_a.function_input_signature_pure != job_b.function_input_signature_pure + + def test_different_dim_names_different_hash(self): + job_a = make_job(dims_name=("x", "y")) + job_b = make_job(dims_name=("x", "z")) + assert job_a.function_input_signature_pure != job_b.function_input_signature_pure + + def test_different_tag_different_hash(self): + job_a = make_job(tag="tag_a") + job_b = make_job(tag="tag_b") + assert job_a.function_input_signature_pure != job_b.function_input_signature_pure + + def test_constant_inputs_affect_hash(self): + job_a = make_job(constant_inputs={"c": 5}) + job_b = make_job(constant_inputs={"c": 6}) + assert job_a.function_input_signature_pure != job_b.function_input_signature_pure + + def test_dim_order_does_not_affect_hash(self): + # the signature is built from sorted inputs, so dim ordering is irrelevant + job_a = make_job(function_input_vars=(1, 2), dims_name=("x", "y")) + job_b = make_job(function_input_vars=(2, 1), dims_name=("y", "x")) + assert job_a.function_input_signature_pure == job_b.function_input_signature_pure + + def test_bench_cfg_sample_hash_does_not_affect_pure_signature(self): + # the "pure" signature covers only the function inputs and tag + job_a = make_job(bench_cfg_sample_hash="hash_a") + job_b = make_job(bench_cfg_sample_hash="hash_b") + assert job_a.function_input_signature_pure == job_b.function_input_signature_pure