Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pySC/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from .apps.measurements import measure_bba
from .apps.measurements import measure_ORM
from .apps.measurements import measure_dispersion
from .tuning.pySC_interface import pySCInjectionInterface, pySCOrbitInterface
from .tuning.pySC_interface import pySCInjectionInterface, pySCOrbitInterface, pySCPseudoOrbitInterface
import logging
import sys

Expand Down Expand Up @@ -46,3 +46,4 @@ def disable_pySC_rich():
# the model_rebuild is "almost certainly"? triggered.
pySCInjectionInterface.model_rebuild()
pySCOrbitInterface.model_rebuild()
pySCPseudoOrbitInterface.model_rebuild()
13 changes: 13 additions & 0 deletions pySC/core/bpm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,19 @@ def capture_injection(self, n_turns=1, bba=True, subtract_reference=True, use_de
else:
return fake_trajectory_x_tbt, fake_trajectory_y_tbt

def capture_pseudo_orbit(self, n_turns=1, bba=True, subtract_reference=True,
use_design=False, return_transmission=False):
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have a "starting_turn=1" option in case one wants to define the pseudo-orbit from later turns for reasons like fast-kicker orbit bumps, decoherence, e.t.c

result = self.capture_injection(
n_turns=n_turns, bba=bba, subtract_reference=subtract_reference,
use_design=use_design, return_transmission=return_transmission
)
if return_transmission:
x, y, transmission = result
return np.nanmean(x, axis=1), np.nanmean(y, axis=1), transmission
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to catch np.nanmean warnings. They can get quite annoying.

with warnings.catch_warnings():
    warnings.simplefilter("ignore", category=RuntimeWarning)
    mean_data = np.nanmean(data, axis=1)

else:
x, y = result
return np.nanmean(x, axis=1), np.nanmean(y, axis=1)

def capture_kick(self, n_turns=1, kick_px=0, kick_py=0, bba=True, subtract_reference=True, use_design=False) -> tuple[np.ndarray, np.ndarray]:
'''
Simulates an orbit reading, after kicking a stored beam, from the BPMs, applying calibration errors, offsets/rolls, and noise.
Expand Down
10 changes: 10 additions & 0 deletions pySC/tuning/pySC_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,13 @@ def get_ref_orbit(self) -> tuple[np.ndarray, np.ndarray]:
x_ref = np.repeat(self.SC.bpm_system.reference_x[:, np.newaxis], self.n_turns, axis=1)
y_ref = np.repeat(self.SC.bpm_system.reference_y[:, np.newaxis], self.n_turns, axis=1)
return x_ref.flatten(order='F'), y_ref.flatten(order='F')

class pySCPseudoOrbitInterface(pySCOrbitInterface):
SC: "SimulatedCommissioning" = Field(repr=False)
n_turns: int = 1
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

starting_turn=1 from above should be an attribute of the class here


def get_orbit(self) -> tuple[np.ndarray, np.ndarray]:
return self.SC.bpm_system.capture_pseudo_orbit(
n_turns=self.n_turns, use_design=self.use_design,
bba=self.bba, subtract_reference=self.subtract_reference
)
56 changes: 18 additions & 38 deletions pySC/tuning/tuning_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .c_minus import CMinus
from .rf_tuning import RF_tuning
from ..core.control import IndivControl
from .pySC_interface import pySCInjectionInterface, pySCOrbitInterface
from .pySC_interface import pySCInjectionInterface, pySCOrbitInterface, pySCPseudoOrbitInterface
from ..apps import orbit_correction

import numpy as np
Expand Down Expand Up @@ -186,61 +186,41 @@ def correct_injection(self, n_turns=1, n_reps=1, method='tikhonov', parameter=10

return

def correct_orbit(self, n_reps=1, method='tikhonov', parameter=100, gain=1, virtual=False):
def correct_orbit(self, n_reps=1, method='tikhonov', parameter=100, gain=1, virtual=False,
pseudo=False, n_turns=1):
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should raise an error/warning if n_turns and starting_turn is redefined but pseudo is False

RM_name = 'orbit'
self.fetch_response_matrix(RM_name, orbit=True)
response_matrix = self.response_matrix[RM_name]
response_matrix.bad_outputs = self.bad_outputs_from_bad_bpms(self.bad_bpms)

SC = self._parent
interface = pySCOrbitInterface(SC=SC)
if pseudo:
interface = pySCPseudoOrbitInterface(SC=SC, n_turns=n_turns)
else:
interface = pySCOrbitInterface(SC=SC)

for _ in range(n_reps):
_ = orbit_correction(interface=interface, response_matrix=response_matrix, reference=None,
method=method, parameter=parameter, virtual=virtual, gain=gain, apply=True)

orbit_x, orbit_y = SC.bpm_system.capture_orbit()
rms_x = np.nanstd(orbit_x) * 1e6
rms_y = np.nanstd(orbit_y) * 1e6
logger.info(f'Corrected orbit: {rms_x=:.1f} um, {rms_y=:.1f} um.')
if pseudo:
orbit_x, orbit_y = SC.bpm_system.capture_pseudo_orbit(n_turns=n_turns)
rms_x = np.nanstd(orbit_x) * 1e6
rms_y = np.nanstd(orbit_y) * 1e6
logger.info(f'Corrected pseudo-orbit ({n_turns} turns): {rms_x=:.1f} um, {rms_y=:.1f} um.')
else:
orbit_x, orbit_y = SC.bpm_system.capture_orbit()
rms_x = np.nanstd(orbit_x) * 1e6
rms_y = np.nanstd(orbit_y) * 1e6
logger.info(f'Corrected orbit: {rms_x=:.1f} um, {rms_y=:.1f} um.')
return

# def correct_pseudo_orbit_at_injection(self, n_turns=1, n_reps=1, method='tikhonov', parameter=100, gain=1, zerosum=False):
# RM_name = 'orbit'
# self.fetch_response_matrix(RM_name, orbit=True)
# RM = self.response_matrix[RM_name]
# RM.bad_outputs = self.bad_outputs_from_bad_bpms(self.bad_bpms)

# for _ in range(n_reps):
# trajectory_x, trajectory_y = self._parent.bpm_system.capture_injection(n_turns=n_turns)
# pseudo_orbit_x = np.nanmean(trajectory_x, axis=1)
# pseudo_orbit_y = np.nanmean(trajectory_y, axis=1)
# pseudo_orbit = np.concat((pseudo_orbit_x, pseudo_orbit_y))

# trims = RM.solve(pseudo_orbit, method=method, parameter=parameter, zerosum=zerosum)

# settings = self._parent.magnet_settings
# for control_name, trim in zip(self.CORR, trims):
# setpoint = settings.get(control_name=control_name) - gain * trim
# settings.set(control_name=control_name, setpoint=setpoint)

# trajectory_x, trajectory_y = self._parent.bpm_system.capture_injection(n_turns=n_turns)
# trajectory_x = trajectory_x.flatten('F')
# trajectory_y = trajectory_y.flatten('F')
# rms_x = np.nanstd(trajectory_x) * 1e6
# rms_y = np.nanstd(trajectory_y) * 1e6
# bad_readings = sum(np.isnan(trajectory_x))
# good_turns = (len(trajectory_x) - bad_readings) / len(self._parent.bpm_system.indices)
# logger.info(f'Corrected injection: transmission through {good_turns:.2f}/{n_turns} turns, {rms_x=:.1f} um, {rms_y=:.1f} um.')

# return

def fit_dispersive_orbit(self):
SC = self._parent
response = measure_RFFrequencyOrbitResponse(SC=SC, use_design=True)

x,y = SC.bpm_system.capture_orbit(bba=False, subtract_reference=False, use_design=False)
xy = np.concat((x.flatten(order='F'), y.flatten(order='F')))
xy = np.concatenate((x.flatten(order='F'), y.flatten(order='F')))

return np.dot(xy, response) / np.dot(response, response)

Expand Down
36 changes: 36 additions & 0 deletions tests/core/test_bpm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,39 @@ def test_capture_injection_design_mode(sc):
x_design2, y_design2 = bpm.capture_injection(n_turns=1, use_design=True)
np.testing.assert_array_equal(x_design, x_design2)
np.testing.assert_array_equal(y_design, y_design2)


# ---------------------------------------------------------------------------
# capture_pseudo_orbit tests
# ---------------------------------------------------------------------------

@pytest.mark.slow
def test_capture_pseudo_orbit_shape(sc):
"""Output shape is (n_bpms,) for each of x, y — averaged across turns."""
bpm = sc.bpm_system
n_turns = 3
x, y = bpm.capture_pseudo_orbit(n_turns=n_turns, bba=False, subtract_reference=False, use_design=True)
assert x.shape == (len(bpm.indices),)
assert y.shape == (len(bpm.indices),)


@pytest.mark.slow
def test_capture_pseudo_orbit_with_transmission(sc):
"""return_transmission=True returns a 3-tuple (x, y, transmission)."""
bpm = sc.bpm_system
result = bpm.capture_pseudo_orbit(n_turns=2, return_transmission=True, use_design=True)
assert len(result) == 3
x, y, transmission = result
assert x.shape == (len(bpm.indices),)
assert y.shape == (len(bpm.indices),)


@pytest.mark.slow
def test_capture_pseudo_orbit_single_turn(sc):
"""n_turns=1 degenerates to a squeezed single-turn injection reading."""
bpm = sc.bpm_system
x_pseudo, y_pseudo = bpm.capture_pseudo_orbit(n_turns=1, bba=False, subtract_reference=False, use_design=True)
x_inj, y_inj = bpm.capture_injection(n_turns=1, bba=False, subtract_reference=False, use_design=True)
# With use_design=True (no RNG noise), pseudo-orbit of 1 turn == injection squeezed
np.testing.assert_allclose(x_pseudo, x_inj.squeeze())
np.testing.assert_allclose(y_pseudo, y_inj.squeeze())
16 changes: 16 additions & 0 deletions tests/tuning/test_tuning_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ def test_correct_injection_basic(sc_tuning):
sc.tuning.correct_injection(n_turns=1, n_reps=1, method='svd_cutoff', parameter=0)


@pytest.mark.slow
def test_correct_pseudo_orbit_basic(sc_tuning):
"""Pseudo-orbit correction runs without error."""
sc = sc_tuning

# Apply a small kick to create an orbit offset
corr = sc.tuning.HCORR[0]
sc.magnet_settings.set(corr, 1e-5)

# Build the orbit RM (pseudo-orbit reuses it)
sc.tuning.calculate_model_orbit_response_matrix()

# Should run without raising
sc.tuning.correct_orbit(pseudo=True, n_turns=2, n_reps=1, method='svd_cutoff', parameter=0)


@pytest.mark.slow
def test_wiggle_last_corrector(sc_tuning):
"""wiggle_last_corrector runs without error on HMBA ring."""
Expand Down