diff --git a/pySC/__init__.py b/pySC/__init__.py index ff43a496..8ed61af6 100644 --- a/pySC/__init__.py +++ b/pySC/__init__.py @@ -15,7 +15,7 @@ from .apps.measurements import measure_bba from .apps.measurements import measure_ORM from .apps.measurements import measure_dispersion -from .tuning.pySC_interface import pySCInjectionInterface, pySCOrbitInterface +from .tuning.pySC_interface import pySCInjectionInterface, pySCOrbitInterface, pySCPseudoOrbitInterface import logging import sys @@ -46,3 +46,4 @@ def disable_pySC_rich(): # the model_rebuild is "almost certainly"? triggered. pySCInjectionInterface.model_rebuild() pySCOrbitInterface.model_rebuild() +pySCPseudoOrbitInterface.model_rebuild() diff --git a/pySC/core/bpm_system.py b/pySC/core/bpm_system.py index fbf43d9b..c0ae6108 100644 --- a/pySC/core/bpm_system.py +++ b/pySC/core/bpm_system.py @@ -172,6 +172,19 @@ def capture_injection(self, n_turns=1, bba=True, subtract_reference=True, use_de else: return fake_trajectory_x_tbt, fake_trajectory_y_tbt + def capture_pseudo_orbit(self, n_turns=1, bba=True, subtract_reference=True, + use_design=False, return_transmission=False): + result = self.capture_injection( + n_turns=n_turns, bba=bba, subtract_reference=subtract_reference, + use_design=use_design, return_transmission=return_transmission + ) + if return_transmission: + x, y, transmission = result + return np.nanmean(x, axis=1), np.nanmean(y, axis=1), transmission + else: + x, y = result + return np.nanmean(x, axis=1), np.nanmean(y, axis=1) + def capture_kick(self, n_turns=1, kick_px=0, kick_py=0, bba=True, subtract_reference=True, use_design=False) -> tuple[np.ndarray, np.ndarray]: ''' Simulates an orbit reading, after kicking a stored beam, from the BPMs, applying calibration errors, offsets/rolls, and noise. diff --git a/pySC/tuning/pySC_interface.py b/pySC/tuning/pySC_interface.py index 9df4b150..613497d3 100644 --- a/pySC/tuning/pySC_interface.py +++ b/pySC/tuning/pySC_interface.py @@ -63,3 +63,13 @@ def get_ref_orbit(self) -> tuple[np.ndarray, np.ndarray]: x_ref = np.repeat(self.SC.bpm_system.reference_x[:, np.newaxis], self.n_turns, axis=1) y_ref = np.repeat(self.SC.bpm_system.reference_y[:, np.newaxis], self.n_turns, axis=1) return x_ref.flatten(order='F'), y_ref.flatten(order='F') + +class pySCPseudoOrbitInterface(pySCOrbitInterface): + SC: "SimulatedCommissioning" = Field(repr=False) + n_turns: int = 1 + + def get_orbit(self) -> tuple[np.ndarray, np.ndarray]: + return self.SC.bpm_system.capture_pseudo_orbit( + n_turns=self.n_turns, use_design=self.use_design, + bba=self.bba, subtract_reference=self.subtract_reference + ) diff --git a/pySC/tuning/tuning_core.py b/pySC/tuning/tuning_core.py index 56bb02cc..0d00c891 100644 --- a/pySC/tuning/tuning_core.py +++ b/pySC/tuning/tuning_core.py @@ -10,7 +10,7 @@ from .c_minus import CMinus from .rf_tuning import RF_tuning from ..core.control import IndivControl -from .pySC_interface import pySCInjectionInterface, pySCOrbitInterface +from .pySC_interface import pySCInjectionInterface, pySCOrbitInterface, pySCPseudoOrbitInterface from ..apps import orbit_correction import numpy as np @@ -186,61 +186,41 @@ def correct_injection(self, n_turns=1, n_reps=1, method='tikhonov', parameter=10 return - def correct_orbit(self, n_reps=1, method='tikhonov', parameter=100, gain=1, virtual=False): + def correct_orbit(self, n_reps=1, method='tikhonov', parameter=100, gain=1, virtual=False, + pseudo=False, n_turns=1): RM_name = 'orbit' self.fetch_response_matrix(RM_name, orbit=True) response_matrix = self.response_matrix[RM_name] response_matrix.bad_outputs = self.bad_outputs_from_bad_bpms(self.bad_bpms) SC = self._parent - interface = pySCOrbitInterface(SC=SC) + if pseudo: + interface = pySCPseudoOrbitInterface(SC=SC, n_turns=n_turns) + else: + interface = pySCOrbitInterface(SC=SC) for _ in range(n_reps): _ = orbit_correction(interface=interface, response_matrix=response_matrix, reference=None, method=method, parameter=parameter, virtual=virtual, gain=gain, apply=True) - orbit_x, orbit_y = SC.bpm_system.capture_orbit() - rms_x = np.nanstd(orbit_x) * 1e6 - rms_y = np.nanstd(orbit_y) * 1e6 - logger.info(f'Corrected orbit: {rms_x=:.1f} um, {rms_y=:.1f} um.') + if pseudo: + orbit_x, orbit_y = SC.bpm_system.capture_pseudo_orbit(n_turns=n_turns) + rms_x = np.nanstd(orbit_x) * 1e6 + rms_y = np.nanstd(orbit_y) * 1e6 + logger.info(f'Corrected pseudo-orbit ({n_turns} turns): {rms_x=:.1f} um, {rms_y=:.1f} um.') + else: + orbit_x, orbit_y = SC.bpm_system.capture_orbit() + rms_x = np.nanstd(orbit_x) * 1e6 + rms_y = np.nanstd(orbit_y) * 1e6 + logger.info(f'Corrected orbit: {rms_x=:.1f} um, {rms_y=:.1f} um.') return - # def correct_pseudo_orbit_at_injection(self, n_turns=1, n_reps=1, method='tikhonov', parameter=100, gain=1, zerosum=False): - # RM_name = 'orbit' - # self.fetch_response_matrix(RM_name, orbit=True) - # RM = self.response_matrix[RM_name] - # RM.bad_outputs = self.bad_outputs_from_bad_bpms(self.bad_bpms) - - # for _ in range(n_reps): - # trajectory_x, trajectory_y = self._parent.bpm_system.capture_injection(n_turns=n_turns) - # pseudo_orbit_x = np.nanmean(trajectory_x, axis=1) - # pseudo_orbit_y = np.nanmean(trajectory_y, axis=1) - # pseudo_orbit = np.concat((pseudo_orbit_x, pseudo_orbit_y)) - - # trims = RM.solve(pseudo_orbit, method=method, parameter=parameter, zerosum=zerosum) - - # settings = self._parent.magnet_settings - # for control_name, trim in zip(self.CORR, trims): - # setpoint = settings.get(control_name=control_name) - gain * trim - # settings.set(control_name=control_name, setpoint=setpoint) - - # trajectory_x, trajectory_y = self._parent.bpm_system.capture_injection(n_turns=n_turns) - # trajectory_x = trajectory_x.flatten('F') - # trajectory_y = trajectory_y.flatten('F') - # rms_x = np.nanstd(trajectory_x) * 1e6 - # rms_y = np.nanstd(trajectory_y) * 1e6 - # bad_readings = sum(np.isnan(trajectory_x)) - # good_turns = (len(trajectory_x) - bad_readings) / len(self._parent.bpm_system.indices) - # logger.info(f'Corrected injection: transmission through {good_turns:.2f}/{n_turns} turns, {rms_x=:.1f} um, {rms_y=:.1f} um.') - - # return - def fit_dispersive_orbit(self): SC = self._parent response = measure_RFFrequencyOrbitResponse(SC=SC, use_design=True) x,y = SC.bpm_system.capture_orbit(bba=False, subtract_reference=False, use_design=False) - xy = np.concat((x.flatten(order='F'), y.flatten(order='F'))) + xy = np.concatenate((x.flatten(order='F'), y.flatten(order='F'))) return np.dot(xy, response) / np.dot(response, response) diff --git a/tests/core/test_bpm_system.py b/tests/core/test_bpm_system.py index c6013bfc..3f6ee165 100644 --- a/tests/core/test_bpm_system.py +++ b/tests/core/test_bpm_system.py @@ -302,3 +302,39 @@ def test_capture_injection_design_mode(sc): x_design2, y_design2 = bpm.capture_injection(n_turns=1, use_design=True) np.testing.assert_array_equal(x_design, x_design2) np.testing.assert_array_equal(y_design, y_design2) + + +# --------------------------------------------------------------------------- +# capture_pseudo_orbit tests +# --------------------------------------------------------------------------- + +@pytest.mark.slow +def test_capture_pseudo_orbit_shape(sc): + """Output shape is (n_bpms,) for each of x, y — averaged across turns.""" + bpm = sc.bpm_system + n_turns = 3 + x, y = bpm.capture_pseudo_orbit(n_turns=n_turns, bba=False, subtract_reference=False, use_design=True) + assert x.shape == (len(bpm.indices),) + assert y.shape == (len(bpm.indices),) + + +@pytest.mark.slow +def test_capture_pseudo_orbit_with_transmission(sc): + """return_transmission=True returns a 3-tuple (x, y, transmission).""" + bpm = sc.bpm_system + result = bpm.capture_pseudo_orbit(n_turns=2, return_transmission=True, use_design=True) + assert len(result) == 3 + x, y, transmission = result + assert x.shape == (len(bpm.indices),) + assert y.shape == (len(bpm.indices),) + + +@pytest.mark.slow +def test_capture_pseudo_orbit_single_turn(sc): + """n_turns=1 degenerates to a squeezed single-turn injection reading.""" + bpm = sc.bpm_system + x_pseudo, y_pseudo = bpm.capture_pseudo_orbit(n_turns=1, bba=False, subtract_reference=False, use_design=True) + x_inj, y_inj = bpm.capture_injection(n_turns=1, bba=False, subtract_reference=False, use_design=True) + # With use_design=True (no RNG noise), pseudo-orbit of 1 turn == injection squeezed + np.testing.assert_allclose(x_pseudo, x_inj.squeeze()) + np.testing.assert_allclose(y_pseudo, y_inj.squeeze()) diff --git a/tests/tuning/test_tuning_core.py b/tests/tuning/test_tuning_core.py index f1aaa358..68e637a8 100644 --- a/tests/tuning/test_tuning_core.py +++ b/tests/tuning/test_tuning_core.py @@ -87,6 +87,22 @@ def test_correct_injection_basic(sc_tuning): sc.tuning.correct_injection(n_turns=1, n_reps=1, method='svd_cutoff', parameter=0) +@pytest.mark.slow +def test_correct_pseudo_orbit_basic(sc_tuning): + """Pseudo-orbit correction runs without error.""" + sc = sc_tuning + + # Apply a small kick to create an orbit offset + corr = sc.tuning.HCORR[0] + sc.magnet_settings.set(corr, 1e-5) + + # Build the orbit RM (pseudo-orbit reuses it) + sc.tuning.calculate_model_orbit_response_matrix() + + # Should run without raising + sc.tuning.correct_orbit(pseudo=True, n_turns=2, n_reps=1, method='svd_cutoff', parameter=0) + + @pytest.mark.slow def test_wiggle_last_corrector(sc_tuning): """wiggle_last_corrector runs without error on HMBA ring."""