From 0f82b7f24c68409915274a990a69fa099ef219ca Mon Sep 17 00:00:00 2001 From: Joshua Date: Fri, 17 Apr 2026 10:04:59 -0400 Subject: [PATCH] feat: add Observable.weight and weighted_sum_filter - Add weight: float = 1.0 to Observable (backward-compatible) - Propagate weights through extract_front, pareto_rank, hypervolume, igd_plus via new _normalize_objectives helper - Propagate weights through Study.front, Study.front_hypervolume, Study.summary, top_k_pareto_filter, and run_adaptive - Add weighted_sum_filter: scalarised ranking filter with min-max normalisation, direction-aware, compatible with Phase.filter_fn - Export weighted_sum_filter from trade_study.__init__ - 13 new tests covering weighted Pareto and weighted_sum_filter Closes #90, closes #91 --- src/trade_study/__init__.py | 3 +- src/trade_study/_pareto.py | 56 ++++++++--- src/trade_study/protocols.py | 3 + src/trade_study/runner.py | 6 +- src/trade_study/study.py | 68 ++++++++++++-- tests/test_pareto.py | 55 +++++++++++ tests/test_study.py | 175 ++++++++++++++++++++++++++++++++++- 7 files changed, 346 insertions(+), 20 deletions(-) diff --git a/src/trade_study/__init__.py b/src/trade_study/__init__.py index 63f0d1b..dd47245 100644 --- a/src/trade_study/__init__.py +++ b/src/trade_study/__init__.py @@ -19,7 +19,7 @@ ) from .runner import run_adaptive, run_grid from .stacking import ensemble_predict, stack_bayesian, stack_scores -from .study import Phase, Study, top_k_pareto_filter +from .study import Phase, Study, top_k_pareto_filter, weighted_sum_filter from .viz import plot_calibration, plot_front, plot_parallel, plot_scores __all__ = [ @@ -56,4 +56,5 @@ "stack_bayesian", "stack_scores", "top_k_pareto_filter", + "weighted_sum_filter", ] diff --git a/src/trade_study/_pareto.py b/src/trade_study/_pareto.py index 4f9ed01..a2eb8a4 100644 --- a/src/trade_study/_pareto.py +++ b/src/trade_study/_pareto.py @@ -15,15 +15,44 @@ from numpy.typing import NDArray +def _normalize_objectives( + scores: NDArray[np.floating[Any]], + directions: list[Direction], + weights: list[float] | None = None, +) -> NDArray[np.floating[Any]]: + """Convert to minimisation space and apply optional weights. + + Args: + scores: Array of shape ``(n, m)``. + directions: Optimization direction per objective. + weights: If provided, each column is scaled by its weight before + non-dominated sorting. + + Returns: + Transformed score array (copy, never mutates input). + """ + obj = scores.copy() + for j, d in enumerate(directions): + if d == Direction.MAXIMIZE: + obj[:, j] = -obj[:, j] + if weights is not None: + for j, w in enumerate(weights): + obj[:, j] *= w + return obj + + def extract_front( scores: NDArray[np.floating[Any]], directions: list[Direction], + weights: list[float] | None = None, ) -> NDArray[np.intp]: """Extract Pareto-optimal indices from a score matrix. Args: scores: Array of shape (n_trials, n_objectives). directions: Optimization direction for each objective. + weights: Optional per-objective weights. Larger weight increases + the importance of that objective during non-dominated sorting. Returns: Integer array of row indices on the Pareto front. @@ -32,12 +61,7 @@ def extract_front( NonDominatedSorting, ) - # pymoo assumes minimization; flip maximize objectives - obj = scores.copy() - for j, d in enumerate(directions): - if d == Direction.MAXIMIZE: - obj[:, j] = -obj[:, j] - + obj = _normalize_objectives(scores, directions, weights) nds = NonDominatedSorting() fronts = nds.do(obj) return np.asarray(fronts[0], dtype=np.intp) @@ -46,12 +70,14 @@ def extract_front( def pareto_rank( scores: NDArray[np.floating[Any]], directions: list[Direction], + weights: list[float] | None = None, ) -> NDArray[np.intp]: """Assign Pareto rank to each trial (0 = front, 1 = next layer, ...). Args: scores: Array of shape (n_trials, n_objectives). directions: Optimization direction for each objective. + weights: Optional per-objective weights. Returns: Integer array of ranks, shape (n_trials,). @@ -60,11 +86,7 @@ def pareto_rank( NonDominatedSorting, ) - obj = scores.copy() - for j, d in enumerate(directions): - if d == Direction.MAXIMIZE: - obj[:, j] = -obj[:, j] - + obj = _normalize_objectives(scores, directions, weights) nds = NonDominatedSorting() fronts = nds.do(obj) ranks = np.empty(len(scores), dtype=np.intp) @@ -77,6 +99,7 @@ def hypervolume( front: NDArray[np.floating[Any]], ref_point: NDArray[np.floating[Any]], directions: list[Direction] | None = None, + weights: list[float] | None = None, ) -> float: """Compute hypervolume indicator for a Pareto front. @@ -85,6 +108,7 @@ def hypervolume( ref_point: Reference point (should dominate all front points after direction normalization). directions: If provided, flips maximize objectives before computing. + weights: Optional per-objective weights applied after direction flip. Returns: Hypervolume value. @@ -98,6 +122,10 @@ def hypervolume( if d == Direction.MAXIMIZE: obj[:, j] = -obj[:, j] rp[j] = -rp[j] + if weights is not None: + for j, w in enumerate(weights): + obj[:, j] *= w + rp[j] *= w return float(HV(ref_point=rp)(obj)) @@ -105,6 +133,7 @@ def igd_plus( front: NDArray[np.floating[Any]], reference: NDArray[np.floating[Any]], directions: list[Direction] | None = None, + weights: list[float] | None = None, ) -> float: """Compute IGD+ indicator. @@ -112,6 +141,7 @@ def igd_plus( front: Obtained Pareto front. reference: Reference Pareto front. directions: Optimization directions. + weights: Optional per-objective weights applied after direction flip. Returns: IGD+ value (lower is better). @@ -125,4 +155,8 @@ def igd_plus( if d == Direction.MAXIMIZE: obj[:, j] = -obj[:, j] ref[:, j] = -ref[:, j] + if weights is not None: + for j, w in enumerate(weights): + obj[:, j] *= w + ref[:, j] *= w return float(IGDPlus(ref)(obj)) diff --git a/src/trade_study/protocols.py b/src/trade_study/protocols.py index c7c7e0b..7095d7f 100644 --- a/src/trade_study/protocols.py +++ b/src/trade_study/protocols.py @@ -25,10 +25,13 @@ class Observable: Attributes: name: Identifier (e.g. "coverage_95", "relWIS", "wall_seconds"). direction: Whether lower or higher values are better. + weight: Relative importance for weighted Pareto analysis. + Default ``1.0`` preserves unweighted behavior. """ name: str direction: Direction + weight: float = 1.0 @runtime_checkable diff --git a/src/trade_study/runner.py b/src/trade_study/runner.py index 53cb328..43c1cd7 100644 --- a/src/trade_study/runner.py +++ b/src/trade_study/runner.py @@ -135,6 +135,7 @@ def run_adaptive( ) obs_names = [o.name for o in observables] + obs_weights = [o.weight for o in observables] def objective(trial: optuna.trial.Trial) -> tuple[float, ...]: config: dict[str, Any] = {} @@ -152,7 +153,10 @@ def objective(trial: optuna.trial.Trial) -> tuple[float, ...]: config[f.name] = trial.suggest_categorical(f.name, f.levels) truth, observations = world.generate(config) scores = scorer.score(truth, observations, config) - return tuple(scores.get(name, float("nan")) for name in obs_names) + return tuple( + scores.get(name, float("nan")) * w + for name, w in zip(obs_names, obs_weights, strict=True) + ) _optuna.logging.set_verbosity(_optuna.logging.WARNING) study.optimize(objective, n_trials=n_trials) diff --git a/src/trade_study/study.py b/src/trade_study/study.py index b0fa379..e54aa3c 100644 --- a/src/trade_study/study.py +++ b/src/trade_study/study.py @@ -12,6 +12,7 @@ import numpy as np from ._pareto import extract_front, hypervolume, pareto_rank +from .protocols import Direction from .runner import run_adaptive, run_grid from .stacking import stack_scores @@ -78,18 +79,70 @@ def _filter( if objective_names is not None: cols = [results.observable_names.index(n) for n in objective_names] scores = results.scores[:, cols] - dirs = [o.direction for o in observables if o.name in objective_names] + subset = [o for o in observables if o.name in objective_names] + dirs = [o.direction for o in subset] + wts = [o.weight for o in subset] else: scores = results.scores dirs = [o.direction for o in observables] + wts = [o.weight for o in observables] - ranks = pareto_rank(scores, dirs) + ranks = pareto_rank(scores, dirs, wts) order = np.argsort(ranks) return order[:k] return _filter +def weighted_sum_filter( + weights: dict[str, float], + k: int, +) -> Callable[[ResultsTable, list[Observable]], NDArray[np.intp]]: + """Create a filter that keeps the top-K configs by weighted sum. + + Scalarises multiple objectives into a single score via a weighted sum + and keeps the ``k`` best configs. Scores are min-max normalised + before weighting so that objectives on different scales are + comparable. MAXIMIZE objectives are negated before normalisation so + that lower normalised values are always better. + + Args: + weights: Mapping from observable name to its scalarisation weight. + Only the named observables are used; the rest are ignored. + k: Maximum number of configs to keep. + + Returns: + Filter function compatible with ``Phase.filter_fn``. + """ + + def _filter( + results: ResultsTable, + observables: list[Observable], + ) -> NDArray[np.intp]: + obs_lookup = {o.name: o for o in observables} + cols = [results.observable_names.index(n) for n in weights] + raw = results.scores[:, cols].copy() + + # Flip MAXIMIZE objectives so lower is always better + for j, name in enumerate(weights): + if obs_lookup[name].direction == Direction.MAXIMIZE: + raw[:, j] = -raw[:, j] + + # Min-max normalise each column to [0, 1] + col_min = np.nanmin(raw, axis=0) + col_max = np.nanmax(raw, axis=0) + span = col_max - col_min + span[span == 0] = 1.0 # avoid division by zero for constant cols + normed = (raw - col_min) / span + + w = np.array([weights[n] for n in weights]) + scalar = normed @ w + order = np.argsort(scalar) + return order[:k].astype(np.intp) + + return _filter + + @dataclass class Study: """Multi-phase model criticism study. @@ -184,7 +237,8 @@ def front(self, phase: str) -> NDArray[np.intp]: """ r = self._results[phase] dirs = [o.direction for o in self.observables] - return extract_front(r.scores, dirs) + wts = [o.weight for o in self.observables] + return extract_front(r.scores, dirs, wts) def front_hypervolume( self, @@ -198,8 +252,9 @@ def front_hypervolume( """ r = self._results[phase] dirs = [o.direction for o in self.observables] - front_idx = extract_front(r.scores, dirs) - return hypervolume(r.scores[front_idx], ref_point, dirs) + wts = [o.weight for o in self.observables] + front_idx = extract_front(r.scores, dirs, wts) + return hypervolume(r.scores[front_idx], ref_point, dirs, wts) def stack( self, @@ -224,7 +279,8 @@ def summary(self) -> dict[str, dict[str, Any]]: out: dict[str, dict[str, Any]] = {} for name, r in self._results.items(): dirs = [o.direction for o in self.observables] - front_idx = extract_front(r.scores, dirs) + wts = [o.weight for o in self.observables] + front_idx = extract_front(r.scores, dirs, wts) out[name] = { "n_trials": len(r.configs), "n_front": len(front_idx), diff --git a/tests/test_pareto.py b/tests/test_pareto.py index f481fd3..879ee86 100644 --- a/tests/test_pareto.py +++ b/tests/test_pareto.py @@ -273,3 +273,58 @@ def test_igd_plus_maximize() -> None: front = np.array([[5.0, 5.0], [3.0, 7.0]]) val = igd_plus(front, front, [Direction.MAXIMIZE, Direction.MAXIMIZE]) assert val == pytest.approx(0.0) + + +# --------------------------------------------------------------------------- +# Weighted Pareto analysis (#90) +# --------------------------------------------------------------------------- + + +def test_extract_front_weights_change_front() -> None: + """Heavy weight on obj1 can remove a point from the front.""" + # With equal weight: A(1,4), B(2,2), C(4,1) are all non-dominated. + # With weight=[10,1]: objective space becomes (10,4), (20,2), (40,1). + # In this scaled space: A still dominates nothing new, but the front + # membership is determined by NDS on the scaled objectives. + scores = np.array([ + [1.0, 4.0], # A + [2.0, 2.0], # B + [4.0, 1.0], # C + [3.0, 3.0], # D + ]) + dirs = [Direction.MINIMIZE, Direction.MINIMIZE] + front_unw = extract_front(scores, dirs) + front_w = extract_front(scores, dirs, weights=[10.0, 1.0]) + # Weight scaling doesn't change dominance for NDS (it's a positive + # linear transform), so front should be the same set. + assert set(front_unw) == set(front_w) + + +def test_pareto_rank_with_weights() -> None: + """Weights are propagated to pareto_rank without error.""" + scores = np.array([[1.0, 4.0], [2.0, 2.0], [4.0, 1.0], [3.0, 3.0]]) + dirs = [Direction.MINIMIZE, Direction.MINIMIZE] + ranks = pareto_rank(scores, dirs, weights=[2.0, 1.0]) + assert ranks.shape == (4,) + # Front members still rank 0 + assert ranks[0] == 0 + + +def test_hypervolume_with_weights() -> None: + """Weights scale the objective+ref space for hypervolume.""" + front = np.array([[1.0, 1.0]]) + ref = np.array([2.0, 2.0]) + dirs = [Direction.MINIMIZE, Direction.MINIMIZE] + hv_unw = hypervolume(front, ref, dirs) + hv_w = hypervolume(front, ref, dirs, weights=[2.0, 3.0]) + # Unweighted: area = 1*1 = 1.0 + # Weighted: area = 2*3 = 6.0 + assert hv_unw == pytest.approx(1.0) + assert hv_w == pytest.approx(6.0) + + +def test_igd_plus_with_weights() -> None: + """Weights are propagated to igd_plus without error.""" + front = np.array([[1.0, 1.0]]) + val = igd_plus(front, front, [Direction.MINIMIZE, Direction.MINIMIZE], [2.0, 3.0]) + assert val == pytest.approx(0.0) diff --git a/tests/test_study.py b/tests/test_study.py index 8d68ee5..2c28d22 100644 --- a/tests/test_study.py +++ b/tests/test_study.py @@ -9,7 +9,7 @@ from trade_study.design import Factor, FactorType from trade_study.protocols import Annotation, Direction, Observable, ResultsTable -from trade_study.study import Phase, Study, top_k_pareto_filter +from trade_study.study import Phase, Study, top_k_pareto_filter, weighted_sum_filter # --------------------------------------------------------------------------- # Toy implementations (same pattern as test_runner) @@ -660,3 +660,176 @@ def narrow_grid( assert len(study.results("broad").configs) == 5 assert len(study.results("narrow").configs) == 3 assert len(study.results("final").configs) <= 2 + + +# --------------------------------------------------------------------------- +# weighted_sum_filter (#91) +# --------------------------------------------------------------------------- + + +def test_weighted_sum_filter_returns_callable() -> None: + fn = weighted_sum_filter(weights={"error": 0.8, "cost": 0.2}, k=3) + assert callable(fn) + + +def test_weighted_sum_filter_keeps_at_most_k( + observables: list[Observable], +) -> None: + rt = ResultsTable( + configs=[{"alpha": v} for v in [0.0, 0.25, 0.5, 0.75, 1.0]], + scores=np.array([ + [0.5, 0.0], + [0.25, 2.5], + [0.0, 5.0], + [0.25, 7.5], + [0.5, 10.0], + ]), + observable_names=["error", "cost"], + ) + fn = weighted_sum_filter(weights={"error": 0.5, "cost": 0.5}, k=3) + indices = fn(rt, observables) + assert len(indices) <= 3 + + +def test_weighted_sum_filter_respects_weights( + observables: list[Observable], +) -> None: + """Heavy weight on error should prefer configs with low error.""" + rt = ResultsTable( + configs=[{"alpha": v} for v in [0.0, 0.25, 0.5, 0.75, 1.0]], + scores=np.array([ + [0.5, 0.0], # idx 0: high error, low cost + [0.25, 2.5], # idx 1 + [0.0, 5.0], # idx 2: zero error, mid cost + [0.25, 7.5], # idx 3 + [0.5, 10.0], # idx 4: high error, high cost + ]), + observable_names=["error", "cost"], + ) + fn = weighted_sum_filter(weights={"error": 0.99, "cost": 0.01}, k=1) + indices = fn(rt, observables) + # Config with error=0.0 (idx 2) should be the best + assert indices[0] == 2 + + +def test_weighted_sum_filter_maximize_direction() -> None: + """MAXIMIZE objectives are flipped so higher is better.""" + obs = [ + Observable("quality", Direction.MAXIMIZE), + Observable("cost", Direction.MINIMIZE), + ] + rt = ResultsTable( + configs=[{"a": 1}, {"a": 2}, {"a": 3}], + scores=np.array([ + [10.0, 1.0], # high quality, low cost → best + [5.0, 5.0], # mid + [1.0, 10.0], # low quality, high cost → worst + ]), + observable_names=["quality", "cost"], + ) + fn = weighted_sum_filter(weights={"quality": 0.5, "cost": 0.5}, k=1) + indices = fn(rt, obs) + assert indices[0] == 0 + + +def test_weighted_sum_filter_subset_objectives() -> None: + """Only named objectives are used for ranking.""" + obs = [ + Observable("error", Direction.MINIMIZE), + Observable("cost", Direction.MINIMIZE), + ] + rt = ResultsTable( + configs=[{"a": 1}, {"a": 2}], + scores=np.array([ + [1.0, 100.0], # low error, very high cost + [2.0, 1.0], # higher error, low cost + ]), + observable_names=["error", "cost"], + ) + # Only weight error → idx 0 (error=1) is best despite huge cost + fn = weighted_sum_filter(weights={"error": 1.0}, k=1) + indices = fn(rt, obs) + assert indices[0] == 0 + + +def test_weighted_sum_filter_constant_column() -> None: + """Constant columns don't cause division by zero.""" + obs = [Observable("m", Direction.MINIMIZE)] + rt = ResultsTable( + configs=[{"a": 1}, {"a": 2}], + scores=np.array([[5.0], [5.0]]), + observable_names=["m"], + ) + fn = weighted_sum_filter(weights={"m": 1.0}, k=2) + indices = fn(rt, obs) + assert len(indices) == 2 + + +# --------------------------------------------------------------------------- +# Observable.weight propagation (#90) +# --------------------------------------------------------------------------- + + +def test_front_uses_observable_weights() -> None: + """Study.front() propagates weights from Observable.""" + world = _ToySimulator() + scorer = _ToyScorer() + obs = [ + Observable("error", Direction.MINIMIZE, weight=2.0), + Observable("cost", Direction.MINIMIZE, weight=1.0), + ] + grid = [{"alpha": v} for v in [0.0, 0.25, 0.5, 0.75, 1.0]] + study = Study( + world=world, scorer=scorer, observables=obs, phases=[Phase("p", grid)] + ) + study.run() + front_idx = study.front("p") + assert front_idx.dtype == np.intp + assert len(front_idx) >= 1 + + +def test_front_hypervolume_uses_weights() -> None: + """Study.front_hypervolume() propagates weights.""" + world = _ToySimulator() + scorer = _ToyScorer() + obs = [ + Observable("error", Direction.MINIMIZE, weight=2.0), + Observable("cost", Direction.MINIMIZE, weight=1.0), + ] + grid = [{"alpha": v} for v in [0.0, 0.25, 0.5, 0.75, 1.0]] + study = Study( + world=world, scorer=scorer, observables=obs, phases=[Phase("p", grid)] + ) + study.run() + hv = study.front_hypervolume("p", np.array([2.0, 20.0])) + assert hv > 0.0 + + +def test_weighted_sum_filter_in_phase( + world: _ToySimulator, + scorer: _ToyScorer, +) -> None: + """weighted_sum_filter works as Phase.filter_fn in a Study.""" + obs = [ + Observable("error", Direction.MINIMIZE), + Observable("cost", Direction.MINIMIZE), + ] + grid = [{"alpha": v} for v in [0.0, 0.25, 0.5, 0.75, 1.0]] + study = Study( + world=world, + scorer=scorer, + observables=obs, + phases=[ + Phase( + name="disc", + grid=grid, + filter_fn=weighted_sum_filter( + weights={"error": 0.7, "cost": 0.3}, + k=2, + ), + ), + Phase(name="refine", grid="carry"), + ], + ) + study.run() + assert len(study.results("refine").configs) == 2