diff --git a/src/dodal/plan_stubs/__init__.py b/src/dodal/plan_stubs/__init__.py index 2c84ded8654..25f7a5e97c4 100644 --- a/src/dodal/plan_stubs/__init__.py +++ b/src/dodal/plan_stubs/__init__.py @@ -1,3 +1,21 @@ -from .wrapped import move, move_relative, set_absolute, set_relative, sleep, wait +from .wrapped import ( + move, + move_relative, + rd, + set_absolute, + set_relative, + sleep, + stop, + wait, +) -__all__ = ["move", "move_relative", "set_absolute", "set_relative", "sleep", "wait"] +__all__ = [ + "move", + "move_relative", + "rd", + "set_absolute", + "set_relative", + "sleep", + "stop", + "wait", +] diff --git a/src/dodal/plan_stubs/wrapped.py b/src/dodal/plan_stubs/wrapped.py index 43555dbbc16..1d134c2fb7c 100644 --- a/src/dodal/plan_stubs/wrapped.py +++ b/src/dodal/plan_stubs/wrapped.py @@ -3,7 +3,7 @@ from typing import Annotated, TypeVar import bluesky.plan_stubs as bps -from bluesky.protocols import Movable +from bluesky.protocols import Movable, Readable, Stoppable from bluesky.utils import MsgGenerator """ @@ -146,3 +146,27 @@ def wait( """ return (yield from bps.wait(group, timeout=timeout)) + + +def rd(readable: Readable) -> MsgGenerator: + """Reads a single-value non-triggered object, wrapper for `bp.rd`. + + Args: + readable (Readable): The device to be read + + Returns: + Iterator[MsgGenerator]: Bluesky messages + """ + return (yield from bps.rd(readable)) + + +def stop(stoppable: Stoppable) -> MsgGenerator: + """Stop a device, wrapper for `bp.stop`. + + Args: + stoppable (Stoppable): Device to be stopped + + Returns: + Iterator[MsgGenerator]: Bluesky messages + """ + return (yield from bps.stop(stoppable)) diff --git a/src/dodal/plans/__init__.py b/src/dodal/plans/__init__.py index fb402459694..b94c77f4163 100644 --- a/src/dodal/plans/__init__.py +++ b/src/dodal/plans/__init__.py @@ -1,4 +1,33 @@ from .scanspec import spec_scan -from .wrapped import count +from .wrapped import ( + count, + grid_num_rscan, + grid_num_scan, + list_grid_rscan, + list_grid_scan, + list_rscan, + list_scan, + num_rscan, + num_scan, + step_grid_rscan, + step_grid_scan, + step_rscan, + step_scan, +) -__all__ = ["count", "spec_scan"] +__all__ = [ + "count", + "grid_num_rscan", + "grid_num_scan", + "list_grid_rscan", + "list_grid_scan", + "list_rscan", + "list_scan", + "num_rscan", + "num_scan", + "spec_scan", + "step_grid_rscan", + "step_grid_scan", + "step_rscan", + "step_scan", +] diff --git a/src/dodal/plans/wrapped.py b/src/dodal/plans/wrapped.py index 48875c52353..46cdbfcfbdf 100644 --- a/src/dodal/plans/wrapped.py +++ b/src/dodal/plans/wrapped.py @@ -1,11 +1,16 @@ +import itertools from collections.abc import Sequence +from decimal import Decimal from typing import Annotated, Any import bluesky.plans as bp -from bluesky.protocols import Readable +import numpy as np +from bluesky.protocols import Movable, Readable +from ophyd_async.core import AsyncReadable from pydantic import Field, NonNegativeFloat, validate_call from dodal.common import MsgGenerator +from dodal.devices.motors import Motor from dodal.plan_stubs.data_session import attach_data_session_metadata_decorator """This module wraps plan(s) from bluesky.plans until required handling for them is @@ -27,7 +32,7 @@ @validate_call(config={"arbitrary_types_allowed": True}) def count( detectors: Annotated[ - set[Readable], + Sequence[Readable | AsyncReadable], Field( description="Set of readable devices, will take a reading at each point", min_length=1, @@ -55,3 +60,498 @@ def count( metadata = metadata or {} metadata["shape"] = (num,) yield from bp.count(tuple(detectors), num, delay=delay, md=metadata) + + +def _make_args( + movers: Sequence[Movable | Motor], + params: list[Any] | Sequence[Any], + num_params: int, +): + movers_len = len(movers) + params_len = len(params) + args = [] + if params_len % movers_len == 0 and params_len % num_params == 0: + it = iter(params) + param_chunks = iter(lambda: tuple(itertools.islice(it, num_params)), ()) + for movable, param_chunk in zip(movers, param_chunks, strict=False): + args.append(movable) + args.extend(param_chunk) + else: + raise ValueError(f"params must contain {num_params} values for each movable") + return args + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def num_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + list[float], + Field( + description="Start and stop points for each movable, 'start1, stop1, ...," + "startN, stopN' for every movable in `movers`." + ), + ], + num: Annotated[int, Field(description="Number of points")], + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over one multi-motor trajectory. + Wraps bluesky.plans.scan(det, *args, num, md=metadata)""" + metadata = metadata or {} + metadata["shape"] = (num,) + args = _make_args(movers=movers, params=params, num_params=2) + yield from bp.scan(tuple(detectors), *args, num=num, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def num_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + list[float], + Field( + description="Start and stop points for each movable, 'start1, stop1, ...," + "startN, stopN' for every movable in `movers`." + ), + ], + num: Annotated[int, Field(description="Number of points")], + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over one multi-motor trajectory, relative to current position. + Wraps bluesky.plans.rel_scan(det, *args, num, md=metadata)""" + metadata = metadata or {} + metadata["shape"] = (num,) + args = _make_args(movers=movers, params=params, num_params=2) + yield from bp.rel_scan(tuple(detectors), *args, num=num, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def grid_num_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + Sequence[float | int], + Field( + description="Start and stop and number of points for each movable, 'start1," + "stop1, num1, ..., startN, stopN, numN' for every movable in `movers`." + ), + ], + snake_axes: list | bool | None = None, + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over a mesh; each motor is on an independent trajectory. + Wraps bluesky.plans.grid_scan(det, *args, snake_axes, md=metadata)""" + metadata = metadata or {} + args = _make_args(movers=movers, params=params, num_params=3) + yield from bp.grid_scan(tuple(detectors), *args, snake_axes=snake_axes, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def grid_num_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + Sequence[float | int], + Field( + description="Start and stop and number of points for each movable, 'start1," + "stop1, num1, ..., startN, stopN, numN' for every movable in `movers`." + ), + ], + snake_axes: list | bool | None = None, + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over a mesh relative to current position. + Wraps bluesky.plans.rel_grid_scan(det, *args, snake_axes, md=metadata)""" + metadata = metadata or {} + args = _make_args(movers=movers, params=params, num_params=3) + yield from bp.rel_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def list_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + list[list[Any]], + Field( + description="List of points for each movable, '[point1, point2, ..., ], " + "[point1, point2, ...], ...' for every movable in `movers`." + ), + ], + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over one or more variables in steps simultaneously. + Wraps bluesky.plans.list_scan(det, *args, md=metadata).""" + metadata = metadata or {} + args = _make_args(movers=movers, params=params, num_params=1) + yield from bp.list_scan(tuple(detectors), *args, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def list_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + list[list[Any]], + Field( + description="List of points for each movable, '[point1, point2, ..., ], " + "[point1, point2, ...], ...' for every movable in `movers`." + ), + ], + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over one or more variables simultaneously relative to current position. + Wraps bluesky.plans.rel_list_scan(det, *args, md=metadata).""" + metadata = metadata or {} + args = _make_args(movers=movers, params=params, num_params=1) + yield from bp.rel_list_scan(tuple(detectors), *args, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def list_grid_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + list[list[Any]], + Field( + description="List of points for each movable, '[point1, point2, ..., ], " + "[point1, point2, ...], ...' for every movable in `movers`." + ), + ], + snake_axes: bool = False, # Currently specifying axes to snake is not supported + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over one or more variables for each given point on independent trajectories. + Wraps bluesky.plans.list_grid_scan(det, *args, md=metadata).""" + metadata = metadata or {} + args = _make_args(movers=movers, params=params, num_params=1) + yield from bp.list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def list_grid_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + list[list[Any]], + Field( + description="List of points for each movable, '[point1, point2, ..., ], " + "[point1, point2, ...], ...' for every movable in `movers`." + ), + ], + snake_axes: bool = False, # Currently specifying axes to snake is not supported + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over some variables for each given point relative to current position. + Wraps bluesky.plans.rel_list_grid_scan(det, *args, md=metadata).""" + metadata = metadata or {} + args = _make_args(movers=movers, params=params, num_params=1) + yield from bp.rel_list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) + + +def _make_stepped_list( + params: list[Any] | Sequence[Any], + num: int | None = None, +): + def round_list_elements(stepped_list, step): + d = Decimal(str(step)) + exponent = d.as_tuple().exponent + decimal_places = -exponent # type: ignore + return np.round(stepped_list, decimals=decimal_places).tolist() + + start = params[0] + if len(params) == 3: + stop = params[1] + step = params[2] + if abs(step) > abs(stop - start): + step = abs(stop - start) + step = abs(step) * np.sign(stop - start) + stepped_list = np.arange(start=start, stop=stop, step=step).tolist() + if abs((stepped_list[-1] + step) - stop) <= abs(step * 0.01): + stepped_list.append(stepped_list[-1] + step) + rounded_stepped_list = round_list_elements(stepped_list=stepped_list, step=step) + elif len(params) == 2 and num: + step = params[1] + stepped_list = [start + (n * step) for n in range(num)] + rounded_stepped_list = round_list_elements(stepped_list=stepped_list, step=step) + else: + rounded_stepped_list = [] + return rounded_stepped_list + + +def _make_concurrently_stepped_lists( + movers_len: int, + params: list[Any] | Sequence[Any], +): + # separate out the first three elements + # separate the rest of the elements into chunks of two + stepped_lists = [] + params_len = len(params) + if movers_len == 1 and params_len == 3: + stepped_lists.append(_make_stepped_list(params)) + elif movers_len > 1 and (params_len - 3) % 2 == 0: + leader_params = params[:3] + follower_params = params[3:] + stepped_lists.append(_make_stepped_list(leader_params)) + num = len(stepped_lists[0]) + it = iter(follower_params) + param_chunks = iter(lambda: tuple(itertools.islice(it, 2)), ()) + for param_chunk in param_chunks: + stepped_lists.append(_make_stepped_list(param_chunk, num=num)) + else: + raise ValueError( + "params must contain 3 values for the first movable and 2 values for each " + "successive movable." + ) + return stepped_lists + + +def _make_independently_stepped_lists( + movers_len: int, + params: list[Any] | Sequence[Any], +): + num_params = 3 # It will always be start stop step for each axis + stepped_lists = [] + params_len = len(params) + if params_len % movers_len == 0 and params_len % num_params == 0: + it = iter(params) + param_chunks = iter(lambda: tuple(itertools.islice(it, num_params)), ()) + for param_chunk in param_chunks: + stepped_lists.append(_make_stepped_list(param_chunk)) + else: + raise ValueError(f"params must contain {num_params} values for each movable") + return stepped_lists + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def step_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + list[Any], + Field( + description="Start, stop, and step values for the first movable, and start " + "and step values for each successive movable, 'start1, stop1, step1, " + "start2, step 2, ..., startN, stepN' for every movable in `movers`." + ), + ], + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over one multi-motor trajectory with specified step size. + Generates lists of points for each trajectory for bp.list_scan. + """ + movers_len = len(movers) + stepped_lists = _make_concurrently_stepped_lists( + movers_len=movers_len, params=params + ) + metadata = metadata or {} + args = _make_args(movers=movers, params=stepped_lists, num_params=1) + yield from bp.list_scan(tuple(detectors), *args, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def step_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + list[Any], + Field( + description="Start, stop, and step values for the first movable, and start " + "and step values for each successive movable, 'start1, stop1, step1, " + "start2, step 2, ..., startN, stepN' for every movable in `movers`." + ), + ], + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over multi-motor trajectory relative to position with specified step size. + Generates lists of points for each trajectory for bp.rel_list_scan. + """ + movers_len = len(movers) + stepped_lists = _make_concurrently_stepped_lists( + movers_len=movers_len, params=params + ) + metadata = metadata or {} + args = _make_args(movers=movers, params=stepped_lists, num_params=1) + yield from bp.rel_list_scan(tuple(detectors), *args, md=metadata) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def step_grid_scan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + list[Any], + Field( + description="Start, stop, and step values 'start1, stop1, step1, ..., " + "startN, stepN' for every movable in `movers`." + ), + ], + snake_axes: bool = False, # Currently specifying axes to snake is not supported + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over independent multi-motor trajectories with specified step size. + Generates lists of points for each trajectory for bp.list_grid_scan. + """ + movers_len = len(movers) + stepped_lists = _make_independently_stepped_lists( + movers_len=movers_len, params=params + ) + metadata = metadata or {} + args = _make_args(movers=movers, params=stepped_lists, num_params=1) + yield from bp.list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) + + +@attach_data_session_metadata_decorator() +@validate_call(config={"arbitrary_types_allowed": True}) +def step_grid_rscan( + detectors: Annotated[ + Sequence[Readable | AsyncReadable], + Field( + description="Set of readable devices, will take a reading at each point", + min_length=1, + ), + ], + movers: Annotated[ + Sequence[Movable | Motor], + Field(description="One or more movable to move during the scan."), + ], + params: Annotated[ + list[Any], + Field( + description="Start, stop, and step values 'start1, stop1, step1, ..., " + "startN, stepN' for every movable in `movers`." + ), + ], + snake_axes: bool = False, # Currently specifying axes to snake is not supported + metadata: dict[str, Any] | None = None, +) -> MsgGenerator: + """Scan over independent multi-motor relative trajectories with specified step size. + Generates lists of points for each trajectory for bp.rel_list_grid_scan. + """ + movers_len = len(movers) + stepped_lists = _make_independently_stepped_lists( + movers_len=movers_len, params=params + ) + metadata = metadata or {} + args = _make_args(movers=movers, params=stepped_lists, num_params=1) + yield from bp.rel_list_grid_scan( + tuple(detectors), *args, snake_axes=snake_axes, md=metadata + ) diff --git a/system_tests/test_adsim.py b/system_tests/test_adsim.py index 665a4175c96..9c5bf8af73f 100644 --- a/system_tests/test_adsim.py +++ b/system_tests/test_adsim.py @@ -87,7 +87,7 @@ def documents_from_num( ) -> dict[str, list[DocumentType]]: docs: dict[str, list[DocumentType]] = {} run_engine( - count({det}, num=request.param), + count([det], num=request.param), lambda name, doc: docs.setdefault(name, []).append(doc), ) return docs diff --git a/tests/plan_stubs/test_wrapped_stubs.py b/tests/plan_stubs/test_wrapped_stubs.py index 9af2e5d5dd7..cbc77ac9bd2 100644 --- a/tests/plan_stubs/test_wrapped_stubs.py +++ b/tests/plan_stubs/test_wrapped_stubs.py @@ -10,9 +10,11 @@ from dodal.plan_stubs.wrapped import ( move, move_relative, + rd, set_absolute, set_relative, sleep, + stop, wait, ) @@ -147,3 +149,11 @@ def test_wait_group_and_timeout(): assert list(wait("foo", 5.0)) == [ Msg("wait", group="foo", timeout=5.0, error_on_timeout=True, watch=_EMPTY) ] + + +def test_rd(x_axis: SimMotor): + assert list(rd(x_axis)) == [Msg("locate", obj=x_axis)] + + +def test_stop(x_axis: SimMotor): + assert list(stop(x_axis)) == [Msg("stop", obj=x_axis)] diff --git a/tests/plans/test_wrapped.py b/tests/plans/test_wrapped.py index b6b76dbc88c..b4cd3485059 100644 --- a/tests/plans/test_wrapped.py +++ b/tests/plans/test_wrapped.py @@ -1,5 +1,5 @@ from collections.abc import Sequence -from typing import cast +from typing import Any, cast import pytest from bluesky.protocols import Readable @@ -13,11 +13,31 @@ StreamResource, ) from ophyd_async.core import ( + AsyncReadable, StandardDetector, ) from pydantic import ValidationError -from dodal.plans.wrapped import count +from dodal.devices.motors import Motor +from dodal.plans.wrapped import ( + _make_args, + _make_concurrently_stepped_lists, + _make_independently_stepped_lists, + _make_stepped_list, + count, + grid_num_rscan, + grid_num_scan, + list_grid_rscan, + list_grid_scan, + list_rscan, + list_scan, + num_rscan, + num_scan, + step_grid_rscan, + step_grid_scan, + step_rscan, + step_scan, +) @pytest.fixture @@ -26,7 +46,7 @@ def documents_from_num( ) -> dict[str, list[Document]]: docs: dict[str, list[Document]] = {} run_engine( - count({det}, num=request.param), + count([det], num=request.param), lambda name, doc: docs.setdefault(name, []).append(doc), ) return docs @@ -50,16 +70,16 @@ def test_count_delay_validation(det: StandardDetector, run_engine: RunEngine): } for delay, reason in args.items(): with pytest.raises((ValidationError, AssertionError), match=reason): - run_engine(count({det}, num=3, delay=delay)) + run_engine(count([det], num=3, delay=delay)) print(delay) def test_count_detectors_validation(run_engine: RunEngine): - args: dict[str, set[Readable]] = { + args: dict[str, Sequence[Readable | AsyncReadable]] = { # No device to read - "Set should have at least 1 item after validation, not 0": set(), + "1 validation error for count": set(), # Not Readable - "Input should be an instance of Readable": set("foo"), # type: ignore + "Input should be an instance of Sequence": set("foo"), # type: ignore } for reason, dets in args.items(): with pytest.raises(ValidationError, match=reason): @@ -74,7 +94,7 @@ def test_count_num_validation(det: StandardDetector, run_engine: RunEngine): } for num, reason in args.items(): with pytest.raises(ValidationError, match=reason): - run_engine(count({det}, num=num)) + run_engine(count([det], num=num)) @pytest.mark.parametrize( @@ -157,3 +177,819 @@ def test_plan_produces_expected_datums( docs = documents_from_num.get("stream_datum") data_keys = [det.name, f"{det.name}-sum"] assert docs and len(docs) == len(data_keys) * length + + +@pytest.mark.parametrize( + "num_params, params", ([2, [1, 2, 3, 4]], [3, [1, 2, 3, 3, 4, 3]]) +) +def test_make_args(x_axis: Motor, y_axis: Motor, num_params: int, params: list[float]): + movers = [x_axis, y_axis] + args = _make_args(movers=movers, params=params, num_params=num_params) + print(args) + assert len(args) == len(movers) + len(params) + assert args[0] == x_axis + assert args[(num_params + 1)] == y_axis + assert args[1] == 1 + assert args[(num_params + 2)] == 3 + + +def test_make_args_when_given_lists(x_axis: Motor, y_axis: Motor): + args = _make_args( + movers=[x_axis, y_axis], params=[[1, 2, 3, 4], [3, 4, 5, 6]], num_params=1 + ) + print(args) + assert len(args) == 4 + assert args[0] == x_axis + assert args[2] == y_axis + assert args[1][0] == 1 + assert args[3][0] == 3 + + +@pytest.mark.parametrize("x_start, x_stop, num", ([0, 2, 5], [1, -1, 3])) +def test_num_scan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + num: int, +): + run_engine( + num_scan(detectors=[det], movers=[x_axis], params=[x_start, x_stop], num=num) + ) + + +@pytest.mark.parametrize( + "x_start, x_stop, y_start, y_stop, num", ([0, 2, 2, 0, 5], [-1, 1, -1, 1, 3]) +) +def test_num_scan_with_two_axes( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + y_axis: Motor, + y_start: Any, + y_stop: Any, + num: int, +): + run_engine( + num_scan( + detectors=[det], + movers=[x_axis, y_axis], + params=[x_start, x_stop, y_start, y_stop], + num=num, + ) + ) + + +def test_num_scan_fails_when_given_wrong_number_of_params( + run_engine: RunEngine, det: StandardDetector, x_axis: Motor, y_axis: Motor +): + with pytest.raises(ValueError): + run_engine( + num_scan(detectors=[det], movers=[x_axis, y_axis], params=[0, 1, 2], num=3) + ) + + +@pytest.mark.parametrize( + "x_start, x_stop, y_start, y_stop, num", ([-1, 1, 2, 0, 0], [-1, 1, -1, 1, 3.5]) +) +def test_num_scan_fails_when_given_bad_info( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + y_axis: Motor, + y_start: Any, + y_stop: Any, + num: int, +): + with pytest.raises(ValueError): + run_engine( + num_scan( + detectors=[det], + movers=[x_axis, y_axis], + params=[x_start, x_stop, y_start, y_stop], + num=num, + ) + ) + + +@pytest.mark.parametrize("x_start, x_stop, num", ([0, 2, 5], [1, -1, 3])) +def test_num_rscan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + num: int, +): + run_engine( + num_rscan(detectors=[det], movers=[x_axis], params=[x_start, x_stop], num=num) + ) + + +@pytest.mark.parametrize( + "x_start, x_stop, y_start, y_stop, num", ([0, 2, 2, 0, 5], [-1, 1, -1, 1, 3]) +) +def test_num_rscan_with_two_axes( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + y_axis: Motor, + y_start: Any, + y_stop: Any, + num: int, +): + run_engine( + num_rscan( + detectors=[det], + movers=[x_axis, y_axis], + params=[x_start, x_stop, y_start, y_stop], + num=num, + ) + ) + + +@pytest.mark.parametrize( + "x_start, x_stop, y_start, y_stop, num", ([-1, 1, 2, 0, 0], [-1, 1, -1, 1, 3.5]) +) +def test_num_rscan_fails_when_given_bad_info( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + y_axis: Motor, + y_start: Any, + y_stop: Any, + num: int, +): + with pytest.raises(ValueError): + run_engine( + num_rscan( + detectors=[det], + movers=[x_axis, y_axis], + params=[x_start, x_stop, y_start, y_stop], + num=num, + ) + ) + + +@pytest.mark.parametrize( + "x_start, x_stop, x_num, y_start, y_stop, y_num", + ([0, 2, 3, 0, 2, 3], [-1, 1, 5, 1, -1, 5]), +) +def test_grid_num_scan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + x_num: int, + y_axis: Motor, + y_start: Any, + y_stop: Any, + y_num: int, +): + run_engine( + grid_num_scan( + detectors=[det], + movers=[y_axis, x_axis], + params=[y_start, y_stop, y_num, x_start, x_stop, x_num], + ) + ) + + +@pytest.mark.parametrize( + "x_start, x_stop, x_num, y_start, y_stop, y_num", + ([0, 2, 3, 0, 2, 3], [-1, 1, 5, 1, -1, 5]), +) +def test_grid_num_scan_when_snaking( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + x_num: int, + y_axis: Motor, + y_start: Any, + y_stop: Any, + y_num: int, +): + run_engine( + grid_num_scan( + detectors=[det], + movers=[y_axis, x_axis], + params=[y_start, y_stop, y_num, x_start, x_stop, x_num], + snake_axes=True, + ) + ) + + +@pytest.mark.parametrize( + "x_start, x_stop, x_num, y_start, y_stop, y_num", + ([0, 2, 3, 0, 2, 3], [-1, 1, 5, 1, -1, 5]), +) +def test_grid_num_scan_when_snaking_subset_of_axes( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + x_num: int, + y_axis: Motor, + y_start: Any, + y_stop: Any, + y_num: int, +): + run_engine( + grid_num_scan( + detectors=[det], + movers=[y_axis, x_axis], + params=[y_start, y_stop, y_num, x_start, x_stop, x_num], + snake_axes=[x_axis], + ) + ) + + +def test_grid_num_scan_fails_when_snaking_slow_axis( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(ValueError): + run_engine( + grid_num_scan( + detectors=[det], + movers=[y_axis, x_axis], + params=[0, 2, 3, 0, 2, 3], + snake_axes=[y_axis], + ) + ) + + +def test_grid_num_scan_fails_when_given_length_of_zero( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(RuntimeError): + run_engine( + grid_num_scan( + detectors=[det], + movers=[y_axis, x_axis], + params=[0, 2, 0, 0, 2, 3], + ) + ) + + +def test_grid_num_scan_fails_when_given_non_integer_length( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(TypeError): + run_engine( + grid_num_scan( + detectors=[det], + movers=[y_axis, x_axis], + params=[0, 2, 3.5, 0, 2, 3], + ) + ) + + +@pytest.mark.parametrize( + "x_start, x_stop, x_num, y_start, y_stop, y_num", + ([0, 2, 3, 0, 2, 3], [-1, 1, 5, 1, -1, 5]), +) +def test_grid_num_rscan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + x_num: int, + y_axis: Motor, + y_start: Any, + y_stop: Any, + y_num: int, +): + run_engine( + grid_num_rscan( + detectors=[det], + movers=[y_axis, x_axis], + params=[y_start, y_stop, y_num, x_start, x_stop, x_num], + ) + ) + + +@pytest.mark.parametrize( + "x_start, x_stop, x_num, y_start, y_stop, y_num", + ([0, 2, 3, 0, 2, 3], [-1, 1, 5, 1, -1, 5]), +) +def test_grid_num_rscan_when_snaking( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + x_num: int, + y_axis: Motor, + y_start: Any, + y_stop: Any, + y_num: int, +): + run_engine( + grid_num_rscan( + detectors=[det], + movers=[y_axis, x_axis], + params=[y_start, y_stop, y_num, x_start, x_stop, x_num], + snake_axes=True, + ) + ) + + +@pytest.mark.parametrize( + "x_start, x_stop, x_num, y_start, y_stop, y_num", + ([0, 2, 3, 0, 2, 3], [-1, 1, 5, 1, -1, 5]), +) +def test_grid_num_rscan_when_snaking_subset_of_axes( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_start: Any, + x_stop: Any, + x_num: int, + y_axis: Motor, + y_start: Any, + y_stop: Any, + y_num: int, +): + run_engine( + grid_num_rscan( + detectors=[det], + movers=[y_axis, x_axis], + params=[y_start, y_stop, y_num, x_start, x_stop, x_num], + snake_axes=[x_axis], + ) + ) + + +def test_grid_num_rscan_fails_when_snaking_slow_axis( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(ValueError): + run_engine( + grid_num_rscan( + detectors=[det], + movers=[y_axis, x_axis], + params=[0, 2, 3, 0, 2, 3], + snake_axes=[y_axis], + ) + ) + + +def test_grid_num_rscan_fails_when_given_length_of_zero( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(RuntimeError): + run_engine( + grid_num_rscan( + detectors=[det], + movers=[y_axis, x_axis], + params=[0, 2, 0, 0, 2, 3], + ) + ) + + +def test_grid_num_rscan_fails_when_given_non_integer_length( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(TypeError): + run_engine( + grid_num_rscan( + detectors=[det], movers=[y_axis, x_axis], params=[0, 2, 3.5, 0, 2, 3] + ) + ) + + +@pytest.mark.parametrize("x_list", ([[0, 1, 2, 3]], [[1.1, 2.2, 3.3]])) +def test_list_scan( + run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_list: Any +): + run_engine(list_scan(detectors=[det], movers=[x_axis], params=x_list)) + + +@pytest.mark.parametrize( + "x_list, y_list", + ( + [[3, 2, 1], [1, 2, 3]], + [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], + ), +) +def test_list_scan_with_two_axes( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_list: list, + y_axis: Motor, + y_list: list, +): + run_engine( + list_scan(detectors=[det], movers=[x_axis, y_axis], params=[x_list, y_list]) + ) + + +def test_list_scan_with_two_axes_fails_when_given_differnt_list_lengths( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(ValueError): + run_engine( + list_scan( + detectors=[det], + movers=[x_axis, y_axis], + params=[[1, 2, 3, 4, 5], [1, 2, 3, 4]], + ) + ) + + +@pytest.mark.parametrize("x_list", ([[0, 1, 2, 3]], [[1.1, 2.2, 3.3]])) +def test_list_rscan( + run_engine: RunEngine, det: StandardDetector, x_axis: Motor, x_list: Any +): + run_engine(list_rscan(detectors=[det], movers=[x_axis], params=x_list)) + + +@pytest.mark.parametrize( + "x_list, y_list", + ( + [[3, 2, 1], [1, 2, 3]], + [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], + ), +) +def test_list_rscan_with_two_axes( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_list: list, + y_axis: Motor, + y_list: list, +): + run_engine( + list_rscan(detectors=[det], movers=[x_axis, y_axis], params=[x_list, y_list]) + ) + + +def test_list_rscan_with_two_axes_fails_when_given_differnt_list_lengths( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(ValueError): + run_engine( + list_rscan( + detectors=[det], + movers=[x_axis, y_axis], + params=[[1, 2, 3, 4, 5], [1, 2, 3, 4]], + ) + ) + + +@pytest.mark.parametrize( + "x_list, y_list", + ( + [[3, 2, 1], [1, 2, 3]], + [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], + ), +) +def test_list_grid_scan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_list: list, + y_axis: Motor, + y_list: list, +): + run_engine( + list_grid_scan( + detectors=[det], movers=[x_axis, y_axis], params=[x_list, y_list] + ) + ) + + +def test_list_grid_scan_when_given_differnt_list_lengths( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + run_engine( + list_grid_scan( + detectors=[det], + movers=[x_axis, y_axis], + params=[[1, 2, 3, 4, 5], [1, 2, 3, 4]], + ) + ) + + +def test_list_grid_scan_when_given_bad_info( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(TypeError): + run_engine( + list_grid_scan( + detectors=[det], + movers=[x_axis, y_axis], + params=[[1, 2, 3, 4, 5], ["one", 2, 3, 4, 5]], + ) + ) + + +@pytest.mark.parametrize( + "x_list, y_list", + ( + [[3, 2, 1], [1, 2, 3]], + [[-1.1, -2.2, -3.3, -4.4, -5.5], [1.1, 2.2, 3.3, 4.4, 5.5]], + ), +) +def test_list_grid_rscan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + x_list: list, + y_axis: Motor, + y_list: list, +): + run_engine( + list_grid_rscan( + detectors=[det], movers=[x_axis, y_axis], params=[x_list, y_list] + ) + ) + + +def test_list_grid_rscan_with_two_axes_when_snaking( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + run_engine( + list_grid_rscan( + detectors=[det], + movers=[x_axis, y_axis], + params=[[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]], + snake_axes=True, + ) + ) + + +def test_list_grid_rscan_when_given_differnt_list_lengths( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + run_engine( + list_grid_rscan( + detectors=[det], + movers=[x_axis, y_axis], + params=[[1, 2, 3, 4, 5], [1, 2, 3, 4]], + ) + ) + + +def test_list_grid_rscan_when_given_bad_info( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises(TypeError): + run_engine( + list_grid_rscan( + detectors=[det], + movers=[x_axis, y_axis], + params=[[1, 2, 3, 4, 5], ["one", 2, 3, 4, 5]], + ) + ) + + +@pytest.mark.parametrize( + "params", ([-1, 1, 0.1], [-2, 2, 0.2], [1, -1, -0.1], [2, -2, -0.2]) +) +def test_make_stepped_list_when_given_three_params(params: list[Any]): + stepped_list = _make_stepped_list(params=params) + assert len(stepped_list) == 21 + assert stepped_list[0] / stepped_list[-1] == -1 + assert stepped_list[10] == 0 + + +@pytest.mark.parametrize("params", ([-1, 0.1], [-2, 0.2], [1, -0.1], [2, -0.2])) +def test_make_stepped_list_when_given_two_params(params: list[Any]): + stepped_list = _make_stepped_list(params=params, num=21) + assert len(stepped_list) == 21 + assert stepped_list[0] / stepped_list[-1] == -1 + assert stepped_list[10] == 0 + + +def test_make_stepped_list_when_given_wrong_number_of_params(): + stepped_list = _make_stepped_list(params=[1]) + assert stepped_list == [] + + +def test_make_concurrently_stepped_lists(): + stepped_lists = _make_concurrently_stepped_lists( + movers_len=2, params=[0, 1, 0.1, 0, 1] + ) + assert len(stepped_lists) == 2 + assert len(stepped_lists[0]) == 11 and len(stepped_lists[1]) == 11 + assert stepped_lists[0][-1] == 1 + assert stepped_lists[1][-1] == 10 + + +def test_make_independently_stepped_lists(): + stepped_lists = _make_independently_stepped_lists( + movers_len=2, params=[0, 1, 0.1, -10, 10, 1] + ) + assert len(stepped_lists) == 2 + assert len(stepped_lists[0]) == 11 and len(stepped_lists[1]) == 21 + assert stepped_lists[0][-1] == 1 + assert stepped_lists[1][-1] == 10 + + +@pytest.mark.parametrize("params", ([0, 1, 0.1], [-1, 1, 0.1], [0, 10, 1])) +def test_step_scan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + params: list[Any], +): + run_engine(step_scan(detectors=[det], movers=[x_axis], params=params)) + + +@pytest.mark.parametrize( + "params", ([0, 1, 0.1, 0, 0.1], [-1, 1, 0.1, -1, 0.1], [0, 10, 1, 0, 1]) +) +def test_step_scan_with_multiple_movers( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, + params: list[Any], +): + run_engine(step_scan(detectors=[det], movers=[x_axis, y_axis], params=params)) + + +@pytest.mark.parametrize("params", ([0, 1, 0.1, 0, 1, 0.1], [0, 1, 0.1, 0])) +def test_step_scan_fails_when_given_incorrect_number_of_params( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, + params: list[Any], +): + with pytest.raises(ValueError): + run_engine(step_scan(detectors=[det], movers=[x_axis, y_axis], params=params)) + + +@pytest.mark.parametrize("params", ([0, 1, 0.25], [-1, 1, 0.5], [0, 10, 2.5])) +def test_step_rscan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + params: list[Any], +): + run_engine(step_rscan(detectors=[det], movers=[x_axis], params=params)) + + +@pytest.mark.parametrize( + "params", ([0, 1, 0.25, 0, 0.25], [-1, 1, 0.5, -1, 0.5], [0, 10, 2.5, 0, 2.5]) +) +def test_step_rscan_with_multiple_movers( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, + params: list[Any], +): + run_engine(step_rscan(detectors=[det], movers=[x_axis, y_axis], params=params)) + + +@pytest.mark.parametrize("params", ([0, 1, 0.5, 0, 1, 0.5], [0, 1, 0.5, 0])) +def test_step_rscan_fails_when_given_incorrect_number_of_params( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, + params: list[Any], +): + with pytest.raises(ValueError): + run_engine(step_rscan(detectors=[det], movers=[x_axis, y_axis], params=params)) + + +@pytest.mark.parametrize("params", ([0, 1, 0.5, 0, 1, 0.5], [0, 10, 5, 0, 10, 5])) +def test_step_grid_scan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, + params: list[Any], +): + run_engine(step_grid_scan(detectors=[det], movers=[y_axis, x_axis], params=params)) + + +@pytest.mark.parametrize("params", ([0, 1, 0.5, 0, 1, 0.5], [0, 10, 5, 0, 10, 5])) +def test_step_grid_scan_when_snaking( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, + params: list[Any], +): + run_engine( + step_grid_scan( + detectors=[det], movers=[y_axis, x_axis], params=params, snake_axes=True + ) + ) + + +@pytest.mark.parametrize("params", ([0, 1, 0.25, 0, 1], [0, 10, 2.5, 0])) +def test_step_grid_scan_fails_when_given_incorrect_number_of_params( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, + params: list[Any], +): + with pytest.raises(ValueError): + run_engine( + step_grid_scan( + detectors=[det], movers=[y_axis, x_axis], params=params, snake_axes=True + ) + ) + + +@pytest.mark.parametrize("params", ([0, 1, 0.5, 0, 1, 0.5], [0, 10, 5, 0, 10, 5])) +def test_step_grid_rscan( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, + params: list[Any], +): + run_engine(step_grid_rscan(detectors=[det], movers=[y_axis, x_axis], params=params)) + + +@pytest.mark.parametrize("params", ([0, 1, 0.5, 0, 1, 0.5], [0, 10, 5, 0, 10, 5])) +def test_step_grid_rscan_when_snaking( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, + params: list[Any], +): + run_engine( + step_grid_rscan( + detectors=[det], movers=[y_axis, x_axis], params=params, snake_axes=True + ) + ) + + +@pytest.mark.parametrize("params", ([0, 1, 0.1, 0, 1], [0, 10, 1, 0])) +def test_step_grid_rscan_fails_when_given_incorrect_number_of_params( + run_engine: RunEngine, + det: StandardDetector, + x_axis: Motor, + y_axis: Motor, + params: list[Any], +): + with pytest.raises(ValueError): + run_engine( + step_grid_rscan( + detectors=[det], movers=[y_axis, x_axis], params=params, snake_axes=True + ) + )