diff --git a/CHANGELOG.md b/CHANGELOG.md index db43d22d..a6a46369 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] ### Added +- Zaber linear rail control module with Prefect workflow orchestration in `src/ac_training_lab/zaber_linear_rail/`: + - `config.py` - Configuration settings for serial port, motion parameters, and safety limits + - `rail_controller.py` - Core controller with connect, home, move_to_position, move_relative, get_position, stop + - `prefect_flows.py` - Prefect flows for remote orchestration (move_to_position_flow, home_flow, sequence_flow) - Support for both `rpicam-vid` (Raspberry Pi OS Trixie) and `libcamera-vid` (Raspberry Pi OS Bookworm) camera commands in `src/ac_training_lab/picam/device.py` to ensure compatibility across different OS versions. - Comprehensive Unit Operations section in `docs/index.md` documenting all available capabilities including dispensing, synthesis, characterization, and robotics operations. - Expanded Training Workflows section in `docs/index.md` with 10 educational workflows including RGB/RYB color matching, titration, yeast growth optimization, vision-enabled 3D printing optimization, microscopy image stitching, and AprilTag robot path planning. diff --git a/src/ac_training_lab/zaber_linear_rail/__init__.py b/src/ac_training_lab/zaber_linear_rail/__init__.py new file mode 100644 index 00000000..0f7ce82a --- /dev/null +++ b/src/ac_training_lab/zaber_linear_rail/__init__.py @@ -0,0 +1,34 @@ +""" +Zaber Linear Rail Control Module. + +Simple wrappers around zaber-motion library for Prefect integration. + +Example: + from ac_training_lab.zaber_linear_rail import connect, home, move_absolute + + connect() + home() + move_absolute(100.0) # Move to 100mm +""" + +from ac_training_lab.zaber_linear_rail.config import SERIAL_PORT +from ac_training_lab.zaber_linear_rail.rail_controller import ( + connect, + disconnect, + get_position, + home, + move_absolute, + move_relative, + stop, +) + +__all__ = [ + "connect", + "disconnect", + "home", + "move_absolute", + "move_relative", + "get_position", + "stop", + "SERIAL_PORT", +] diff --git a/src/ac_training_lab/zaber_linear_rail/config.py b/src/ac_training_lab/zaber_linear_rail/config.py new file mode 100644 index 00000000..fb9c81e8 --- /dev/null +++ b/src/ac_training_lab/zaber_linear_rail/config.py @@ -0,0 +1,15 @@ +""" +Configuration settings for Zaber linear rail control. + +Edit these values during initial setup to match your hardware and environment. +""" + +import os + +# Serial port where Zaber device is connected (USB) +# On Raspberry Pi, typically /dev/ttyUSB0 or /dev/ttyACM0 +SERIAL_PORT = os.getenv("ZABER_SERIAL_PORT", "/dev/ttyUSB0") + +# Device index is 0-based (for device_list), axis number is 1-based (Zaber) +DEVICE_INDEX = int(os.getenv("ZABER_DEVICE_INDEX", "0")) +AXIS_NUMBER = int(os.getenv("ZABER_AXIS_NUMBER", "1")) diff --git a/src/ac_training_lab/zaber_linear_rail/prefect_flows.py b/src/ac_training_lab/zaber_linear_rail/prefect_flows.py new file mode 100644 index 00000000..75cb380a --- /dev/null +++ b/src/ac_training_lab/zaber_linear_rail/prefect_flows.py @@ -0,0 +1,131 @@ +""" +Prefect flows for Zaber linear rail control. + +Simple wrappers around Zaber API calls with Prefect task/flow decorators. +""" + +from typing import Optional + +from prefect import flow, task + +from ac_training_lab.zaber_linear_rail.rail_controller import ( + connect, + disconnect, + get_position, + home, + move_absolute, + move_relative, + stop, +) + +# Wrap functions as tasks +connect_task = task(connect) +disconnect_task = task(disconnect) +home_task = task(home) +move_absolute_task = task(move_absolute) +move_relative_task = task(move_relative) +get_position_task = task(get_position) +stop_task = task(stop) + + +@flow +def home_flow(serial_port: Optional[str] = None) -> float: + """Home the axis. Returns position in mm.""" + connect_task(serial_port=serial_port) + try: + return home_task() + finally: + disconnect_task() + + +@flow +def move_absolute_flow( + position: float, unit: str = "mm", serial_port: Optional[str] = None +) -> float: + """Move to absolute position. Returns final position.""" + connect_task(serial_port=serial_port) + try: + return move_absolute_task(position, unit) + finally: + disconnect_task() + + +@flow +def move_relative_flow( + distance: float, unit: str = "mm", serial_port: Optional[str] = None +) -> float: + """Move by relative distance. Returns final position.""" + connect_task(serial_port=serial_port) + try: + return move_relative_task(distance, unit) + finally: + disconnect_task() + + +@flow +def get_position_flow(unit: str = "mm", serial_port: Optional[str] = None) -> float: + """Get current position.""" + connect_task(serial_port=serial_port) + try: + return get_position_task(unit) + finally: + disconnect_task() + + +@flow +def sequence_flow( + positions: list[float], + unit: str = "mm", + serial_port: Optional[str] = None, + home_first: bool = True, +) -> list[float]: + """Move through sequence of positions. Returns list of final positions.""" + connect_task(serial_port=serial_port) + try: + if home_first: + home_task() + results = [] + for pos in positions: + results.append(move_absolute_task(pos, unit)) + return results + finally: + disconnect_task() + + +# Deployment functions +def deploy_all_flows(work_pool_name: str = "zaber-linear-rail-pool") -> None: + """Deploy all flows to Prefect.""" + from pathlib import Path + + source_dir = Path(__file__).parent.parent.parent + base = "ac_training_lab/zaber_linear_rail/prefect_flows.py" + + home_flow.from_source( + source=str(source_dir), entrypoint=f"{base}:home_flow" + ).deploy(name="home", work_pool_name=work_pool_name) + + move_absolute_flow.from_source( + source=str(source_dir), entrypoint=f"{base}:move_absolute_flow" + ).deploy(name="move-absolute", work_pool_name=work_pool_name) + + move_relative_flow.from_source( + source=str(source_dir), entrypoint=f"{base}:move_relative_flow" + ).deploy(name="move-relative", work_pool_name=work_pool_name) + + get_position_flow.from_source( + source=str(source_dir), entrypoint=f"{base}:get_position_flow" + ).deploy(name="get-position", work_pool_name=work_pool_name) + + sequence_flow.from_source( + source=str(source_dir), entrypoint=f"{base}:sequence_flow" + ).deploy(name="sequence", work_pool_name=work_pool_name) + + print(f"Deployed all flows to work pool: {work_pool_name}") + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1 and sys.argv[1] == "deploy": + pool = sys.argv[2] if len(sys.argv) > 2 else "zaber-linear-rail-pool" + deploy_all_flows(pool) diff --git a/src/ac_training_lab/zaber_linear_rail/rail_controller.py b/src/ac_training_lab/zaber_linear_rail/rail_controller.py new file mode 100644 index 00000000..7ae183d8 --- /dev/null +++ b/src/ac_training_lab/zaber_linear_rail/rail_controller.py @@ -0,0 +1,96 @@ +""" +Core controller for Zaber linear rail. + +Simple wrappers around zaber-motion library for Prefect task integration. +""" + +from typing import Optional + +from zaber_motion import Library +from zaber_motion.ascii import Connection + +from ac_training_lab.zaber_linear_rail.config import ( + AXIS_NUMBER, + DEVICE_INDEX, + SERIAL_PORT, +) + +# Global connection and axis references +_connection: Optional[Connection] = None +_axis = None + + +def _check_connected(): + """Raise if not connected.""" + if _axis is None: + raise RuntimeError("Not connected. Call connect() first.") + + +def connect(serial_port: Optional[str] = None) -> dict: + """Connect to Zaber device via USB serial.""" + global _connection, _axis + + Library.enable_device_db_store() + port = serial_port or SERIAL_PORT + _connection = Connection.open_serial_port(port) + device_list = _connection.detect_devices() + + if not device_list: + _connection.close() + _connection = None + raise RuntimeError("No Zaber devices found") + + device = device_list[DEVICE_INDEX] + _axis = device.get_axis(AXIS_NUMBER) + + return { + "device_name": device.name, + "device_id": device.device_id, + "axis_count": device.axis_count, + } + + +def disconnect() -> None: + """Disconnect from Zaber device.""" + global _connection, _axis + if _connection: + _connection.close() + _connection = None + _axis = None + + +def home() -> float: + """Home the axis. Returns position in mm.""" + _check_connected() + _axis.home() + _axis.wait_until_idle() + return _axis.get_position("mm") + + +def move_absolute(position: float, unit: str = "mm") -> float: + """Move to absolute position. Returns final position.""" + _check_connected() + _axis.move_absolute(position, unit) + _axis.wait_until_idle() + return _axis.get_position("mm") + + +def move_relative(distance: float, unit: str = "mm") -> float: + """Move by relative distance. Returns final position.""" + _check_connected() + _axis.move_relative(distance, unit) + _axis.wait_until_idle() + return _axis.get_position("mm") + + +def get_position(unit: str = "mm") -> float: + """Get current position.""" + _check_connected() + return _axis.get_position(unit) + + +def stop() -> float: + """Stop movement. Returns position.""" + _check_connected() + _axis.stop() + return _axis.get_position("mm")