From 62cfb26ea1132bd2f580246892d445c0c8a2cc06 Mon Sep 17 00:00:00 2001 From: ThorstenHellert Date: Sat, 21 Mar 2026 13:59:06 -0700 Subject: [PATCH] Add dynamic aperture, LOCO, and multipoles app modules with tests New application modules needed for ALSU storage ring commissioning simulations. Multipoles module includes bug fixes (accumulation, truncated random), new parameters (main_order, main_component, zero_orders), numpy array input support, and read_multipole_table for parsing MATLAB-format multipole files. --- pySC/apps/dynamic_aperture.py | 70 +++++ pySC/apps/loco.py | 306 +++++++++++++++++++++ pySC/apps/multipoles.py | 215 +++++++++++++++ tests/apps/test_dynamic_aperture.py | 130 +++++++++ tests/apps/test_loco.py | 172 ++++++++++++ tests/apps/test_multipoles.py | 405 ++++++++++++++++++++++++++++ 6 files changed, 1298 insertions(+) create mode 100644 pySC/apps/dynamic_aperture.py create mode 100644 pySC/apps/loco.py create mode 100644 pySC/apps/multipoles.py create mode 100644 tests/apps/test_dynamic_aperture.py create mode 100644 tests/apps/test_loco.py create mode 100644 tests/apps/test_multipoles.py diff --git a/pySC/apps/dynamic_aperture.py b/pySC/apps/dynamic_aperture.py new file mode 100644 index 00000000..adb5c967 --- /dev/null +++ b/pySC/apps/dynamic_aperture.py @@ -0,0 +1,70 @@ +import logging + +import numpy as np + +logger = logging.getLogger(__name__) + + +def dynamic_aperture( + SC, n_angles=16, n_turns=1000, accuracy=1e-6, + initial_radius=1e-3, max_radius=0.05, + center_on_orbit=True, use_design=False, +) -> dict: + """Calculate the dynamic aperture by binary search at each angle.""" + angles = np.linspace(0, 2 * np.pi, n_angles, endpoint=False) + + if center_on_orbit: + orbit = SC.lattice.get_orbit(use_design=use_design) + x0, y0 = orbit[0, 0], orbit[1, 0] + else: + x0, y0 = 0.0, 0.0 + + radii = np.zeros(n_angles) + + for i, theta in enumerate(angles): + lower = 0.0 + upper = initial_radius + + # Scale upper bound until particle is lost + while upper <= max_radius: + if _is_lost(SC, upper, theta, x0, y0, n_turns): + break + lower = upper + upper *= 2.0 + upper = min(upper, max_radius) + + # If even the smallest radius is lost, DA is zero at this angle + if lower == 0.0 and _is_lost(SC, lower, theta, x0, y0, n_turns): + radii[i] = 0.0 + continue + + # Binary search + while upper - lower > accuracy: + mid = (lower + upper) / 2.0 + if _is_lost(SC, mid, theta, x0, y0, n_turns): + upper = mid + else: + lower = mid + + radii[i] = lower + logger.debug("Angle %.4f rad: stable radius %.6e m", theta, lower) + + x = radii * np.cos(angles) + y = radii * np.sin(angles) + area = _shoelace_area(x, y) + + return {"radii": radii, "angles": angles, "area": area, "x": x, "y": y} + + +def _is_lost(SC, radius, theta, x0, y0, n_turns): + """Return True if a particle at the given radius and angle is lost.""" + bunch = np.zeros((1, 6)) + bunch[0, 0] = radius * np.cos(theta) + x0 + bunch[0, 2] = radius * np.sin(theta) + y0 + result = SC.lattice.track(bunch, n_turns=n_turns) + return np.any(np.isnan(result)) + + +def _shoelace_area(x, y): + """Compute polygon area using the shoelace formula.""" + return 0.5 * np.abs(np.dot(x, np.roll(y, -1)) - np.dot(y, np.roll(x, -1))) diff --git a/pySC/apps/loco.py b/pySC/apps/loco.py new file mode 100644 index 00000000..52afbcb0 --- /dev/null +++ b/pySC/apps/loco.py @@ -0,0 +1,306 @@ +""" +LOCO (Linear Optics from Closed Orbits) implementation for pySC. + +Provides functions to compute orbit response matrix Jacobians with respect +to quadrupole strengths, fit optics corrections via least-squares, and +apply the resulting corrections back to the lattice. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +import numpy as np +from scipy.optimize import least_squares + +from ..tuning.response_measurements import ( + measure_OrbitResponseMatrix, + measure_RFFrequencyOrbitResponse, +) + +if TYPE_CHECKING: + from ..core.simulated_commissioning import SimulatedCommissioning + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _objective(masked_params, orm_residuals, masked_jacobian, weights): + """Weighted residual for the LOCO least-squares problem.""" + residuals = orm_residuals - np.einsum("ijk,i->jk", masked_jacobian, masked_params) + w = np.sqrt(np.atleast_1d(weights)) + return (residuals * w[:, np.newaxis]).ravel() + + +def _get_parameters_mask(fit_flags, lengths): + """Build a boolean mask selecting the parameter groups to fit.""" + mask = np.zeros(sum(lengths), dtype=bool) + current_index = 0 + for flag, length in zip(fit_flags, lengths): + if flag: + mask[current_index : current_index + length] = True + current_index += length + return mask + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def calculate_jacobian( + SC: "SimulatedCommissioning", + orm_model, + quad_control_names, + dk=1e-5, + include_correctors=True, + include_bpms=True, + include_dispersion=False, + use_design=True, +): + """Compute the LOCO Jacobian of the ORM with respect to parameters. + + Parameters + ---------- + SC : SimulatedCommissioning + The simulation object. + orm_model : np.ndarray + Model orbit response matrix, shape ``(n_bpms*2, n_correctors)``. + If *include_dispersion* is True the last column should be the + dispersion response. + quad_control_names : list[str] + Names of the quadrupole control knobs to include. + dk : float + Finite-difference step size for quadrupole strengths. + include_correctors : bool + Whether to append corrector-gain Jacobian entries. + include_bpms : bool + Whether to append BPM-gain Jacobian entries. + include_dispersion : bool + Whether to append the RF-frequency orbit response (dispersion) + as an extra column in each ORM measurement. + use_design : bool + If True, compute model ORMs from the design lattice. + + Returns + ------- + jacobian : np.ndarray + Shape ``(n_params, n_bpms*2, n_correctors [+1])``. + """ + n_rows, n_cols = orm_model.shape + jacobian_rows = [] + + # --- Quadrupole derivatives --- + for name in quad_control_names: + current = SC.magnet_settings.get(name) + SC.magnet_settings.set(name, current + dk) + + orm_bumped = measure_OrbitResponseMatrix(SC, use_design=use_design) + + if include_dispersion: + disp = measure_RFFrequencyOrbitResponse(SC, use_design=use_design) + disp_col = disp.reshape(-1, 1) + orm_bumped = np.hstack([orm_bumped, disp_col]) + + SC.magnet_settings.set(name, current) + + jacobian_rows.append((orm_bumped - orm_model) / dk) + + # --- Corrector-gain entries --- + if include_correctors: + for j in range(n_cols): + entry = np.zeros_like(orm_model) + entry[:, j] = orm_model[:, j] + jacobian_rows.append(entry) + + # --- BPM-gain entries --- + if include_bpms: + for i in range(n_rows): + entry = np.zeros_like(orm_model) + entry[i, :] = orm_model[i, :] + jacobian_rows.append(entry) + + jacobian = np.array(jacobian_rows) + logger.info( + "LOCO Jacobian computed: %d parameters, ORM shape (%d, %d)", + jacobian.shape[0], + n_rows, + n_cols, + ) + return jacobian + + +def loco_fit( + SC: "SimulatedCommissioning", + orm_measured, + orm_model, + jacobian, + fit_quads=True, + fit_correctors=True, + fit_bpms=True, + s_cut=None, + weights=1, + method="lm", +): + """Fit LOCO corrections using least-squares. + + Parameters + ---------- + SC : SimulatedCommissioning + The simulation object (used only to infer dimension info). + orm_measured : np.ndarray + Measured orbit response matrix. + orm_model : np.ndarray + Model orbit response matrix. + jacobian : np.ndarray + Jacobian from :func:`calculate_jacobian`. + fit_quads, fit_correctors, fit_bpms : bool + Which parameter groups to fit. + s_cut : float or None + Singular-value cut-off (not used by ``method='lm'``; reserved for + future SVD-based solvers). + weights : float or np.ndarray + Weights applied to the residual. + method : str + Optimisation method forwarded to :func:`scipy.optimize.least_squares`. + + Returns + ------- + dict + ``'quad_corrections'``, ``'corrector_corrections'``, + ``'bpm_corrections'`` — each a numpy array (zeros where the + corresponding group was not fitted). + """ + n_rows, n_cols = orm_model.shape + n_total = jacobian.shape[0] + + # Jacobian layout: [quads, correctors, bpms] + # Corrector and BPM blocks are always present in the Jacobian + n_quad = n_total - n_cols - n_rows + lengths = [n_quad, n_cols, n_rows] + mask = _get_parameters_mask([fit_quads, fit_correctors, fit_bpms], lengths) + + initial_guess = np.zeros(n_total) + + result = least_squares( + lambda delta_params: _objective( + delta_params, orm_measured - orm_model, jacobian[mask], weights + ), + initial_guess[mask], + method=method, + verbose=0, + ) + corrections = result.x + + # Unpack into groups + quad_corrections = np.zeros(n_quad) + corrector_corrections = np.zeros(n_cols) + bpm_corrections = np.zeros(n_rows) + + idx = 0 + if fit_quads: + quad_corrections[:] = corrections[idx : idx + n_quad] + idx += n_quad + if fit_correctors: + corrector_corrections[:] = corrections[idx : idx + n_cols] + idx += n_cols + if fit_bpms: + bpm_corrections[:] = corrections[idx : idx + n_rows] + idx += n_rows + + logger.info( + "LOCO fit complete (method=%s): cost=%.6e, nfev=%d", + method, + result.cost, + result.nfev, + ) + + return { + "quad_corrections": quad_corrections, + "corrector_corrections": corrector_corrections, + "bpm_corrections": bpm_corrections, + } + + +def apply_loco_corrections(SC, corrections, quad_control_names, fraction=1.0): + """Apply fitted LOCO corrections to the lattice. + + Parameters + ---------- + SC : SimulatedCommissioning + The simulation object. + corrections : dict + Output of :func:`loco_fit`. + quad_control_names : list[str] + Same list used in :func:`calculate_jacobian`. + fraction : float + Fraction of the correction to apply (useful for iterative schemes). + """ + for name, dk in zip(quad_control_names, corrections["quad_corrections"]): + current = SC.magnet_settings.get(name) + SC.magnet_settings.set(name, current - fraction * dk) + + logger.info( + "Applied LOCO corrections (fraction=%.2f) to %d quads", + fraction, + len(quad_control_names), + ) + + +def measure_orm(SC, dkick=1e-5, use_design=False): + """Convenience wrapper around :func:`measure_OrbitResponseMatrix`. + + Parameters + ---------- + SC : SimulatedCommissioning + The simulation object. + dkick : float + Kick size for the finite-difference ORM measurement. + use_design : bool + If True, use the design lattice. + + Returns + ------- + np.ndarray + Orbit response matrix. + """ + return measure_OrbitResponseMatrix(SC, dkick=dkick, use_design=use_design) + + +def analyze_ring(SC, use_design=False): + """Compute summary optics figures of merit. + + Parameters + ---------- + SC : SimulatedCommissioning + The simulation object. + use_design : bool + If True, analyse the design lattice (useful as a reference baseline + — in that case beta-beat and dispersion errors will be zero). + + Returns + ------- + dict + Keys: ``orbit_rms_x``, ``orbit_rms_y``, ``beta_beat_x``, + ``beta_beat_y``, ``disp_err_x``, ``disp_err_y``, ``tune_x``, + ``tune_y``, ``chromaticity_x``, ``chromaticity_y``. + """ + twiss = SC.lattice.get_twiss(use_design=use_design) + design = SC.lattice.get_twiss(use_design=True) + orbit = SC.lattice.get_orbit(use_design=use_design) + + return { + "orbit_rms_x": np.std(orbit[0]), + "orbit_rms_y": np.std(orbit[1]), + "beta_beat_x": np.std((twiss["betx"] - design["betx"]) / design["betx"]), + "beta_beat_y": np.std((twiss["bety"] - design["bety"]) / design["bety"]), + "disp_err_x": np.std(twiss["dx"] - design["dx"]), + "disp_err_y": np.std(twiss["dy"] - design["dy"]), + "tune_x": twiss["qx"], + "tune_y": twiss["qy"], + "chromaticity_x": twiss["dqx"], + "chromaticity_y": twiss["dqy"], + } diff --git a/pySC/apps/multipoles.py b/pySC/apps/multipoles.py new file mode 100644 index 00000000..816a1da9 --- /dev/null +++ b/pySC/apps/multipoles.py @@ -0,0 +1,215 @@ +"""Functions for setting systematic and random multipoles on magnets.""" + +import logging +from dataclasses import dataclass +from pathlib import Path + +import numpy as np + +logger = logging.getLogger(__name__) + + +@dataclass +class MultipoleTable: + """Result of reading a multipole table file. + + Attributes: + AB: array of shape (N, 2), columns are [A, B] components. + The nominal coefficient (where the original file had ~1.0) is zeroed. + main_order: 1-based order where the nominal coefficient was found. + main_component: 'A' or 'B', indicating which column held the nominal. + """ + AB: np.ndarray + main_order: int + main_component: str + + +def _multipoles_to_dict(multipoles): + """Convert multipoles to dict form if needed. + + Accepts either a dict of {(component, order): value} or an np.ndarray + of shape (N, 2) where columns are [A, B]. + """ + if isinstance(multipoles, np.ndarray): + if multipoles.ndim != 2 or multipoles.shape[1] != 2: + raise ValueError(f"Expected array of shape (N, 2), got {multipoles.shape}") + return { + (comp, row + 1): multipoles[row, col] + for row in range(multipoles.shape[0]) + for col, comp in enumerate(['A', 'B']) + if multipoles[row, col] != 0.0 + } + return multipoles + + +def _apply_zero_orders(multipoles, zero_orders): + """Remove entries for specified 1-based orders from multipoles dict.""" + if zero_orders is None: + return multipoles + return { + (comp, order): value + for (comp, order), value in multipoles.items() + if order not in zero_orders + } + + +def _extend_magnet_lists(magnet, idx): + """Extend magnet field lists to accommodate index `idx`.""" + if idx >= len(magnet.offset_A): + new_len = idx + 1 + magnet.offset_A.extend([0.0] * (new_len - len(magnet.offset_A))) + magnet.offset_B.extend([0.0] * (new_len - len(magnet.offset_B))) + magnet.A.extend([0.0] * (new_len - len(magnet.A))) + magnet.B.extend([0.0] * (new_len - len(magnet.B))) + magnet.max_order = idx + + +def set_systematic_multipoles(SC, magnet_names, multipoles, + relative_to_nominal=True, + main_order=None, main_component='B', + zero_orders=None): + """Set systematic multipoles on named magnets. + + Args: + SC: SimulatedCommissioning instance + magnet_names: list of magnet names (keys in SC.magnet_settings.magnets) + multipoles: dict of {(component, order): value} or np.ndarray of shape (N, 2) + where columns are [A, B]. Order is 1-based (1=dipole, 2=quad, etc.) + relative_to_nominal: if True, scale value by the nominal main field strength + main_order: 1-based order of the nominal component. + Default: magnet.max_order + 1 (i.e. the highest existing order) + main_component: 'A' or 'B'. Default: 'B' + zero_orders: list of 1-based orders to exclude before applying + """ + multipoles = _multipoles_to_dict(multipoles) + multipoles = _apply_zero_orders(multipoles, zero_orders) + + for name in magnet_names: + magnet = SC.magnet_settings.magnets[name] + + if relative_to_nominal: + order_idx = (main_order - 1) if main_order is not None else magnet.max_order + main_field = abs(SC.lattice.get_magnet_component( + magnet.sim_index, main_component, order_idx, use_design=True + )) + if main_field == 0: + logger.warning("Magnet '%s' has zero design main field, skipping relative scaling", name) + main_field = 1.0 + + for (component, order), value in multipoles.items(): + idx = order - 1 + _extend_magnet_lists(magnet, idx) + + scaled = value * main_field if relative_to_nominal else value + + if component == 'A': + magnet.offset_A[idx] += scaled + elif component == 'B': + magnet.offset_B[idx] += scaled + else: + raise ValueError(f"Component must be 'A' or 'B', got '{component}'") + + magnet.update() + logger.debug("Set systematic multipoles on '%s'", name) + + +def set_random_multipoles(SC, magnet_names, multipoles, zero_orders=None): + """Set random multipoles on named magnets. + + Uses truncated normal distribution (2-sigma) for realistic magnet errors. + + Args: + SC: SimulatedCommissioning instance + magnet_names: list of magnet names + multipoles: dict of {(component, order): rms_value} or np.ndarray of shape (N, 2) + zero_orders: list of 1-based orders to exclude before applying + """ + multipoles = _multipoles_to_dict(multipoles) + multipoles = _apply_zero_orders(multipoles, zero_orders) + + for name in magnet_names: + magnet = SC.magnet_settings.magnets[name] + + for (component, order), rms_value in multipoles.items(): + idx = order - 1 + _extend_magnet_lists(magnet, idx) + + value = SC.rng.normal_trunc(loc=0, scale=rms_value, sigma_truncate=2) + + if component == 'A': + magnet.offset_A[idx] += value + elif component == 'B': + magnet.offset_B[idx] += value + else: + raise ValueError(f"Component must be 'A' or 'B', got '{component}'") + + magnet.update() + logger.debug("Set random multipoles on '%s'", name) + + +def read_multipole_table(filepath): + """Read a multipole table file and return a MultipoleTable. + + Parses tab-or-space-delimited files with format:: + + # n PolynomA(n) PolynomB(n) + 0 0.0 -0.171 + 1 0.0 2.998 + + The nominal coefficient (the cell closest to 1.0) is identified and zeroed + in the returned array to prevent double-counting when used with excitation + scaling (Phase 2). This matches MATLAB's ``applySysMultipoles`` behavior. + + Args: + filepath: path to the multipole table file + + Returns: + MultipoleTable with AB array (nominal zeroed), main_order (1-based), + and main_component ('A' or 'B') + """ + filepath = Path(filepath) + rows = [] + with open(filepath) as f: + for line in f: + line = line.strip() + if not line or line.startswith('#'): + continue + parts = line.split() + if len(parts) < 3: + continue + try: + _order = int(parts[0]) + a_val = float(parts[1]) + b_val = float(parts[2]) + rows.append((a_val, b_val)) + except ValueError: + continue + + if not rows: + raise ValueError(f"No valid data rows found in {filepath}") + + AB = np.array(rows) # shape (N, 2), cols = [A, B] + + # Find the cell closest to 1.0 + distances = np.abs(AB - 1.0) + min_idx = np.unravel_index(np.argmin(distances), distances.shape) + min_dist = distances[min_idx] + + if min_dist > 1e-6: + # No cell near 1.0 — fall back to largest magnitude + mag_idx = np.unravel_index(np.argmax(np.abs(AB)), AB.shape) + logger.warning( + "No coefficient close to 1.0 found in %s; defaulting to largest " + "magnitude at row %d, col %d", filepath, mag_idx[0], mag_idx[1] + ) + row, col = int(mag_idx[0]), int(mag_idx[1]) + else: + row, col = int(min_idx[0]), int(min_idx[1]) + + main_order = row + 1 # 1-based + main_component = 'A' if col == 0 else 'B' + + # Zero the nominal coefficient to prevent double-counting + AB[row, col] = 0.0 + + return MultipoleTable(AB=AB, main_order=main_order, main_component=main_component) diff --git a/tests/apps/test_dynamic_aperture.py b/tests/apps/test_dynamic_aperture.py new file mode 100644 index 00000000..3a00565f --- /dev/null +++ b/tests/apps/test_dynamic_aperture.py @@ -0,0 +1,130 @@ +"""Tests for pySC.apps.dynamic_aperture — DA scan and helpers.""" +import numpy as np +import pytest +from unittest.mock import MagicMock, patch + +from pySC.apps.dynamic_aperture import ( + dynamic_aperture, + _is_lost, + _shoelace_area, +) + + +class TestShoelaceArea: + """Test _shoelace_area polygon area computation.""" + + def test_unit_square(self): + x = np.array([0, 1, 1, 0]) + y = np.array([0, 0, 1, 1]) + assert _shoelace_area(x, y) == pytest.approx(1.0) + + def test_triangle(self): + x = np.array([0, 2, 0]) + y = np.array([0, 0, 3]) + assert _shoelace_area(x, y) == pytest.approx(3.0) + + def test_degenerate_line(self): + x = np.array([0, 1, 2]) + y = np.array([0, 0, 0]) + assert _shoelace_area(x, y) == pytest.approx(0.0) + + def test_circle_approximation(self): + """Regular n-gon area approaches pi*r^2 for large n.""" + n = 1000 + r = 1.0 + theta = np.linspace(0, 2 * np.pi, n, endpoint=False) + x = r * np.cos(theta) + y = r * np.sin(theta) + np.testing.assert_allclose(_shoelace_area(x, y), np.pi * r**2, rtol=1e-4) + + +class TestIsLost: + """Test _is_lost particle tracking wrapper.""" + + def test_stable_particle(self): + SC = MagicMock() + SC.lattice.track.return_value = np.zeros((1, 6)) + assert not _is_lost(SC, radius=1e-3, theta=0, x0=0, y0=0, n_turns=100) + + def test_lost_particle(self): + SC = MagicMock() + result = np.full((1, 6), np.nan) + SC.lattice.track.return_value = result + assert _is_lost(SC, radius=0.1, theta=0, x0=0, y0=0, n_turns=100) + + def test_initial_conditions_include_offset(self): + SC = MagicMock() + SC.lattice.track.return_value = np.zeros((1, 6)) + + _is_lost(SC, radius=1e-3, theta=np.pi / 4, x0=0.001, y0=-0.002, n_turns=10) + + bunch = SC.lattice.track.call_args.args[0] + expected_x = 1e-3 * np.cos(np.pi / 4) + 0.001 + expected_y = 1e-3 * np.sin(np.pi / 4) - 0.002 + np.testing.assert_allclose(bunch[0, 0], expected_x) + np.testing.assert_allclose(bunch[0, 2], expected_y) + + +class TestDynamicAperture: + """Test dynamic_aperture scan with mocked tracking.""" + + def _make_sc(self, loss_radius=0.01): + """SC mock where particles are lost beyond loss_radius.""" + SC = MagicMock() + SC.lattice.get_orbit.return_value = np.zeros((2, 1)) + + def track_fn(bunch, n_turns=1000): + r = np.sqrt(bunch[0, 0] ** 2 + bunch[0, 2] ** 2) + if r > loss_radius: + return np.full_like(bunch, np.nan) + return bunch + + SC.lattice.track.side_effect = track_fn + return SC + + def test_returns_expected_keys(self): + SC = self._make_sc() + result = dynamic_aperture(SC, n_angles=4, n_turns=10, accuracy=1e-4) + for key in ("radii", "angles", "area", "x", "y"): + assert key in result + + def test_radii_shape_matches_angles(self): + n = 8 + SC = self._make_sc() + result = dynamic_aperture(SC, n_angles=n, n_turns=10, accuracy=1e-4) + assert result["radii"].shape == (n,) + assert result["angles"].shape == (n,) + + def test_radii_close_to_loss_radius(self): + loss_r = 0.01 + SC = self._make_sc(loss_radius=loss_r) + result = dynamic_aperture( + SC, n_angles=4, n_turns=10, + accuracy=1e-5, initial_radius=1e-3, + ) + # All radii should converge near loss_radius + np.testing.assert_allclose(result["radii"], loss_r, atol=1e-4) + + def test_area_positive(self): + SC = self._make_sc(loss_radius=0.01) + result = dynamic_aperture(SC, n_angles=16, n_turns=10, accuracy=1e-5) + assert result["area"] > 0 + + def test_zero_aperture_when_always_lost(self): + """If particles are lost at any radius, DA is zero.""" + SC = MagicMock() + SC.lattice.get_orbit.return_value = np.zeros((2, 1)) + SC.lattice.track.return_value = np.full((1, 6), np.nan) + + result = dynamic_aperture(SC, n_angles=4, n_turns=10, accuracy=1e-4) + np.testing.assert_array_equal(result["radii"], 0) + assert result["area"] == pytest.approx(0.0) + + def test_center_on_orbit_false(self): + """With center_on_orbit=False, orbit is not queried.""" + SC = self._make_sc(loss_radius=0.005) + result = dynamic_aperture( + SC, n_angles=4, n_turns=10, + accuracy=1e-5, center_on_orbit=False, + ) + SC.lattice.get_orbit.assert_not_called() diff --git a/tests/apps/test_loco.py b/tests/apps/test_loco.py new file mode 100644 index 00000000..2da0abca --- /dev/null +++ b/tests/apps/test_loco.py @@ -0,0 +1,172 @@ +"""Tests for pySC.apps.loco — LOCO helpers and fitting.""" +import numpy as np +import pytest +from unittest.mock import MagicMock, patch + +from pySC.apps.loco import ( + _objective, + _get_parameters_mask, + calculate_jacobian, + loco_fit, + apply_loco_corrections, +) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +class TestGetParametersMask: + """Test _get_parameters_mask boolean mask builder.""" + + def test_all_flags_true(self): + mask = _get_parameters_mask([True, True, True], [3, 4, 5]) + assert mask.shape == (12,) + assert mask.all() + + def test_all_flags_false(self): + mask = _get_parameters_mask([False, False, False], [3, 4, 5]) + assert not mask.any() + + def test_partial_flags(self): + mask = _get_parameters_mask([True, False, True], [2, 3, 2]) + expected = np.array([True, True, False, False, False, True, True]) + np.testing.assert_array_equal(mask, expected) + + def test_single_group(self): + mask = _get_parameters_mask([True], [5]) + assert mask.shape == (5,) + assert mask.all() + + +class TestObjective: + """Test _objective weighted residual computation.""" + + def test_zero_params_returns_weighted_residual(self): + """With zero parameter corrections, residual == orm_residuals * weights.""" + orm_res = np.ones((4, 3)) + jac = np.zeros((2, 4, 3)) + params = np.zeros(2) + result = _objective(params, orm_res, jac, weights=1) + np.testing.assert_allclose(result, orm_res.ravel()) + + def test_perfect_correction_gives_zero(self): + """If Jacobian * params == orm_residuals, residual is zero.""" + orm_res = np.ones((2, 2)) + # Single parameter, Jacobian entry is all ones + jac = np.ones((1, 2, 2)) + params = np.array([1.0]) + result = _objective(params, orm_res, jac, weights=1) + np.testing.assert_allclose(result, np.zeros(4), atol=1e-14) + + def test_weights_scale_residual(self): + """Weights multiply the residual row-wise.""" + orm_res = np.ones((3, 2)) + jac = np.zeros((1, 3, 2)) + params = np.zeros(1) + weights = np.array([1.0, 4.0, 9.0]) + result = _objective(params, orm_res, jac, weights) + # sqrt(weights) applied row-wise: [1, 2, 3] + expected = np.array([1, 1, 2, 2, 3, 3], dtype=float) + np.testing.assert_allclose(result, expected) + + +# --------------------------------------------------------------------------- +# Public API (with mocks for SC internals) +# --------------------------------------------------------------------------- + +class TestCalculateJacobian: + """Test calculate_jacobian structure and dimensions.""" + + def test_corrector_and_bpm_blocks(self): + """Corrector-gain and BPM-gain Jacobian blocks have correct structure.""" + SC = MagicMock() + n_bpms, n_corr = 4, 3 + orm_model = np.random.RandomState(0).randn(n_bpms, n_corr) + + with patch("pySC.apps.loco.measure_OrbitResponseMatrix") as mock_orm: + mock_orm.return_value = orm_model + 0.001 # small perturbation + jac = calculate_jacobian( + SC, orm_model, quad_control_names=["Q1"], + include_correctors=True, include_bpms=True, + include_dispersion=False, use_design=True, + ) + + # 1 quad + 3 correctors + 4 bpms = 8 + assert jac.shape == (1 + n_corr + n_bpms, n_bpms, n_corr) + + def test_no_correctors_no_bpms(self): + """With correctors and BPMs disabled, only quad entries remain.""" + SC = MagicMock() + orm_model = np.ones((2, 2)) + + with patch("pySC.apps.loco.measure_OrbitResponseMatrix") as mock_orm: + mock_orm.return_value = orm_model * 1.01 + jac = calculate_jacobian( + SC, orm_model, quad_control_names=["Q1", "Q2"], + include_correctors=False, include_bpms=False, + include_dispersion=False, + ) + + assert jac.shape[0] == 2 # only quads + + +class TestLocoFit: + """Test loco_fit returns correct structure.""" + + def test_returns_all_correction_keys(self): + SC = MagicMock() + n_bpms, n_corr, n_quad = 4, 3, 2 + n_total = n_quad + n_corr + n_bpms + orm_meas = np.random.RandomState(1).randn(n_bpms, n_corr) + orm_model = orm_meas + 0.01 + jac = np.random.RandomState(2).randn(n_total, n_bpms, n_corr) + + result = loco_fit(SC, orm_meas, orm_model, jac) + + assert "quad_corrections" in result + assert "corrector_corrections" in result + assert "bpm_corrections" in result + assert result["quad_corrections"].shape == (n_quad,) + assert result["corrector_corrections"].shape == (n_corr,) + assert result["bpm_corrections"].shape == (n_bpms,) + + def test_fit_subset(self): + """Fitting only quads leaves corrector/bpm corrections at zero.""" + SC = MagicMock() + n_bpms, n_corr, n_quad = 3, 2, 2 + n_total = n_quad + n_corr + n_bpms + orm_meas = np.eye(n_bpms, n_corr) * 0.01 + orm_model = np.zeros((n_bpms, n_corr)) + jac = np.random.RandomState(3).randn(n_total, n_bpms, n_corr) * 0.1 + + result = loco_fit( + SC, orm_meas, orm_model, jac, + fit_quads=True, fit_correctors=False, fit_bpms=False, + ) + + np.testing.assert_array_equal(result["corrector_corrections"], 0) + np.testing.assert_array_equal(result["bpm_corrections"], 0) + + +class TestApplyLocoCorrections: + """Test apply_loco_corrections calls magnet_settings correctly.""" + + def test_applies_fraction(self): + SC = MagicMock() + SC.magnet_settings.get.return_value = 1.0 + + corrections = { + "quad_corrections": np.array([0.1, -0.2]), + "corrector_corrections": np.array([]), + "bpm_corrections": np.array([]), + } + + apply_loco_corrections(SC, corrections, ["Q1", "Q2"], fraction=0.5) + + calls = SC.magnet_settings.set.call_args_list + assert len(calls) == 2 + # Q1: 1.0 - 0.5*0.1 = 0.95 + np.testing.assert_allclose(calls[0].args[1], 0.95) + # Q2: 1.0 - 0.5*(-0.2) = 1.1 + np.testing.assert_allclose(calls[1].args[1], 1.1) diff --git a/tests/apps/test_multipoles.py b/tests/apps/test_multipoles.py new file mode 100644 index 00000000..b5f7a6b4 --- /dev/null +++ b/tests/apps/test_multipoles.py @@ -0,0 +1,405 @@ +"""Tests for pySC.apps.multipoles — systematic and random multipole assignment.""" +import numpy as np +import pytest +from unittest.mock import MagicMock + +from pySC.apps.multipoles import ( + _extend_magnet_lists, + _multipoles_to_dict, + _apply_zero_orders, + set_systematic_multipoles, + set_random_multipoles, + read_multipole_table, + MultipoleTable, +) + + +class FakeMagnet: + """Lightweight stand-in for a magnet settings object.""" + + def __init__(self, max_order=2): + self.max_order = max_order + self.offset_A = [0.0] * (max_order + 1) + self.offset_B = [0.0] * (max_order + 1) + self.A = [0.0] * (max_order + 1) + self.B = [0.0] * (max_order + 1) + self.sim_index = 0 + self.update_count = 0 + + def update(self): + self.update_count += 1 + + +class TestExtendMagnetLists: + """Test _extend_magnet_lists grows field lists correctly.""" + + def test_extends_when_needed(self): + mag = FakeMagnet(max_order=1) + assert len(mag.offset_A) == 2 + _extend_magnet_lists(mag, 4) + assert len(mag.offset_A) == 5 + assert len(mag.offset_B) == 5 + assert len(mag.A) == 5 + assert len(mag.B) == 5 + assert mag.max_order == 4 + + def test_no_extend_when_within_bounds(self): + mag = FakeMagnet(max_order=5) + original_len = len(mag.offset_A) + _extend_magnet_lists(mag, 3) + assert len(mag.offset_A) == original_len + # max_order should NOT change when no extension needed + assert mag.max_order == 5 + + def test_extension_pads_with_zeros(self): + mag = FakeMagnet(max_order=0) + mag.offset_A = [1.0] + _extend_magnet_lists(mag, 2) + assert mag.offset_A == [1.0, 0.0, 0.0] + + +class TestMultipolesToDict: + """Test numpy array to dict conversion.""" + + def test_ndarray_conversion(self): + arr = np.array([[0.0, 0.1], [0.0, 1.0], [0.005, -0.003]]) + result = _multipoles_to_dict(arr) + assert result == { + ('B', 1): 0.1, + ('B', 2): 1.0, + ('A', 3): 0.005, + ('B', 3): -0.003, + } + + def test_skips_zeros(self): + arr = np.array([[0.0, 0.0], [0.0, 0.5]]) + result = _multipoles_to_dict(arr) + assert result == {('B', 2): 0.5} + + def test_dict_passthrough(self): + d = {('A', 1): 0.5} + assert _multipoles_to_dict(d) is d + + def test_bad_shape_raises(self): + with pytest.raises(ValueError, match="shape"): + _multipoles_to_dict(np.array([1.0, 2.0])) + + +class TestApplyZeroOrders: + """Test zero_orders filtering.""" + + def test_removes_specified_orders(self): + multipoles = {('A', 1): 0.1, ('B', 1): 0.2, ('B', 2): 0.3, ('A', 3): 0.4} + result = _apply_zero_orders(multipoles, [1, 3]) + assert result == {('B', 2): 0.3} + + def test_none_returns_unchanged(self): + multipoles = {('B', 1): 0.5} + assert _apply_zero_orders(multipoles, None) is multipoles + + +class TestSetSystematicMultipoles: + """Test set_systematic_multipoles applies values correctly.""" + + def _make_sc(self, magnet_names): + SC = MagicMock() + magnets = {} + for name in magnet_names: + magnets[name] = FakeMagnet(max_order=2) + SC.magnet_settings.magnets = magnets + SC.lattice.get_magnet_component.return_value = 10.0 # nominal field + return SC + + def test_relative_scaling(self): + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 3): 0.01}, + relative_to_nominal=True, + ) + # 0.01 * 10.0 = 0.1 + assert SC.magnet_settings.magnets["Q1"].offset_B[2] == pytest.approx(0.1) + + def test_absolute_scaling(self): + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("A", 2): 0.05}, + relative_to_nominal=False, + ) + assert SC.magnet_settings.magnets["Q1"].offset_A[1] == pytest.approx(0.05) + + def test_invalid_component_raises(self): + SC = self._make_sc(["Q1"]) + with pytest.raises(ValueError, match="Component must be"): + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("C", 1): 0.1}, + relative_to_nominal=False, + ) + + def test_update_called_per_magnet(self): + SC = self._make_sc(["Q1", "Q2"]) + set_systematic_multipoles( + SC, ["Q1", "Q2"], + multipoles={("B", 1): 0.001}, + relative_to_nominal=False, + ) + for name in ["Q1", "Q2"]: + assert SC.magnet_settings.magnets[name].update_count == 1 + + def test_zero_main_field_uses_unity(self): + """When nominal field is zero, scaling falls back to 1.0.""" + SC = self._make_sc(["Q1"]) + SC.lattice.get_magnet_component.return_value = 0.0 + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.5}, + relative_to_nominal=True, + ) + # main_field set to 1.0, so value = 0.5 * 1.0 + assert SC.magnet_settings.magnets["Q1"].offset_B[0] == pytest.approx(0.5) + + def test_accumulation(self): + """Calling twice should accumulate, not overwrite (bug fix).""" + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.001}, + relative_to_nominal=False, + ) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.002}, + relative_to_nominal=False, + ) + assert SC.magnet_settings.magnets["Q1"].offset_B[0] == pytest.approx(0.003) + + def test_accumulation_different_components(self): + """Accumulation works across A and B independently.""" + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("A", 2): 0.01, ("B", 2): 0.02}, + relative_to_nominal=False, + ) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("A", 2): 0.03, ("B", 2): 0.04}, + relative_to_nominal=False, + ) + assert SC.magnet_settings.magnets["Q1"].offset_A[1] == pytest.approx(0.04) + assert SC.magnet_settings.magnets["Q1"].offset_B[1] == pytest.approx(0.06) + + def test_main_order_param(self): + """Explicit main_order overrides magnet.max_order for field lookup.""" + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.01}, + relative_to_nominal=True, + main_order=1, # dipole + ) + # Should look up index 0 (main_order - 1 = 0), not max_order=2 + SC.lattice.get_magnet_component.assert_called_with(0, 'B', 0, use_design=True) + + def test_main_component_A(self): + """main_component='A' looks up the A component for scaling.""" + SC = self._make_sc(["Q1"]) + SC.lattice.get_magnet_component.return_value = 5.0 + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.1}, + relative_to_nominal=True, + main_component='A', + ) + SC.lattice.get_magnet_component.assert_called_with(0, 'A', 2, use_design=True) + # 0.1 * 5.0 = 0.5 + assert SC.magnet_settings.magnets["Q1"].offset_B[0] == pytest.approx(0.5) + + def test_numpy_array_input(self): + """np.ndarray input produces same result as equivalent dict.""" + SC = self._make_sc(["Q1"]) + arr = np.array([[0.0, 0.1], [0.0, 1.0], [0.005, -0.003]]) + set_systematic_multipoles(SC, ["Q1"], multipoles=arr, relative_to_nominal=False) + mag = SC.magnet_settings.magnets["Q1"] + assert mag.offset_B[0] == pytest.approx(0.1) + assert mag.offset_B[1] == pytest.approx(1.0) + assert mag.offset_A[2] == pytest.approx(0.005) + assert mag.offset_B[2] == pytest.approx(-0.003) + + def test_zero_orders(self): + """zero_orders excludes specified orders.""" + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.1, ("B", 2): 0.2, ("B", 3): 0.3}, + relative_to_nominal=False, + zero_orders=[1, 2], + ) + mag = SC.magnet_settings.magnets["Q1"] + assert mag.offset_B[0] == pytest.approx(0.0) + assert mag.offset_B[1] == pytest.approx(0.0) + assert mag.offset_B[2] == pytest.approx(0.3) + + +class TestSetRandomMultipoles: + """Test set_random_multipoles adds random offsets.""" + + def test_adds_random_values(self): + SC = MagicMock() + mag = FakeMagnet(max_order=2) + SC.magnet_settings.magnets = {"Q1": mag} + SC.rng.normal_trunc.return_value = 0.123 + + set_random_multipoles(SC, ["Q1"], multipoles={("B", 2): 1.0}) + + SC.rng.normal_trunc.assert_called_once_with(loc=0, scale=1.0, sigma_truncate=2) + assert mag.offset_B[1] == pytest.approx(0.123) + + def test_accumulates_on_existing(self): + SC = MagicMock() + mag = FakeMagnet(max_order=2) + mag.offset_A[0] = 0.5 + SC.magnet_settings.magnets = {"Q1": mag} + SC.rng.normal_trunc.return_value = 0.1 + + set_random_multipoles(SC, ["Q1"], multipoles={("A", 1): 1.0}) + + assert mag.offset_A[0] == pytest.approx(0.6) + + def test_invalid_component_raises(self): + SC = MagicMock() + SC.magnet_settings.magnets = {"Q1": FakeMagnet()} + SC.rng.normal_trunc.return_value = 0.0 + with pytest.raises(ValueError, match="Component must be"): + set_random_multipoles(SC, ["Q1"], multipoles={("X", 1): 1.0}) + + def test_numpy_array_input(self): + SC = MagicMock() + mag = FakeMagnet(max_order=2) + SC.magnet_settings.magnets = {"Q1": mag} + SC.rng.normal_trunc.return_value = 0.05 + + arr = np.array([[0.0, 0.5], [0.3, 0.0]]) + set_random_multipoles(SC, ["Q1"], multipoles=arr) + + # Should be called twice: ('B',1)=0.5 and ('A',2)=0.3 + assert SC.rng.normal_trunc.call_count == 2 + + def test_zero_orders(self): + SC = MagicMock() + mag = FakeMagnet(max_order=2) + SC.magnet_settings.magnets = {"Q1": mag} + SC.rng.normal_trunc.return_value = 0.1 + + set_random_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 1.0, ("B", 2): 1.0, ("A", 3): 1.0}, + zero_orders=[1], + ) + # Only orders 2 and 3 should be applied + assert SC.rng.normal_trunc.call_count == 2 + assert mag.offset_B[0] == pytest.approx(0.0) # order 1 zeroed out + + +class TestReadMultipoleTable: + """Test read_multipole_table parses files correctly.""" + + def test_basic_parse(self, tmp_path): + table_file = tmp_path / "quad_multipoles.txt" + table_file.write_text( + "# n PolynomA(n) PolynomB(n)\n" + "0 0.0 -0.00171\n" + "1 0.0 1.0\n" + "2 0.0 -0.003\n" + ) + result = read_multipole_table(table_file) + assert isinstance(result, MultipoleTable) + assert result.AB.shape == (3, 2) + assert result.main_order == 2 # 1-based, row 1 + assert result.main_component == 'B' + # Nominal zeroed + assert result.AB[1, 1] == 0.0 + # Other values preserved + assert result.AB[0, 1] == pytest.approx(-0.00171) + assert result.AB[2, 1] == pytest.approx(-0.003) + + def test_skew_magnet(self, tmp_path): + """Detects A-component nominal for a skew magnet.""" + table_file = tmp_path / "skew_quad.txt" + table_file.write_text( + "0 0.0 0.0\n" + "1 1.0 0.0\n" + "2 0.001 0.0\n" + ) + result = read_multipole_table(table_file) + assert result.main_order == 2 + assert result.main_component == 'A' + assert result.AB[1, 0] == 0.0 # nominal zeroed + + def test_comments_and_blanks_ignored(self, tmp_path): + table_file = tmp_path / "with_comments.txt" + table_file.write_text( + "# Header line\n" + "\n" + "# Another comment\n" + "0 0.0 0.5\n" + "1 0.0 1.0\n" + ) + result = read_multipole_table(table_file) + assert result.AB.shape == (2, 2) + assert result.main_order == 2 + + def test_empty_file_raises(self, tmp_path): + table_file = tmp_path / "empty.txt" + table_file.write_text("# only comments\n") + with pytest.raises(ValueError, match="No valid data rows"): + read_multipole_table(table_file) + + def test_fallback_to_largest_magnitude(self, tmp_path): + """When no cell is close to 1.0, falls back to largest magnitude.""" + table_file = tmp_path / "no_unity.txt" + table_file.write_text( + "0 0.0 0.0\n" + "1 0.0 50.0\n" + "2 0.001 0.003\n" + ) + result = read_multipole_table(table_file) + assert result.main_order == 2 # row 1 has 50.0 (largest) + assert result.main_component == 'B' + assert result.AB[1, 1] == 0.0 # nominal zeroed + + def test_tab_delimited(self, tmp_path): + table_file = tmp_path / "tabs.txt" + table_file.write_text("0\t0.0\t0.5\n1\t0.0\t1.0\n") + result = read_multipole_table(table_file) + assert result.main_order == 2 + assert result.main_component == 'B' + + def test_integration_with_set_systematic(self, tmp_path): + """Read a table and feed it directly to set_systematic_multipoles.""" + table_file = tmp_path / "sext.txt" + table_file.write_text( + "0 0.0 -0.001\n" + "1 0.0 0.0\n" + "2 0.0 1.0\n" + ) + table = read_multipole_table(table_file) + + SC = MagicMock() + mag = FakeMagnet(max_order=2) + SC.magnet_settings.magnets = {"S1": mag} + SC.lattice.get_magnet_component.return_value = 100.0 + + set_systematic_multipoles( + SC, ["S1"], + multipoles=table.AB, + relative_to_nominal=True, + main_order=table.main_order, + main_component=table.main_component, + ) + # Dipole error: -0.001 * 100.0 = -0.1 + assert mag.offset_B[0] == pytest.approx(-0.1) + # Nominal (order 3, sextupole) was zeroed in table, so no contribution + assert mag.offset_B[2] == pytest.approx(0.0)