diff --git a/pySC/apps/dynamic_aperture.py b/pySC/apps/dynamic_aperture.py new file mode 100644 index 00000000..adb5c967 --- /dev/null +++ b/pySC/apps/dynamic_aperture.py @@ -0,0 +1,70 @@ +import logging + +import numpy as np + +logger = logging.getLogger(__name__) + + +def dynamic_aperture( + SC, n_angles=16, n_turns=1000, accuracy=1e-6, + initial_radius=1e-3, max_radius=0.05, + center_on_orbit=True, use_design=False, +) -> dict: + """Calculate the dynamic aperture by binary search at each angle.""" + angles = np.linspace(0, 2 * np.pi, n_angles, endpoint=False) + + if center_on_orbit: + orbit = SC.lattice.get_orbit(use_design=use_design) + x0, y0 = orbit[0, 0], orbit[1, 0] + else: + x0, y0 = 0.0, 0.0 + + radii = np.zeros(n_angles) + + for i, theta in enumerate(angles): + lower = 0.0 + upper = initial_radius + + # Scale upper bound until particle is lost + while upper <= max_radius: + if _is_lost(SC, upper, theta, x0, y0, n_turns): + break + lower = upper + upper *= 2.0 + upper = min(upper, max_radius) + + # If even the smallest radius is lost, DA is zero at this angle + if lower == 0.0 and _is_lost(SC, lower, theta, x0, y0, n_turns): + radii[i] = 0.0 + continue + + # Binary search + while upper - lower > accuracy: + mid = (lower + upper) / 2.0 + if _is_lost(SC, mid, theta, x0, y0, n_turns): + upper = mid + else: + lower = mid + + radii[i] = lower + logger.debug("Angle %.4f rad: stable radius %.6e m", theta, lower) + + x = radii * np.cos(angles) + y = radii * np.sin(angles) + area = _shoelace_area(x, y) + + return {"radii": radii, "angles": angles, "area": area, "x": x, "y": y} + + +def _is_lost(SC, radius, theta, x0, y0, n_turns): + """Return True if a particle at the given radius and angle is lost.""" + bunch = np.zeros((1, 6)) + bunch[0, 0] = radius * np.cos(theta) + x0 + bunch[0, 2] = radius * np.sin(theta) + y0 + result = SC.lattice.track(bunch, n_turns=n_turns) + return np.any(np.isnan(result)) + + +def _shoelace_area(x, y): + """Compute polygon area using the shoelace formula.""" + return 0.5 * np.abs(np.dot(x, np.roll(y, -1)) - np.dot(y, np.roll(x, -1))) diff --git a/pySC/apps/loco.py b/pySC/apps/loco.py new file mode 100644 index 00000000..52afbcb0 --- /dev/null +++ b/pySC/apps/loco.py @@ -0,0 +1,306 @@ +""" +LOCO (Linear Optics from Closed Orbits) implementation for pySC. + +Provides functions to compute orbit response matrix Jacobians with respect +to quadrupole strengths, fit optics corrections via least-squares, and +apply the resulting corrections back to the lattice. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +import numpy as np +from scipy.optimize import least_squares + +from ..tuning.response_measurements import ( + measure_OrbitResponseMatrix, + measure_RFFrequencyOrbitResponse, +) + +if TYPE_CHECKING: + from ..core.simulated_commissioning import SimulatedCommissioning + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _objective(masked_params, orm_residuals, masked_jacobian, weights): + """Weighted residual for the LOCO least-squares problem.""" + residuals = orm_residuals - np.einsum("ijk,i->jk", masked_jacobian, masked_params) + w = np.sqrt(np.atleast_1d(weights)) + return (residuals * w[:, np.newaxis]).ravel() + + +def _get_parameters_mask(fit_flags, lengths): + """Build a boolean mask selecting the parameter groups to fit.""" + mask = np.zeros(sum(lengths), dtype=bool) + current_index = 0 + for flag, length in zip(fit_flags, lengths): + if flag: + mask[current_index : current_index + length] = True + current_index += length + return mask + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def calculate_jacobian( + SC: "SimulatedCommissioning", + orm_model, + quad_control_names, + dk=1e-5, + include_correctors=True, + include_bpms=True, + include_dispersion=False, + use_design=True, +): + """Compute the LOCO Jacobian of the ORM with respect to parameters. + + Parameters + ---------- + SC : SimulatedCommissioning + The simulation object. + orm_model : np.ndarray + Model orbit response matrix, shape ``(n_bpms*2, n_correctors)``. + If *include_dispersion* is True the last column should be the + dispersion response. + quad_control_names : list[str] + Names of the quadrupole control knobs to include. + dk : float + Finite-difference step size for quadrupole strengths. + include_correctors : bool + Whether to append corrector-gain Jacobian entries. + include_bpms : bool + Whether to append BPM-gain Jacobian entries. + include_dispersion : bool + Whether to append the RF-frequency orbit response (dispersion) + as an extra column in each ORM measurement. + use_design : bool + If True, compute model ORMs from the design lattice. + + Returns + ------- + jacobian : np.ndarray + Shape ``(n_params, n_bpms*2, n_correctors [+1])``. + """ + n_rows, n_cols = orm_model.shape + jacobian_rows = [] + + # --- Quadrupole derivatives --- + for name in quad_control_names: + current = SC.magnet_settings.get(name) + SC.magnet_settings.set(name, current + dk) + + orm_bumped = measure_OrbitResponseMatrix(SC, use_design=use_design) + + if include_dispersion: + disp = measure_RFFrequencyOrbitResponse(SC, use_design=use_design) + disp_col = disp.reshape(-1, 1) + orm_bumped = np.hstack([orm_bumped, disp_col]) + + SC.magnet_settings.set(name, current) + + jacobian_rows.append((orm_bumped - orm_model) / dk) + + # --- Corrector-gain entries --- + if include_correctors: + for j in range(n_cols): + entry = np.zeros_like(orm_model) + entry[:, j] = orm_model[:, j] + jacobian_rows.append(entry) + + # --- BPM-gain entries --- + if include_bpms: + for i in range(n_rows): + entry = np.zeros_like(orm_model) + entry[i, :] = orm_model[i, :] + jacobian_rows.append(entry) + + jacobian = np.array(jacobian_rows) + logger.info( + "LOCO Jacobian computed: %d parameters, ORM shape (%d, %d)", + jacobian.shape[0], + n_rows, + n_cols, + ) + return jacobian + + +def loco_fit( + SC: "SimulatedCommissioning", + orm_measured, + orm_model, + jacobian, + fit_quads=True, + fit_correctors=True, + fit_bpms=True, + s_cut=None, + weights=1, + method="lm", +): + """Fit LOCO corrections using least-squares. + + Parameters + ---------- + SC : SimulatedCommissioning + The simulation object (used only to infer dimension info). + orm_measured : np.ndarray + Measured orbit response matrix. + orm_model : np.ndarray + Model orbit response matrix. + jacobian : np.ndarray + Jacobian from :func:`calculate_jacobian`. + fit_quads, fit_correctors, fit_bpms : bool + Which parameter groups to fit. + s_cut : float or None + Singular-value cut-off (not used by ``method='lm'``; reserved for + future SVD-based solvers). + weights : float or np.ndarray + Weights applied to the residual. + method : str + Optimisation method forwarded to :func:`scipy.optimize.least_squares`. + + Returns + ------- + dict + ``'quad_corrections'``, ``'corrector_corrections'``, + ``'bpm_corrections'`` — each a numpy array (zeros where the + corresponding group was not fitted). + """ + n_rows, n_cols = orm_model.shape + n_total = jacobian.shape[0] + + # Jacobian layout: [quads, correctors, bpms] + # Corrector and BPM blocks are always present in the Jacobian + n_quad = n_total - n_cols - n_rows + lengths = [n_quad, n_cols, n_rows] + mask = _get_parameters_mask([fit_quads, fit_correctors, fit_bpms], lengths) + + initial_guess = np.zeros(n_total) + + result = least_squares( + lambda delta_params: _objective( + delta_params, orm_measured - orm_model, jacobian[mask], weights + ), + initial_guess[mask], + method=method, + verbose=0, + ) + corrections = result.x + + # Unpack into groups + quad_corrections = np.zeros(n_quad) + corrector_corrections = np.zeros(n_cols) + bpm_corrections = np.zeros(n_rows) + + idx = 0 + if fit_quads: + quad_corrections[:] = corrections[idx : idx + n_quad] + idx += n_quad + if fit_correctors: + corrector_corrections[:] = corrections[idx : idx + n_cols] + idx += n_cols + if fit_bpms: + bpm_corrections[:] = corrections[idx : idx + n_rows] + idx += n_rows + + logger.info( + "LOCO fit complete (method=%s): cost=%.6e, nfev=%d", + method, + result.cost, + result.nfev, + ) + + return { + "quad_corrections": quad_corrections, + "corrector_corrections": corrector_corrections, + "bpm_corrections": bpm_corrections, + } + + +def apply_loco_corrections(SC, corrections, quad_control_names, fraction=1.0): + """Apply fitted LOCO corrections to the lattice. + + Parameters + ---------- + SC : SimulatedCommissioning + The simulation object. + corrections : dict + Output of :func:`loco_fit`. + quad_control_names : list[str] + Same list used in :func:`calculate_jacobian`. + fraction : float + Fraction of the correction to apply (useful for iterative schemes). + """ + for name, dk in zip(quad_control_names, corrections["quad_corrections"]): + current = SC.magnet_settings.get(name) + SC.magnet_settings.set(name, current - fraction * dk) + + logger.info( + "Applied LOCO corrections (fraction=%.2f) to %d quads", + fraction, + len(quad_control_names), + ) + + +def measure_orm(SC, dkick=1e-5, use_design=False): + """Convenience wrapper around :func:`measure_OrbitResponseMatrix`. + + Parameters + ---------- + SC : SimulatedCommissioning + The simulation object. + dkick : float + Kick size for the finite-difference ORM measurement. + use_design : bool + If True, use the design lattice. + + Returns + ------- + np.ndarray + Orbit response matrix. + """ + return measure_OrbitResponseMatrix(SC, dkick=dkick, use_design=use_design) + + +def analyze_ring(SC, use_design=False): + """Compute summary optics figures of merit. + + Parameters + ---------- + SC : SimulatedCommissioning + The simulation object. + use_design : bool + If True, analyse the design lattice (useful as a reference baseline + — in that case beta-beat and dispersion errors will be zero). + + Returns + ------- + dict + Keys: ``orbit_rms_x``, ``orbit_rms_y``, ``beta_beat_x``, + ``beta_beat_y``, ``disp_err_x``, ``disp_err_y``, ``tune_x``, + ``tune_y``, ``chromaticity_x``, ``chromaticity_y``. + """ + twiss = SC.lattice.get_twiss(use_design=use_design) + design = SC.lattice.get_twiss(use_design=True) + orbit = SC.lattice.get_orbit(use_design=use_design) + + return { + "orbit_rms_x": np.std(orbit[0]), + "orbit_rms_y": np.std(orbit[1]), + "beta_beat_x": np.std((twiss["betx"] - design["betx"]) / design["betx"]), + "beta_beat_y": np.std((twiss["bety"] - design["bety"]) / design["bety"]), + "disp_err_x": np.std(twiss["dx"] - design["dx"]), + "disp_err_y": np.std(twiss["dy"] - design["dy"]), + "tune_x": twiss["qx"], + "tune_y": twiss["qy"], + "chromaticity_x": twiss["dqx"], + "chromaticity_y": twiss["dqy"], + } diff --git a/pySC/apps/multipoles.py b/pySC/apps/multipoles.py new file mode 100644 index 00000000..816a1da9 --- /dev/null +++ b/pySC/apps/multipoles.py @@ -0,0 +1,215 @@ +"""Functions for setting systematic and random multipoles on magnets.""" + +import logging +from dataclasses import dataclass +from pathlib import Path + +import numpy as np + +logger = logging.getLogger(__name__) + + +@dataclass +class MultipoleTable: + """Result of reading a multipole table file. + + Attributes: + AB: array of shape (N, 2), columns are [A, B] components. + The nominal coefficient (where the original file had ~1.0) is zeroed. + main_order: 1-based order where the nominal coefficient was found. + main_component: 'A' or 'B', indicating which column held the nominal. + """ + AB: np.ndarray + main_order: int + main_component: str + + +def _multipoles_to_dict(multipoles): + """Convert multipoles to dict form if needed. + + Accepts either a dict of {(component, order): value} or an np.ndarray + of shape (N, 2) where columns are [A, B]. + """ + if isinstance(multipoles, np.ndarray): + if multipoles.ndim != 2 or multipoles.shape[1] != 2: + raise ValueError(f"Expected array of shape (N, 2), got {multipoles.shape}") + return { + (comp, row + 1): multipoles[row, col] + for row in range(multipoles.shape[0]) + for col, comp in enumerate(['A', 'B']) + if multipoles[row, col] != 0.0 + } + return multipoles + + +def _apply_zero_orders(multipoles, zero_orders): + """Remove entries for specified 1-based orders from multipoles dict.""" + if zero_orders is None: + return multipoles + return { + (comp, order): value + for (comp, order), value in multipoles.items() + if order not in zero_orders + } + + +def _extend_magnet_lists(magnet, idx): + """Extend magnet field lists to accommodate index `idx`.""" + if idx >= len(magnet.offset_A): + new_len = idx + 1 + magnet.offset_A.extend([0.0] * (new_len - len(magnet.offset_A))) + magnet.offset_B.extend([0.0] * (new_len - len(magnet.offset_B))) + magnet.A.extend([0.0] * (new_len - len(magnet.A))) + magnet.B.extend([0.0] * (new_len - len(magnet.B))) + magnet.max_order = idx + + +def set_systematic_multipoles(SC, magnet_names, multipoles, + relative_to_nominal=True, + main_order=None, main_component='B', + zero_orders=None): + """Set systematic multipoles on named magnets. + + Args: + SC: SimulatedCommissioning instance + magnet_names: list of magnet names (keys in SC.magnet_settings.magnets) + multipoles: dict of {(component, order): value} or np.ndarray of shape (N, 2) + where columns are [A, B]. Order is 1-based (1=dipole, 2=quad, etc.) + relative_to_nominal: if True, scale value by the nominal main field strength + main_order: 1-based order of the nominal component. + Default: magnet.max_order + 1 (i.e. the highest existing order) + main_component: 'A' or 'B'. Default: 'B' + zero_orders: list of 1-based orders to exclude before applying + """ + multipoles = _multipoles_to_dict(multipoles) + multipoles = _apply_zero_orders(multipoles, zero_orders) + + for name in magnet_names: + magnet = SC.magnet_settings.magnets[name] + + if relative_to_nominal: + order_idx = (main_order - 1) if main_order is not None else magnet.max_order + main_field = abs(SC.lattice.get_magnet_component( + magnet.sim_index, main_component, order_idx, use_design=True + )) + if main_field == 0: + logger.warning("Magnet '%s' has zero design main field, skipping relative scaling", name) + main_field = 1.0 + + for (component, order), value in multipoles.items(): + idx = order - 1 + _extend_magnet_lists(magnet, idx) + + scaled = value * main_field if relative_to_nominal else value + + if component == 'A': + magnet.offset_A[idx] += scaled + elif component == 'B': + magnet.offset_B[idx] += scaled + else: + raise ValueError(f"Component must be 'A' or 'B', got '{component}'") + + magnet.update() + logger.debug("Set systematic multipoles on '%s'", name) + + +def set_random_multipoles(SC, magnet_names, multipoles, zero_orders=None): + """Set random multipoles on named magnets. + + Uses truncated normal distribution (2-sigma) for realistic magnet errors. + + Args: + SC: SimulatedCommissioning instance + magnet_names: list of magnet names + multipoles: dict of {(component, order): rms_value} or np.ndarray of shape (N, 2) + zero_orders: list of 1-based orders to exclude before applying + """ + multipoles = _multipoles_to_dict(multipoles) + multipoles = _apply_zero_orders(multipoles, zero_orders) + + for name in magnet_names: + magnet = SC.magnet_settings.magnets[name] + + for (component, order), rms_value in multipoles.items(): + idx = order - 1 + _extend_magnet_lists(magnet, idx) + + value = SC.rng.normal_trunc(loc=0, scale=rms_value, sigma_truncate=2) + + if component == 'A': + magnet.offset_A[idx] += value + elif component == 'B': + magnet.offset_B[idx] += value + else: + raise ValueError(f"Component must be 'A' or 'B', got '{component}'") + + magnet.update() + logger.debug("Set random multipoles on '%s'", name) + + +def read_multipole_table(filepath): + """Read a multipole table file and return a MultipoleTable. + + Parses tab-or-space-delimited files with format:: + + # n PolynomA(n) PolynomB(n) + 0 0.0 -0.171 + 1 0.0 2.998 + + The nominal coefficient (the cell closest to 1.0) is identified and zeroed + in the returned array to prevent double-counting when used with excitation + scaling (Phase 2). This matches MATLAB's ``applySysMultipoles`` behavior. + + Args: + filepath: path to the multipole table file + + Returns: + MultipoleTable with AB array (nominal zeroed), main_order (1-based), + and main_component ('A' or 'B') + """ + filepath = Path(filepath) + rows = [] + with open(filepath) as f: + for line in f: + line = line.strip() + if not line or line.startswith('#'): + continue + parts = line.split() + if len(parts) < 3: + continue + try: + _order = int(parts[0]) + a_val = float(parts[1]) + b_val = float(parts[2]) + rows.append((a_val, b_val)) + except ValueError: + continue + + if not rows: + raise ValueError(f"No valid data rows found in {filepath}") + + AB = np.array(rows) # shape (N, 2), cols = [A, B] + + # Find the cell closest to 1.0 + distances = np.abs(AB - 1.0) + min_idx = np.unravel_index(np.argmin(distances), distances.shape) + min_dist = distances[min_idx] + + if min_dist > 1e-6: + # No cell near 1.0 — fall back to largest magnitude + mag_idx = np.unravel_index(np.argmax(np.abs(AB)), AB.shape) + logger.warning( + "No coefficient close to 1.0 found in %s; defaulting to largest " + "magnitude at row %d, col %d", filepath, mag_idx[0], mag_idx[1] + ) + row, col = int(mag_idx[0]), int(mag_idx[1]) + else: + row, col = int(min_idx[0]), int(min_idx[1]) + + main_order = row + 1 # 1-based + main_component = 'A' if col == 0 else 'B' + + # Zero the nominal coefficient to prevent double-counting + AB[row, col] = 0.0 + + return MultipoleTable(AB=AB, main_order=main_order, main_component=main_component) diff --git a/tests/apps/test_dynamic_aperture.py b/tests/apps/test_dynamic_aperture.py new file mode 100644 index 00000000..3a00565f --- /dev/null +++ b/tests/apps/test_dynamic_aperture.py @@ -0,0 +1,130 @@ +"""Tests for pySC.apps.dynamic_aperture — DA scan and helpers.""" +import numpy as np +import pytest +from unittest.mock import MagicMock, patch + +from pySC.apps.dynamic_aperture import ( + dynamic_aperture, + _is_lost, + _shoelace_area, +) + + +class TestShoelaceArea: + """Test _shoelace_area polygon area computation.""" + + def test_unit_square(self): + x = np.array([0, 1, 1, 0]) + y = np.array([0, 0, 1, 1]) + assert _shoelace_area(x, y) == pytest.approx(1.0) + + def test_triangle(self): + x = np.array([0, 2, 0]) + y = np.array([0, 0, 3]) + assert _shoelace_area(x, y) == pytest.approx(3.0) + + def test_degenerate_line(self): + x = np.array([0, 1, 2]) + y = np.array([0, 0, 0]) + assert _shoelace_area(x, y) == pytest.approx(0.0) + + def test_circle_approximation(self): + """Regular n-gon area approaches pi*r^2 for large n.""" + n = 1000 + r = 1.0 + theta = np.linspace(0, 2 * np.pi, n, endpoint=False) + x = r * np.cos(theta) + y = r * np.sin(theta) + np.testing.assert_allclose(_shoelace_area(x, y), np.pi * r**2, rtol=1e-4) + + +class TestIsLost: + """Test _is_lost particle tracking wrapper.""" + + def test_stable_particle(self): + SC = MagicMock() + SC.lattice.track.return_value = np.zeros((1, 6)) + assert not _is_lost(SC, radius=1e-3, theta=0, x0=0, y0=0, n_turns=100) + + def test_lost_particle(self): + SC = MagicMock() + result = np.full((1, 6), np.nan) + SC.lattice.track.return_value = result + assert _is_lost(SC, radius=0.1, theta=0, x0=0, y0=0, n_turns=100) + + def test_initial_conditions_include_offset(self): + SC = MagicMock() + SC.lattice.track.return_value = np.zeros((1, 6)) + + _is_lost(SC, radius=1e-3, theta=np.pi / 4, x0=0.001, y0=-0.002, n_turns=10) + + bunch = SC.lattice.track.call_args.args[0] + expected_x = 1e-3 * np.cos(np.pi / 4) + 0.001 + expected_y = 1e-3 * np.sin(np.pi / 4) - 0.002 + np.testing.assert_allclose(bunch[0, 0], expected_x) + np.testing.assert_allclose(bunch[0, 2], expected_y) + + +class TestDynamicAperture: + """Test dynamic_aperture scan with mocked tracking.""" + + def _make_sc(self, loss_radius=0.01): + """SC mock where particles are lost beyond loss_radius.""" + SC = MagicMock() + SC.lattice.get_orbit.return_value = np.zeros((2, 1)) + + def track_fn(bunch, n_turns=1000): + r = np.sqrt(bunch[0, 0] ** 2 + bunch[0, 2] ** 2) + if r > loss_radius: + return np.full_like(bunch, np.nan) + return bunch + + SC.lattice.track.side_effect = track_fn + return SC + + def test_returns_expected_keys(self): + SC = self._make_sc() + result = dynamic_aperture(SC, n_angles=4, n_turns=10, accuracy=1e-4) + for key in ("radii", "angles", "area", "x", "y"): + assert key in result + + def test_radii_shape_matches_angles(self): + n = 8 + SC = self._make_sc() + result = dynamic_aperture(SC, n_angles=n, n_turns=10, accuracy=1e-4) + assert result["radii"].shape == (n,) + assert result["angles"].shape == (n,) + + def test_radii_close_to_loss_radius(self): + loss_r = 0.01 + SC = self._make_sc(loss_radius=loss_r) + result = dynamic_aperture( + SC, n_angles=4, n_turns=10, + accuracy=1e-5, initial_radius=1e-3, + ) + # All radii should converge near loss_radius + np.testing.assert_allclose(result["radii"], loss_r, atol=1e-4) + + def test_area_positive(self): + SC = self._make_sc(loss_radius=0.01) + result = dynamic_aperture(SC, n_angles=16, n_turns=10, accuracy=1e-5) + assert result["area"] > 0 + + def test_zero_aperture_when_always_lost(self): + """If particles are lost at any radius, DA is zero.""" + SC = MagicMock() + SC.lattice.get_orbit.return_value = np.zeros((2, 1)) + SC.lattice.track.return_value = np.full((1, 6), np.nan) + + result = dynamic_aperture(SC, n_angles=4, n_turns=10, accuracy=1e-4) + np.testing.assert_array_equal(result["radii"], 0) + assert result["area"] == pytest.approx(0.0) + + def test_center_on_orbit_false(self): + """With center_on_orbit=False, orbit is not queried.""" + SC = self._make_sc(loss_radius=0.005) + result = dynamic_aperture( + SC, n_angles=4, n_turns=10, + accuracy=1e-5, center_on_orbit=False, + ) + SC.lattice.get_orbit.assert_not_called() diff --git a/tests/apps/test_loco.py b/tests/apps/test_loco.py new file mode 100644 index 00000000..2da0abca --- /dev/null +++ b/tests/apps/test_loco.py @@ -0,0 +1,172 @@ +"""Tests for pySC.apps.loco — LOCO helpers and fitting.""" +import numpy as np +import pytest +from unittest.mock import MagicMock, patch + +from pySC.apps.loco import ( + _objective, + _get_parameters_mask, + calculate_jacobian, + loco_fit, + apply_loco_corrections, +) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +class TestGetParametersMask: + """Test _get_parameters_mask boolean mask builder.""" + + def test_all_flags_true(self): + mask = _get_parameters_mask([True, True, True], [3, 4, 5]) + assert mask.shape == (12,) + assert mask.all() + + def test_all_flags_false(self): + mask = _get_parameters_mask([False, False, False], [3, 4, 5]) + assert not mask.any() + + def test_partial_flags(self): + mask = _get_parameters_mask([True, False, True], [2, 3, 2]) + expected = np.array([True, True, False, False, False, True, True]) + np.testing.assert_array_equal(mask, expected) + + def test_single_group(self): + mask = _get_parameters_mask([True], [5]) + assert mask.shape == (5,) + assert mask.all() + + +class TestObjective: + """Test _objective weighted residual computation.""" + + def test_zero_params_returns_weighted_residual(self): + """With zero parameter corrections, residual == orm_residuals * weights.""" + orm_res = np.ones((4, 3)) + jac = np.zeros((2, 4, 3)) + params = np.zeros(2) + result = _objective(params, orm_res, jac, weights=1) + np.testing.assert_allclose(result, orm_res.ravel()) + + def test_perfect_correction_gives_zero(self): + """If Jacobian * params == orm_residuals, residual is zero.""" + orm_res = np.ones((2, 2)) + # Single parameter, Jacobian entry is all ones + jac = np.ones((1, 2, 2)) + params = np.array([1.0]) + result = _objective(params, orm_res, jac, weights=1) + np.testing.assert_allclose(result, np.zeros(4), atol=1e-14) + + def test_weights_scale_residual(self): + """Weights multiply the residual row-wise.""" + orm_res = np.ones((3, 2)) + jac = np.zeros((1, 3, 2)) + params = np.zeros(1) + weights = np.array([1.0, 4.0, 9.0]) + result = _objective(params, orm_res, jac, weights) + # sqrt(weights) applied row-wise: [1, 2, 3] + expected = np.array([1, 1, 2, 2, 3, 3], dtype=float) + np.testing.assert_allclose(result, expected) + + +# --------------------------------------------------------------------------- +# Public API (with mocks for SC internals) +# --------------------------------------------------------------------------- + +class TestCalculateJacobian: + """Test calculate_jacobian structure and dimensions.""" + + def test_corrector_and_bpm_blocks(self): + """Corrector-gain and BPM-gain Jacobian blocks have correct structure.""" + SC = MagicMock() + n_bpms, n_corr = 4, 3 + orm_model = np.random.RandomState(0).randn(n_bpms, n_corr) + + with patch("pySC.apps.loco.measure_OrbitResponseMatrix") as mock_orm: + mock_orm.return_value = orm_model + 0.001 # small perturbation + jac = calculate_jacobian( + SC, orm_model, quad_control_names=["Q1"], + include_correctors=True, include_bpms=True, + include_dispersion=False, use_design=True, + ) + + # 1 quad + 3 correctors + 4 bpms = 8 + assert jac.shape == (1 + n_corr + n_bpms, n_bpms, n_corr) + + def test_no_correctors_no_bpms(self): + """With correctors and BPMs disabled, only quad entries remain.""" + SC = MagicMock() + orm_model = np.ones((2, 2)) + + with patch("pySC.apps.loco.measure_OrbitResponseMatrix") as mock_orm: + mock_orm.return_value = orm_model * 1.01 + jac = calculate_jacobian( + SC, orm_model, quad_control_names=["Q1", "Q2"], + include_correctors=False, include_bpms=False, + include_dispersion=False, + ) + + assert jac.shape[0] == 2 # only quads + + +class TestLocoFit: + """Test loco_fit returns correct structure.""" + + def test_returns_all_correction_keys(self): + SC = MagicMock() + n_bpms, n_corr, n_quad = 4, 3, 2 + n_total = n_quad + n_corr + n_bpms + orm_meas = np.random.RandomState(1).randn(n_bpms, n_corr) + orm_model = orm_meas + 0.01 + jac = np.random.RandomState(2).randn(n_total, n_bpms, n_corr) + + result = loco_fit(SC, orm_meas, orm_model, jac) + + assert "quad_corrections" in result + assert "corrector_corrections" in result + assert "bpm_corrections" in result + assert result["quad_corrections"].shape == (n_quad,) + assert result["corrector_corrections"].shape == (n_corr,) + assert result["bpm_corrections"].shape == (n_bpms,) + + def test_fit_subset(self): + """Fitting only quads leaves corrector/bpm corrections at zero.""" + SC = MagicMock() + n_bpms, n_corr, n_quad = 3, 2, 2 + n_total = n_quad + n_corr + n_bpms + orm_meas = np.eye(n_bpms, n_corr) * 0.01 + orm_model = np.zeros((n_bpms, n_corr)) + jac = np.random.RandomState(3).randn(n_total, n_bpms, n_corr) * 0.1 + + result = loco_fit( + SC, orm_meas, orm_model, jac, + fit_quads=True, fit_correctors=False, fit_bpms=False, + ) + + np.testing.assert_array_equal(result["corrector_corrections"], 0) + np.testing.assert_array_equal(result["bpm_corrections"], 0) + + +class TestApplyLocoCorrections: + """Test apply_loco_corrections calls magnet_settings correctly.""" + + def test_applies_fraction(self): + SC = MagicMock() + SC.magnet_settings.get.return_value = 1.0 + + corrections = { + "quad_corrections": np.array([0.1, -0.2]), + "corrector_corrections": np.array([]), + "bpm_corrections": np.array([]), + } + + apply_loco_corrections(SC, corrections, ["Q1", "Q2"], fraction=0.5) + + calls = SC.magnet_settings.set.call_args_list + assert len(calls) == 2 + # Q1: 1.0 - 0.5*0.1 = 0.95 + np.testing.assert_allclose(calls[0].args[1], 0.95) + # Q2: 1.0 - 0.5*(-0.2) = 1.1 + np.testing.assert_allclose(calls[1].args[1], 1.1) diff --git a/tests/apps/test_multipoles.py b/tests/apps/test_multipoles.py new file mode 100644 index 00000000..b5f7a6b4 --- /dev/null +++ b/tests/apps/test_multipoles.py @@ -0,0 +1,405 @@ +"""Tests for pySC.apps.multipoles — systematic and random multipole assignment.""" +import numpy as np +import pytest +from unittest.mock import MagicMock + +from pySC.apps.multipoles import ( + _extend_magnet_lists, + _multipoles_to_dict, + _apply_zero_orders, + set_systematic_multipoles, + set_random_multipoles, + read_multipole_table, + MultipoleTable, +) + + +class FakeMagnet: + """Lightweight stand-in for a magnet settings object.""" + + def __init__(self, max_order=2): + self.max_order = max_order + self.offset_A = [0.0] * (max_order + 1) + self.offset_B = [0.0] * (max_order + 1) + self.A = [0.0] * (max_order + 1) + self.B = [0.0] * (max_order + 1) + self.sim_index = 0 + self.update_count = 0 + + def update(self): + self.update_count += 1 + + +class TestExtendMagnetLists: + """Test _extend_magnet_lists grows field lists correctly.""" + + def test_extends_when_needed(self): + mag = FakeMagnet(max_order=1) + assert len(mag.offset_A) == 2 + _extend_magnet_lists(mag, 4) + assert len(mag.offset_A) == 5 + assert len(mag.offset_B) == 5 + assert len(mag.A) == 5 + assert len(mag.B) == 5 + assert mag.max_order == 4 + + def test_no_extend_when_within_bounds(self): + mag = FakeMagnet(max_order=5) + original_len = len(mag.offset_A) + _extend_magnet_lists(mag, 3) + assert len(mag.offset_A) == original_len + # max_order should NOT change when no extension needed + assert mag.max_order == 5 + + def test_extension_pads_with_zeros(self): + mag = FakeMagnet(max_order=0) + mag.offset_A = [1.0] + _extend_magnet_lists(mag, 2) + assert mag.offset_A == [1.0, 0.0, 0.0] + + +class TestMultipolesToDict: + """Test numpy array to dict conversion.""" + + def test_ndarray_conversion(self): + arr = np.array([[0.0, 0.1], [0.0, 1.0], [0.005, -0.003]]) + result = _multipoles_to_dict(arr) + assert result == { + ('B', 1): 0.1, + ('B', 2): 1.0, + ('A', 3): 0.005, + ('B', 3): -0.003, + } + + def test_skips_zeros(self): + arr = np.array([[0.0, 0.0], [0.0, 0.5]]) + result = _multipoles_to_dict(arr) + assert result == {('B', 2): 0.5} + + def test_dict_passthrough(self): + d = {('A', 1): 0.5} + assert _multipoles_to_dict(d) is d + + def test_bad_shape_raises(self): + with pytest.raises(ValueError, match="shape"): + _multipoles_to_dict(np.array([1.0, 2.0])) + + +class TestApplyZeroOrders: + """Test zero_orders filtering.""" + + def test_removes_specified_orders(self): + multipoles = {('A', 1): 0.1, ('B', 1): 0.2, ('B', 2): 0.3, ('A', 3): 0.4} + result = _apply_zero_orders(multipoles, [1, 3]) + assert result == {('B', 2): 0.3} + + def test_none_returns_unchanged(self): + multipoles = {('B', 1): 0.5} + assert _apply_zero_orders(multipoles, None) is multipoles + + +class TestSetSystematicMultipoles: + """Test set_systematic_multipoles applies values correctly.""" + + def _make_sc(self, magnet_names): + SC = MagicMock() + magnets = {} + for name in magnet_names: + magnets[name] = FakeMagnet(max_order=2) + SC.magnet_settings.magnets = magnets + SC.lattice.get_magnet_component.return_value = 10.0 # nominal field + return SC + + def test_relative_scaling(self): + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 3): 0.01}, + relative_to_nominal=True, + ) + # 0.01 * 10.0 = 0.1 + assert SC.magnet_settings.magnets["Q1"].offset_B[2] == pytest.approx(0.1) + + def test_absolute_scaling(self): + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("A", 2): 0.05}, + relative_to_nominal=False, + ) + assert SC.magnet_settings.magnets["Q1"].offset_A[1] == pytest.approx(0.05) + + def test_invalid_component_raises(self): + SC = self._make_sc(["Q1"]) + with pytest.raises(ValueError, match="Component must be"): + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("C", 1): 0.1}, + relative_to_nominal=False, + ) + + def test_update_called_per_magnet(self): + SC = self._make_sc(["Q1", "Q2"]) + set_systematic_multipoles( + SC, ["Q1", "Q2"], + multipoles={("B", 1): 0.001}, + relative_to_nominal=False, + ) + for name in ["Q1", "Q2"]: + assert SC.magnet_settings.magnets[name].update_count == 1 + + def test_zero_main_field_uses_unity(self): + """When nominal field is zero, scaling falls back to 1.0.""" + SC = self._make_sc(["Q1"]) + SC.lattice.get_magnet_component.return_value = 0.0 + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.5}, + relative_to_nominal=True, + ) + # main_field set to 1.0, so value = 0.5 * 1.0 + assert SC.magnet_settings.magnets["Q1"].offset_B[0] == pytest.approx(0.5) + + def test_accumulation(self): + """Calling twice should accumulate, not overwrite (bug fix).""" + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.001}, + relative_to_nominal=False, + ) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.002}, + relative_to_nominal=False, + ) + assert SC.magnet_settings.magnets["Q1"].offset_B[0] == pytest.approx(0.003) + + def test_accumulation_different_components(self): + """Accumulation works across A and B independently.""" + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("A", 2): 0.01, ("B", 2): 0.02}, + relative_to_nominal=False, + ) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("A", 2): 0.03, ("B", 2): 0.04}, + relative_to_nominal=False, + ) + assert SC.magnet_settings.magnets["Q1"].offset_A[1] == pytest.approx(0.04) + assert SC.magnet_settings.magnets["Q1"].offset_B[1] == pytest.approx(0.06) + + def test_main_order_param(self): + """Explicit main_order overrides magnet.max_order for field lookup.""" + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.01}, + relative_to_nominal=True, + main_order=1, # dipole + ) + # Should look up index 0 (main_order - 1 = 0), not max_order=2 + SC.lattice.get_magnet_component.assert_called_with(0, 'B', 0, use_design=True) + + def test_main_component_A(self): + """main_component='A' looks up the A component for scaling.""" + SC = self._make_sc(["Q1"]) + SC.lattice.get_magnet_component.return_value = 5.0 + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.1}, + relative_to_nominal=True, + main_component='A', + ) + SC.lattice.get_magnet_component.assert_called_with(0, 'A', 2, use_design=True) + # 0.1 * 5.0 = 0.5 + assert SC.magnet_settings.magnets["Q1"].offset_B[0] == pytest.approx(0.5) + + def test_numpy_array_input(self): + """np.ndarray input produces same result as equivalent dict.""" + SC = self._make_sc(["Q1"]) + arr = np.array([[0.0, 0.1], [0.0, 1.0], [0.005, -0.003]]) + set_systematic_multipoles(SC, ["Q1"], multipoles=arr, relative_to_nominal=False) + mag = SC.magnet_settings.magnets["Q1"] + assert mag.offset_B[0] == pytest.approx(0.1) + assert mag.offset_B[1] == pytest.approx(1.0) + assert mag.offset_A[2] == pytest.approx(0.005) + assert mag.offset_B[2] == pytest.approx(-0.003) + + def test_zero_orders(self): + """zero_orders excludes specified orders.""" + SC = self._make_sc(["Q1"]) + set_systematic_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 0.1, ("B", 2): 0.2, ("B", 3): 0.3}, + relative_to_nominal=False, + zero_orders=[1, 2], + ) + mag = SC.magnet_settings.magnets["Q1"] + assert mag.offset_B[0] == pytest.approx(0.0) + assert mag.offset_B[1] == pytest.approx(0.0) + assert mag.offset_B[2] == pytest.approx(0.3) + + +class TestSetRandomMultipoles: + """Test set_random_multipoles adds random offsets.""" + + def test_adds_random_values(self): + SC = MagicMock() + mag = FakeMagnet(max_order=2) + SC.magnet_settings.magnets = {"Q1": mag} + SC.rng.normal_trunc.return_value = 0.123 + + set_random_multipoles(SC, ["Q1"], multipoles={("B", 2): 1.0}) + + SC.rng.normal_trunc.assert_called_once_with(loc=0, scale=1.0, sigma_truncate=2) + assert mag.offset_B[1] == pytest.approx(0.123) + + def test_accumulates_on_existing(self): + SC = MagicMock() + mag = FakeMagnet(max_order=2) + mag.offset_A[0] = 0.5 + SC.magnet_settings.magnets = {"Q1": mag} + SC.rng.normal_trunc.return_value = 0.1 + + set_random_multipoles(SC, ["Q1"], multipoles={("A", 1): 1.0}) + + assert mag.offset_A[0] == pytest.approx(0.6) + + def test_invalid_component_raises(self): + SC = MagicMock() + SC.magnet_settings.magnets = {"Q1": FakeMagnet()} + SC.rng.normal_trunc.return_value = 0.0 + with pytest.raises(ValueError, match="Component must be"): + set_random_multipoles(SC, ["Q1"], multipoles={("X", 1): 1.0}) + + def test_numpy_array_input(self): + SC = MagicMock() + mag = FakeMagnet(max_order=2) + SC.magnet_settings.magnets = {"Q1": mag} + SC.rng.normal_trunc.return_value = 0.05 + + arr = np.array([[0.0, 0.5], [0.3, 0.0]]) + set_random_multipoles(SC, ["Q1"], multipoles=arr) + + # Should be called twice: ('B',1)=0.5 and ('A',2)=0.3 + assert SC.rng.normal_trunc.call_count == 2 + + def test_zero_orders(self): + SC = MagicMock() + mag = FakeMagnet(max_order=2) + SC.magnet_settings.magnets = {"Q1": mag} + SC.rng.normal_trunc.return_value = 0.1 + + set_random_multipoles( + SC, ["Q1"], + multipoles={("B", 1): 1.0, ("B", 2): 1.0, ("A", 3): 1.0}, + zero_orders=[1], + ) + # Only orders 2 and 3 should be applied + assert SC.rng.normal_trunc.call_count == 2 + assert mag.offset_B[0] == pytest.approx(0.0) # order 1 zeroed out + + +class TestReadMultipoleTable: + """Test read_multipole_table parses files correctly.""" + + def test_basic_parse(self, tmp_path): + table_file = tmp_path / "quad_multipoles.txt" + table_file.write_text( + "# n PolynomA(n) PolynomB(n)\n" + "0 0.0 -0.00171\n" + "1 0.0 1.0\n" + "2 0.0 -0.003\n" + ) + result = read_multipole_table(table_file) + assert isinstance(result, MultipoleTable) + assert result.AB.shape == (3, 2) + assert result.main_order == 2 # 1-based, row 1 + assert result.main_component == 'B' + # Nominal zeroed + assert result.AB[1, 1] == 0.0 + # Other values preserved + assert result.AB[0, 1] == pytest.approx(-0.00171) + assert result.AB[2, 1] == pytest.approx(-0.003) + + def test_skew_magnet(self, tmp_path): + """Detects A-component nominal for a skew magnet.""" + table_file = tmp_path / "skew_quad.txt" + table_file.write_text( + "0 0.0 0.0\n" + "1 1.0 0.0\n" + "2 0.001 0.0\n" + ) + result = read_multipole_table(table_file) + assert result.main_order == 2 + assert result.main_component == 'A' + assert result.AB[1, 0] == 0.0 # nominal zeroed + + def test_comments_and_blanks_ignored(self, tmp_path): + table_file = tmp_path / "with_comments.txt" + table_file.write_text( + "# Header line\n" + "\n" + "# Another comment\n" + "0 0.0 0.5\n" + "1 0.0 1.0\n" + ) + result = read_multipole_table(table_file) + assert result.AB.shape == (2, 2) + assert result.main_order == 2 + + def test_empty_file_raises(self, tmp_path): + table_file = tmp_path / "empty.txt" + table_file.write_text("# only comments\n") + with pytest.raises(ValueError, match="No valid data rows"): + read_multipole_table(table_file) + + def test_fallback_to_largest_magnitude(self, tmp_path): + """When no cell is close to 1.0, falls back to largest magnitude.""" + table_file = tmp_path / "no_unity.txt" + table_file.write_text( + "0 0.0 0.0\n" + "1 0.0 50.0\n" + "2 0.001 0.003\n" + ) + result = read_multipole_table(table_file) + assert result.main_order == 2 # row 1 has 50.0 (largest) + assert result.main_component == 'B' + assert result.AB[1, 1] == 0.0 # nominal zeroed + + def test_tab_delimited(self, tmp_path): + table_file = tmp_path / "tabs.txt" + table_file.write_text("0\t0.0\t0.5\n1\t0.0\t1.0\n") + result = read_multipole_table(table_file) + assert result.main_order == 2 + assert result.main_component == 'B' + + def test_integration_with_set_systematic(self, tmp_path): + """Read a table and feed it directly to set_systematic_multipoles.""" + table_file = tmp_path / "sext.txt" + table_file.write_text( + "0 0.0 -0.001\n" + "1 0.0 0.0\n" + "2 0.0 1.0\n" + ) + table = read_multipole_table(table_file) + + SC = MagicMock() + mag = FakeMagnet(max_order=2) + SC.magnet_settings.magnets = {"S1": mag} + SC.lattice.get_magnet_component.return_value = 100.0 + + set_systematic_multipoles( + SC, ["S1"], + multipoles=table.AB, + relative_to_nominal=True, + main_order=table.main_order, + main_component=table.main_component, + ) + # Dipole error: -0.001 * 100.0 = -0.1 + assert mag.offset_B[0] == pytest.approx(-0.1) + # Nominal (order 3, sextupole) was zeroed in table, so no contribution + assert mag.offset_B[2] == pytest.approx(0.0)