From d6636e2514ff6e6dc83f811c5ade46fe22846b29 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 18 Nov 2025 20:45:23 +0000 Subject: [PATCH 1/5] Initial plan From a4ea99f14cfc59d2797410802c82b834351d0702 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 26 Nov 2025 06:50:38 +0000 Subject: [PATCH 2/5] Add Zaber linear rail module with Prefect workflow support Co-authored-by: sgbaird <45469701+sgbaird@users.noreply.github.com> --- CHANGELOG.md | 4 + .../zaber_linear_rail/__init__.py | 42 ++ .../zaber_linear_rail/config.py | 101 +++++ .../zaber_linear_rail/prefect_flows.py | 393 ++++++++++++++++++ .../zaber_linear_rail/rail_controller.py | 328 +++++++++++++++ 5 files changed, 868 insertions(+) create mode 100644 src/ac_training_lab/zaber_linear_rail/__init__.py create mode 100644 src/ac_training_lab/zaber_linear_rail/config.py create mode 100644 src/ac_training_lab/zaber_linear_rail/prefect_flows.py create mode 100644 src/ac_training_lab/zaber_linear_rail/rail_controller.py diff --git a/CHANGELOG.md b/CHANGELOG.md index db43d22d..a6a46369 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] ### Added +- Zaber linear rail control module with Prefect workflow orchestration in `src/ac_training_lab/zaber_linear_rail/`: + - `config.py` - Configuration settings for serial port, motion parameters, and safety limits + - `rail_controller.py` - Core controller with connect, home, move_to_position, move_relative, get_position, stop + - `prefect_flows.py` - Prefect flows for remote orchestration (move_to_position_flow, home_flow, sequence_flow) - Support for both `rpicam-vid` (Raspberry Pi OS Trixie) and `libcamera-vid` (Raspberry Pi OS Bookworm) camera commands in `src/ac_training_lab/picam/device.py` to ensure compatibility across different OS versions. - Comprehensive Unit Operations section in `docs/index.md` documenting all available capabilities including dispensing, synthesis, characterization, and robotics operations. - Expanded Training Workflows section in `docs/index.md` with 10 educational workflows including RGB/RYB color matching, titration, yeast growth optimization, vision-enabled 3D printing optimization, microscopy image stitching, and AprilTag robot path planning. diff --git a/src/ac_training_lab/zaber_linear_rail/__init__.py b/src/ac_training_lab/zaber_linear_rail/__init__.py new file mode 100644 index 00000000..45a98f43 --- /dev/null +++ b/src/ac_training_lab/zaber_linear_rail/__init__.py @@ -0,0 +1,42 @@ +""" +Zaber Linear Rail Control Module. + +Provides programmatic control of Zaber linear rails via Python with Prefect workflow +orchestration support. + +Example: + from ac_training_lab.zaber_linear_rail import move_to_position, home_axis + + home_axis() + move_to_position(100.0) # Move to 100mm +""" + +from ac_training_lab.zaber_linear_rail.config import ( + DEFAULT_ACCELERATION, + DEFAULT_VELOCITY, + SERIAL_PORT, +) +from ac_training_lab.zaber_linear_rail.rail_controller import ( + connect, + disconnect, + get_device_info, + get_position, + home_axis, + move_relative, + move_to_position, + stop_movement, +) + +__all__ = [ + "connect", + "disconnect", + "home_axis", + "move_to_position", + "move_relative", + "get_position", + "stop_movement", + "get_device_info", + "SERIAL_PORT", + "DEFAULT_VELOCITY", + "DEFAULT_ACCELERATION", +] diff --git a/src/ac_training_lab/zaber_linear_rail/config.py b/src/ac_training_lab/zaber_linear_rail/config.py new file mode 100644 index 00000000..af43eef2 --- /dev/null +++ b/src/ac_training_lab/zaber_linear_rail/config.py @@ -0,0 +1,101 @@ +""" +Configuration settings for Zaber linear rail control. + +Edit these values during initial setup to match your hardware and environment. +""" + +import os + +# ============================================================================= +# CONNECTION CONFIGURATION +# ============================================================================= +# Serial port where Zaber device is connected +# On Raspberry Pi, typically /dev/ttyUSB0 or /dev/ttyACM0 +SERIAL_PORT = os.getenv("ZABER_SERIAL_PORT", "/dev/ttyUSB0") + +# Alternative connection methods +TCP_HOST = os.getenv("ZABER_TCP_HOST", "") +TCP_PORT = int(os.getenv("ZABER_TCP_PORT", "55550")) + +# IoT connection (for Zaber Cloud-connected devices) +IOT_DEVICE_ID = os.getenv("ZABER_IOT_DEVICE_ID", "") +IOT_TOKEN = os.getenv("ZABER_IOT_TOKEN", "") + + +# ============================================================================= +# DEVICE CONFIGURATION +# ============================================================================= +# Device and axis indices (1-based indexing as per Zaber convention) +DEVICE_INDEX = int(os.getenv("ZABER_DEVICE_INDEX", "0")) # Index in device list +AXIS_NUMBER = int(os.getenv("ZABER_AXIS_NUMBER", "1")) # Axis number on device + + +# ============================================================================= +# MOTION PARAMETERS +# ============================================================================= +# Default velocity in mm/s (adjust based on your rail specifications) +DEFAULT_VELOCITY = float(os.getenv("ZABER_DEFAULT_VELOCITY", "50.0")) + +# Default acceleration in mm/s² (adjust based on your rail specifications) +DEFAULT_ACCELERATION = float(os.getenv("ZABER_DEFAULT_ACCELERATION", "100.0")) + +# Position limits in mm (set based on your rail travel range) +MIN_POSITION = float(os.getenv("ZABER_MIN_POSITION", "0.0")) +MAX_POSITION = float(os.getenv("ZABER_MAX_POSITION", "500.0")) + + +# ============================================================================= +# SAFETY SETTINGS +# ============================================================================= +# Timeout for movement operations in seconds +MOVEMENT_TIMEOUT = float(os.getenv("ZABER_MOVEMENT_TIMEOUT", "60.0")) + +# Homing timeout in seconds +HOMING_TIMEOUT = float(os.getenv("ZABER_HOMING_TIMEOUT", "120.0")) + + +# ============================================================================= +# STATE MANAGEMENT +# ============================================================================= +# File to store persistent state +STATE_FILE = os.getenv("ZABER_STATE_FILE", "zaber_state.json") + + +# ============================================================================= +# VALIDATION +# ============================================================================= +def validate_config(): + """Validate configuration values.""" + errors = [] + + # Validate position limits + if MIN_POSITION >= MAX_POSITION: + errors.append( + f"MIN_POSITION ({MIN_POSITION}) must be less than " + f"MAX_POSITION ({MAX_POSITION})" + ) + + # Validate velocity + if DEFAULT_VELOCITY <= 0: + errors.append(f"DEFAULT_VELOCITY ({DEFAULT_VELOCITY}) must be positive") + + # Validate acceleration + if DEFAULT_ACCELERATION <= 0: + errors.append(f"DEFAULT_ACCELERATION ({DEFAULT_ACCELERATION}) must be positive") + + # Validate timeouts + if MOVEMENT_TIMEOUT <= 0: + errors.append(f"MOVEMENT_TIMEOUT ({MOVEMENT_TIMEOUT}) must be positive") + if HOMING_TIMEOUT <= 0: + errors.append(f"HOMING_TIMEOUT ({HOMING_TIMEOUT}) must be positive") + + if errors: + raise ValueError( + "Configuration validation failed:\n" + "\n".join(f" - {e}" for e in errors) + ) + + return True + + +# Validate on import +validate_config() diff --git a/src/ac_training_lab/zaber_linear_rail/prefect_flows.py b/src/ac_training_lab/zaber_linear_rail/prefect_flows.py new file mode 100644 index 00000000..ded8abc6 --- /dev/null +++ b/src/ac_training_lab/zaber_linear_rail/prefect_flows.py @@ -0,0 +1,393 @@ +""" +Prefect flows for Zaber linear rail control. + +Provides workflow orchestration using Prefect for remote rail control. +""" + +from prefect import flow, task +from prefect.logging import get_run_logger + +from ac_training_lab.zaber_linear_rail.rail_controller import ( + connect, + disconnect, + get_device_info, + get_position, + home_axis, + move_relative, + move_to_position, + stop_movement, +) + +# Wrap core functions as tasks +connect_task = task(connect) +disconnect_task = task(disconnect) +home_axis_task = task(home_axis) +move_to_position_task = task(move_to_position) +move_relative_task = task(move_relative) +get_position_task = task(get_position) +stop_movement_task = task(stop_movement) +get_device_info_task = task(get_device_info) + + +@flow +def move_to_position_flow( + target_position: float, + serial_port: str = None, + velocity: float = None, + home_first: bool = False, +) -> dict: + """ + Prefect flow for moving rail to a specific position. + + Args: + target_position: Target position in mm + serial_port: Optional serial port override + velocity: Optional velocity in mm/s + home_first: Whether to home before moving + + Returns: + dict with movement results + """ + logger = get_run_logger() + logger.info("=== MOVE TO POSITION FLOW ===") + logger.info(f"Target: {target_position} mm") + + # Connect to device + connection_info = connect_task(serial_port=serial_port) + logger.info(f"Connected to: {connection_info['device_name']}") + + try: + # Home if requested + if home_first: + logger.info("Homing axis first...") + home_result = home_axis_task() + logger.info(f"Homed. Current position: {home_result['position']:.2f} mm") + + # Get initial position + initial = get_position_task() + logger.info(f"Initial position: {initial['position']:.2f} mm") + + # Execute movement + result = move_to_position_task(target_position, velocity=velocity) + + start = result["start_position"] + final = result["final_position"] + logger.info(f"Movement complete: {start:.2f} -> {final:.2f} mm") + logger.info(f"Distance moved: {result['distance']:.2f} mm") + + return { + "success": True, + "movement": result, + "connection": connection_info, + } + + finally: + # Always disconnect + disconnect_task() + logger.info("Disconnected") + + +@flow +def move_relative_flow( + distance: float, + serial_port: str = None, + velocity: float = None, +) -> dict: + """ + Prefect flow for moving rail by a relative distance. + + Args: + distance: Distance to move in mm (positive or negative) + serial_port: Optional serial port override + velocity: Optional velocity in mm/s + + Returns: + dict with movement results + """ + logger = get_run_logger() + logger.info("=== MOVE RELATIVE FLOW ===") + logger.info(f"Distance: {distance:+.2f} mm") + + # Connect to device + connection_info = connect_task(serial_port=serial_port) + logger.info(f"Connected to: {connection_info['device_name']}") + + try: + # Get initial position + initial = get_position_task() + logger.info(f"Initial position: {initial['position']:.2f} mm") + + # Execute movement + result = move_relative_task(distance, velocity=velocity) + + start = result["start_position"] + final = result["final_position"] + logger.info(f"Movement complete: {start:.2f} -> {final:.2f} mm") + logger.info(f"Actual distance: {result['actual_distance']:.2f} mm") + + return { + "success": True, + "movement": result, + "connection": connection_info, + } + + finally: + disconnect_task() + logger.info("Disconnected") + + +@flow +def home_flow(serial_port: str = None) -> dict: + """ + Prefect flow for homing the rail. + + Args: + serial_port: Optional serial port override + + Returns: + dict with homing results + """ + logger = get_run_logger() + logger.info("=== HOME FLOW ===") + + # Connect to device + connection_info = connect_task(serial_port=serial_port) + logger.info(f"Connected to: {connection_info['device_name']}") + + try: + # Home the axis + result = home_axis_task() + logger.info(f"Homing complete. Position: {result['position']:.2f} mm") + + return { + "success": True, + "homing": result, + "connection": connection_info, + } + + finally: + disconnect_task() + logger.info("Disconnected") + + +@flow +def get_status_flow(serial_port: str = None) -> dict: + """ + Prefect flow for getting rail status. + + Args: + serial_port: Optional serial port override + + Returns: + dict with status info + """ + logger = get_run_logger() + logger.info("=== STATUS FLOW ===") + + # Connect to device + connection_info = connect_task(serial_port=serial_port) + logger.info(f"Connected to: {connection_info['device_name']}") + + try: + # Get device info + device_info = get_device_info_task() + + logger.info(f"Device: {device_info['device_name']}") + logger.info(f"Position: {device_info['current_position']:.2f} mm") + logger.info(f"Max speed: {device_info['max_speed']:.2f} mm/s") + logger.info(f"Acceleration: {device_info['acceleration']:.2f} mm/s²") + + return { + "success": True, + "device_info": device_info, + "connection": connection_info, + } + + finally: + disconnect_task() + logger.info("Disconnected") + + +@flow +def sequence_flow( + positions: list[float], + serial_port: str = None, + velocity: float = None, + home_first: bool = True, +) -> dict: + """ + Prefect flow for moving through a sequence of positions. + + Args: + positions: List of positions to visit in mm + serial_port: Optional serial port override + velocity: Optional velocity in mm/s + home_first: Whether to home before starting sequence + + Returns: + dict with sequence results + """ + logger = get_run_logger() + logger.info("=== SEQUENCE FLOW ===") + logger.info(f"Positions: {positions}") + + # Connect to device + connection_info = connect_task(serial_port=serial_port) + logger.info(f"Connected to: {connection_info['device_name']}") + + results = [] + + try: + # Home if requested + if home_first: + logger.info("Homing axis first...") + home_result = home_axis_task() + logger.info(f"Homed. Position: {home_result['position']:.2f} mm") + + # Execute sequence + for i, position in enumerate(positions): + logger.info(f"Step {i+1}/{len(positions)}: Moving to {position} mm") + result = move_to_position_task(position, velocity=velocity) + results.append(result) + logger.info(f" Reached: {result['final_position']:.2f} mm") + + logger.info(f"Sequence complete. Visited {len(results)} positions.") + + return { + "success": True, + "steps_completed": len(results), + "results": results, + "connection": connection_info, + } + + finally: + disconnect_task() + logger.info("Disconnected") + + +# ============================================================================= +# DEPLOYMENT FUNCTIONS +# ============================================================================= + + +def deploy_move_to_position( + work_pool_name: str = "zaber-linear-rail-pool", + deployment_name: str = "move-to-position", +) -> str: + """Deploy the move-to-position flow.""" + from pathlib import Path + + source_dir = Path(__file__).parent.parent.parent + + entrypoint = ( + "ac_training_lab/zaber_linear_rail/prefect_flows.py:move_to_position_flow" + ) + move_to_position_flow.from_source( + source=str(source_dir), + entrypoint=entrypoint, + ).deploy( + name=deployment_name, + work_pool_name=work_pool_name, + description="Move Zaber linear rail to an absolute position", + ) + + print(f"✅ Deployed '{deployment_name}'") + run_cmd = f"prefect deployment run 'move-to-position-flow/{deployment_name}'" + print(f"Run: {run_cmd} --param target_position=100.0") + return deployment_name + + +def deploy_home( + work_pool_name: str = "zaber-linear-rail-pool", + deployment_name: str = "home", +) -> str: + """Deploy the home flow.""" + from pathlib import Path + + source_dir = Path(__file__).parent.parent.parent + + home_flow.from_source( + source=str(source_dir), + entrypoint="ac_training_lab/zaber_linear_rail/prefect_flows.py:home_flow", + ).deploy( + name=deployment_name, + work_pool_name=work_pool_name, + description="Home Zaber linear rail axis", + ) + + print(f"✅ Deployed '{deployment_name}'") + print(f"Run: prefect deployment run 'home-flow/{deployment_name}'") + return deployment_name + + +def deploy_sequence( + work_pool_name: str = "zaber-linear-rail-pool", + deployment_name: str = "sequence", +) -> str: + """Deploy the sequence flow.""" + from pathlib import Path + + source_dir = Path(__file__).parent.parent.parent + + sequence_flow.from_source( + source=str(source_dir), + entrypoint="ac_training_lab/zaber_linear_rail/prefect_flows.py:sequence_flow", + ).deploy( + name=deployment_name, + work_pool_name=work_pool_name, + description="Execute a sequence of positions on Zaber linear rail", + ) + + print(f"✅ Deployed '{deployment_name}'") + run_cmd = f"prefect deployment run 'sequence-flow/{deployment_name}'" + print(f"Run: {run_cmd} --param positions='[0, 100, 200, 100, 0]'") + return deployment_name + + +def deploy_all_flows(work_pool_name: str = "zaber-linear-rail-pool") -> bool: + """Deploy all Zaber linear rail flows.""" + print("=== DEPLOYING ZABER LINEAR RAIL FLOWS ===") + print(f"Work pool: {work_pool_name}") + + deploy_move_to_position(work_pool_name) + deploy_home(work_pool_name) + deploy_sequence(work_pool_name) + + print("\n🎉 All deployments created!") + print("\nAvailable flows:") + print(" 1. move-to-position - Move to absolute position") + print(" 2. home - Home the axis") + print(" 3. sequence - Execute position sequence") + print(f"\nStart worker: prefect worker start --pool {work_pool_name}") + + return True + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1: + cmd = sys.argv[1] + if cmd == "deploy": + pool = sys.argv[2] if len(sys.argv) > 2 else "zaber-linear-rail-pool" + deploy_all_flows(pool) + elif cmd == "status": + get_status_flow() + elif cmd == "home": + home_flow() + elif cmd == "move": + if len(sys.argv) < 3: + print("Usage: python prefect_flows.py move ") + sys.exit(1) + position = float(sys.argv[2]) + move_to_position_flow(position) + else: + print("Usage: python prefect_flows.py [deploy|status|home|move ]") + else: + print("Usage: python prefect_flows.py [deploy|status|home|move ]") + print("\nExamples:") + print(" python prefect_flows.py deploy # Deploy all flows") + print(" python prefect_flows.py deploy my-pool # Deploy with custom pool") + print(" python prefect_flows.py status # Get device status") + print(" python prefect_flows.py home # Home the axis") + print(" python prefect_flows.py move 100 # Move to 100mm") diff --git a/src/ac_training_lab/zaber_linear_rail/rail_controller.py b/src/ac_training_lab/zaber_linear_rail/rail_controller.py new file mode 100644 index 00000000..f51f3c69 --- /dev/null +++ b/src/ac_training_lab/zaber_linear_rail/rail_controller.py @@ -0,0 +1,328 @@ +""" +Core controller for Zaber linear rail. + +Provides low-level functions for connecting to and controlling a Zaber linear rail. +Uses the zaber-motion library for communication. +""" + +import json +from pathlib import Path +from typing import Optional + +from zaber_motion import Library +from zaber_motion.ascii import Axis, Connection + +from ac_training_lab.zaber_linear_rail.config import ( + AXIS_NUMBER, + DEFAULT_ACCELERATION, + DEFAULT_VELOCITY, + DEVICE_INDEX, + IOT_DEVICE_ID, + IOT_TOKEN, + MAX_POSITION, + MIN_POSITION, + SERIAL_PORT, + STATE_FILE, + TCP_HOST, + TCP_PORT, +) + +# Global connection and axis references +_connection: Optional[Connection] = None +_axis: Optional[Axis] = None + + +def _load_state() -> dict: + """Load state from file.""" + state_path = Path(STATE_FILE) + if state_path.exists(): + with open(state_path) as f: + return json.load(f) + return {"last_position": None, "is_homed": False} + + +def _save_state(state: dict) -> None: + """Save state to file.""" + with open(STATE_FILE, "w") as f: + json.dump(state, f, indent=2) + + +def connect( + serial_port: Optional[str] = None, + tcp_host: Optional[str] = None, + tcp_port: Optional[int] = None, +) -> dict: + """ + Connect to Zaber device. + + Connection priority: + 1. TCP if tcp_host provided + 2. IoT if IOT_DEVICE_ID configured + 3. Serial port (default) + + Args: + serial_port: Override serial port (default: from config) + tcp_host: TCP host for network connection + tcp_port: TCP port for network connection + + Returns: + dict with connection info + """ + global _connection, _axis + + Library.enable_device_db_store() + + # Determine connection method + if tcp_host or TCP_HOST: + host = tcp_host or TCP_HOST + port = tcp_port or TCP_PORT + print(f"Connecting via TCP to {host}:{port}...") + _connection = Connection.open_tcp(host, port) + elif IOT_DEVICE_ID: + print(f"Connecting via IoT to device {IOT_DEVICE_ID}...") + _connection = Connection.open_iot(IOT_DEVICE_ID, IOT_TOKEN or "unauthenticated") + else: + port = serial_port or SERIAL_PORT + print(f"Connecting via serial port {port}...") + _connection = Connection.open_serial_port(port) + + # Detect devices + device_list = _connection.detect_devices() + if not device_list: + _connection.close() + _connection = None + raise RuntimeError("No Zaber devices found") + + print(f"Found {len(device_list)} device(s)") + + # Get the configured device + if DEVICE_INDEX >= len(device_list): + _connection.close() + _connection = None + raise RuntimeError( + f"Device index {DEVICE_INDEX} out of range " + f"(found {len(device_list)} devices)" + ) + + device = device_list[DEVICE_INDEX] + _axis = device.get_axis(AXIS_NUMBER) + + # Apply default settings + _axis.settings.set("maxspeed", DEFAULT_VELOCITY, "mm/s") + _axis.settings.set("accel", DEFAULT_ACCELERATION, "mm/s^2") + + return { + "connected": True, + "device_name": device.name, + "device_id": device.device_id, + "axis_count": device.axis_count, + } + + +def disconnect() -> dict: + """ + Disconnect from Zaber device. + + Returns: + dict with status + """ + global _connection, _axis + + if _connection: + _connection.close() + _connection = None + _axis = None + return {"disconnected": True} + + return {"disconnected": False, "message": "No active connection"} + + +def _ensure_connected() -> None: + """Ensure device is connected, raise if not.""" + if _connection is None or _axis is None: + raise RuntimeError("Not connected. Call connect() first.") + + +def home_axis() -> dict: + """ + Home the axis to find reference position. + + Returns: + dict with homing result + """ + _ensure_connected() + + print("Homing axis...") + _axis.home() + _axis.wait_until_idle() + + position = _axis.get_position("mm") + + # Update state + state = _load_state() + state["is_homed"] = True + state["last_position"] = position + _save_state(state) + + print(f"Homing complete. Position: {position:.2f} mm") + + return {"success": True, "position": position, "homed": True} + + +def move_to_position(target_position: float, velocity: Optional[float] = None) -> dict: + """ + Move axis to absolute position. + + Args: + target_position: Target position in mm + velocity: Optional velocity override in mm/s + + Returns: + dict with movement result + """ + _ensure_connected() + + # Validate position + if not (MIN_POSITION <= target_position <= MAX_POSITION): + raise ValueError( + f"Target position {target_position} mm is out of range " + f"[{MIN_POSITION}, {MAX_POSITION}]" + ) + + # Get starting position + start_position = _axis.get_position("mm") + + # Set velocity if provided + if velocity: + _axis.settings.set("maxspeed", velocity, "mm/s") + + print(f"Moving from {start_position:.2f} mm to {target_position:.2f} mm...") + _axis.move_absolute(target_position, "mm") + _axis.wait_until_idle() + + # Get final position + final_position = _axis.get_position("mm") + + # Update state + state = _load_state() + state["last_position"] = final_position + _save_state(state) + + print(f"Movement complete. Final position: {final_position:.2f} mm") + + return { + "success": True, + "start_position": start_position, + "target_position": target_position, + "final_position": final_position, + "distance": abs(final_position - start_position), + } + + +def move_relative(distance: float, velocity: Optional[float] = None) -> dict: + """ + Move axis by relative distance. + + Args: + distance: Distance to move in mm (positive or negative) + velocity: Optional velocity override in mm/s + + Returns: + dict with movement result + """ + _ensure_connected() + + # Get current position + current_position = _axis.get_position("mm") + target_position = current_position + distance + + # Validate target position + if not (MIN_POSITION <= target_position <= MAX_POSITION): + raise ValueError( + f"Target position {target_position} mm would be out of range " + f"[{MIN_POSITION}, {MAX_POSITION}]" + ) + + # Set velocity if provided + if velocity: + _axis.settings.set("maxspeed", velocity, "mm/s") + + print(f"Moving {distance:+.2f} mm from {current_position:.2f} mm...") + _axis.move_relative(distance, "mm") + _axis.wait_until_idle() + + # Get final position + final_position = _axis.get_position("mm") + + # Update state + state = _load_state() + state["last_position"] = final_position + _save_state(state) + + print(f"Movement complete. Final position: {final_position:.2f} mm") + + return { + "success": True, + "start_position": current_position, + "distance_requested": distance, + "final_position": final_position, + "actual_distance": final_position - current_position, + } + + +def get_position() -> dict: + """ + Get current axis position. + + Returns: + dict with position info + """ + _ensure_connected() + + position = _axis.get_position("mm") + is_busy = _axis.is_busy() + + return {"position": position, "busy": is_busy} + + +def stop_movement() -> dict: + """ + Stop current movement immediately. + + Returns: + dict with stop result + """ + _ensure_connected() + + _axis.stop() + position = _axis.get_position("mm") + + return {"stopped": True, "position": position} + + +def get_device_info() -> dict: + """ + Get device information. + + Returns: + dict with device info + """ + _ensure_connected() + + device = _axis.device + + # Get current settings + max_speed = _axis.settings.get("maxspeed", "mm/s") + accel = _axis.settings.get("accel", "mm/s^2") + position = _axis.get_position("mm") + + return { + "device_name": device.name, + "device_id": device.device_id, + "axis_number": AXIS_NUMBER, + "axis_count": device.axis_count, + "current_position": position, + "max_speed": max_speed, + "acceleration": accel, + "position_limits": {"min": MIN_POSITION, "max": MAX_POSITION}, + } From e7c06f1ac0b7c3b98e897bb6f701bc01aeada15b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 26 Nov 2025 06:53:33 +0000 Subject: [PATCH 3/5] Address code review feedback: add encoding, fix types, clarify comments Co-authored-by: sgbaird <45469701+sgbaird@users.noreply.github.com> --- .../zaber_linear_rail/config.py | 2 +- .../zaber_linear_rail/prefect_flows.py | 18 ++++++++++-------- .../zaber_linear_rail/rail_controller.py | 4 ++-- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/ac_training_lab/zaber_linear_rail/config.py b/src/ac_training_lab/zaber_linear_rail/config.py index af43eef2..ee9897a5 100644 --- a/src/ac_training_lab/zaber_linear_rail/config.py +++ b/src/ac_training_lab/zaber_linear_rail/config.py @@ -25,7 +25,7 @@ # ============================================================================= # DEVICE CONFIGURATION # ============================================================================= -# Device and axis indices (1-based indexing as per Zaber convention) +# Device index is 0-based (for device_list), axis number is 1-based (Zaber) DEVICE_INDEX = int(os.getenv("ZABER_DEVICE_INDEX", "0")) # Index in device list AXIS_NUMBER = int(os.getenv("ZABER_AXIS_NUMBER", "1")) # Axis number on device diff --git a/src/ac_training_lab/zaber_linear_rail/prefect_flows.py b/src/ac_training_lab/zaber_linear_rail/prefect_flows.py index ded8abc6..03de017b 100644 --- a/src/ac_training_lab/zaber_linear_rail/prefect_flows.py +++ b/src/ac_training_lab/zaber_linear_rail/prefect_flows.py @@ -4,6 +4,8 @@ Provides workflow orchestration using Prefect for remote rail control. """ +from typing import Optional + from prefect import flow, task from prefect.logging import get_run_logger @@ -32,8 +34,8 @@ @flow def move_to_position_flow( target_position: float, - serial_port: str = None, - velocity: float = None, + serial_port: Optional[str] = None, + velocity: Optional[float] = None, home_first: bool = False, ) -> dict: """ @@ -90,8 +92,8 @@ def move_to_position_flow( @flow def move_relative_flow( distance: float, - serial_port: str = None, - velocity: float = None, + serial_port: Optional[str] = None, + velocity: Optional[float] = None, ) -> dict: """ Prefect flow for moving rail by a relative distance. @@ -137,7 +139,7 @@ def move_relative_flow( @flow -def home_flow(serial_port: str = None) -> dict: +def home_flow(serial_port: Optional[str] = None) -> dict: """ Prefect flow for homing the rail. @@ -171,7 +173,7 @@ def home_flow(serial_port: str = None) -> dict: @flow -def get_status_flow(serial_port: str = None) -> dict: +def get_status_flow(serial_port: Optional[str] = None) -> dict: """ Prefect flow for getting rail status. @@ -211,8 +213,8 @@ def get_status_flow(serial_port: str = None) -> dict: @flow def sequence_flow( positions: list[float], - serial_port: str = None, - velocity: float = None, + serial_port: Optional[str] = None, + velocity: Optional[float] = None, home_first: bool = True, ) -> dict: """ diff --git a/src/ac_training_lab/zaber_linear_rail/rail_controller.py b/src/ac_training_lab/zaber_linear_rail/rail_controller.py index f51f3c69..c2c502ce 100644 --- a/src/ac_training_lab/zaber_linear_rail/rail_controller.py +++ b/src/ac_training_lab/zaber_linear_rail/rail_controller.py @@ -36,14 +36,14 @@ def _load_state() -> dict: """Load state from file.""" state_path = Path(STATE_FILE) if state_path.exists(): - with open(state_path) as f: + with open(state_path, encoding="utf-8") as f: return json.load(f) return {"last_position": None, "is_homed": False} def _save_state(state: dict) -> None: """Save state to file.""" - with open(STATE_FILE, "w") as f: + with open(STATE_FILE, "w", encoding="utf-8") as f: json.dump(state, f, indent=2) From bb650f2f94ef514af6b52035306c1a6e26f20a37 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 26 Nov 2025 18:06:26 +0000 Subject: [PATCH 4/5] Simplify Zaber module: USB-only, direct API wrappers, remove validation Co-authored-by: sgbaird <45469701+sgbaird@users.noreply.github.com> --- .../zaber_linear_rail/__init__.py | 32 +- .../zaber_linear_rail/config.py | 92 +---- .../zaber_linear_rail/prefect_flows.py | 380 +++--------------- .../zaber_linear_rail/rail_controller.py | 297 ++------------ 4 files changed, 100 insertions(+), 701 deletions(-) diff --git a/src/ac_training_lab/zaber_linear_rail/__init__.py b/src/ac_training_lab/zaber_linear_rail/__init__.py index 45a98f43..0f7ce82a 100644 --- a/src/ac_training_lab/zaber_linear_rail/__init__.py +++ b/src/ac_training_lab/zaber_linear_rail/__init__.py @@ -1,42 +1,34 @@ """ Zaber Linear Rail Control Module. -Provides programmatic control of Zaber linear rails via Python with Prefect workflow -orchestration support. +Simple wrappers around zaber-motion library for Prefect integration. Example: - from ac_training_lab.zaber_linear_rail import move_to_position, home_axis + from ac_training_lab.zaber_linear_rail import connect, home, move_absolute - home_axis() - move_to_position(100.0) # Move to 100mm + connect() + home() + move_absolute(100.0) # Move to 100mm """ -from ac_training_lab.zaber_linear_rail.config import ( - DEFAULT_ACCELERATION, - DEFAULT_VELOCITY, - SERIAL_PORT, -) +from ac_training_lab.zaber_linear_rail.config import SERIAL_PORT from ac_training_lab.zaber_linear_rail.rail_controller import ( connect, disconnect, - get_device_info, get_position, - home_axis, + home, + move_absolute, move_relative, - move_to_position, - stop_movement, + stop, ) __all__ = [ "connect", "disconnect", - "home_axis", - "move_to_position", + "home", + "move_absolute", "move_relative", "get_position", - "stop_movement", - "get_device_info", + "stop", "SERIAL_PORT", - "DEFAULT_VELOCITY", - "DEFAULT_ACCELERATION", ] diff --git a/src/ac_training_lab/zaber_linear_rail/config.py b/src/ac_training_lab/zaber_linear_rail/config.py index ee9897a5..fb9c81e8 100644 --- a/src/ac_training_lab/zaber_linear_rail/config.py +++ b/src/ac_training_lab/zaber_linear_rail/config.py @@ -6,96 +6,10 @@ import os -# ============================================================================= -# CONNECTION CONFIGURATION -# ============================================================================= -# Serial port where Zaber device is connected +# Serial port where Zaber device is connected (USB) # On Raspberry Pi, typically /dev/ttyUSB0 or /dev/ttyACM0 SERIAL_PORT = os.getenv("ZABER_SERIAL_PORT", "/dev/ttyUSB0") -# Alternative connection methods -TCP_HOST = os.getenv("ZABER_TCP_HOST", "") -TCP_PORT = int(os.getenv("ZABER_TCP_PORT", "55550")) - -# IoT connection (for Zaber Cloud-connected devices) -IOT_DEVICE_ID = os.getenv("ZABER_IOT_DEVICE_ID", "") -IOT_TOKEN = os.getenv("ZABER_IOT_TOKEN", "") - - -# ============================================================================= -# DEVICE CONFIGURATION -# ============================================================================= # Device index is 0-based (for device_list), axis number is 1-based (Zaber) -DEVICE_INDEX = int(os.getenv("ZABER_DEVICE_INDEX", "0")) # Index in device list -AXIS_NUMBER = int(os.getenv("ZABER_AXIS_NUMBER", "1")) # Axis number on device - - -# ============================================================================= -# MOTION PARAMETERS -# ============================================================================= -# Default velocity in mm/s (adjust based on your rail specifications) -DEFAULT_VELOCITY = float(os.getenv("ZABER_DEFAULT_VELOCITY", "50.0")) - -# Default acceleration in mm/s² (adjust based on your rail specifications) -DEFAULT_ACCELERATION = float(os.getenv("ZABER_DEFAULT_ACCELERATION", "100.0")) - -# Position limits in mm (set based on your rail travel range) -MIN_POSITION = float(os.getenv("ZABER_MIN_POSITION", "0.0")) -MAX_POSITION = float(os.getenv("ZABER_MAX_POSITION", "500.0")) - - -# ============================================================================= -# SAFETY SETTINGS -# ============================================================================= -# Timeout for movement operations in seconds -MOVEMENT_TIMEOUT = float(os.getenv("ZABER_MOVEMENT_TIMEOUT", "60.0")) - -# Homing timeout in seconds -HOMING_TIMEOUT = float(os.getenv("ZABER_HOMING_TIMEOUT", "120.0")) - - -# ============================================================================= -# STATE MANAGEMENT -# ============================================================================= -# File to store persistent state -STATE_FILE = os.getenv("ZABER_STATE_FILE", "zaber_state.json") - - -# ============================================================================= -# VALIDATION -# ============================================================================= -def validate_config(): - """Validate configuration values.""" - errors = [] - - # Validate position limits - if MIN_POSITION >= MAX_POSITION: - errors.append( - f"MIN_POSITION ({MIN_POSITION}) must be less than " - f"MAX_POSITION ({MAX_POSITION})" - ) - - # Validate velocity - if DEFAULT_VELOCITY <= 0: - errors.append(f"DEFAULT_VELOCITY ({DEFAULT_VELOCITY}) must be positive") - - # Validate acceleration - if DEFAULT_ACCELERATION <= 0: - errors.append(f"DEFAULT_ACCELERATION ({DEFAULT_ACCELERATION}) must be positive") - - # Validate timeouts - if MOVEMENT_TIMEOUT <= 0: - errors.append(f"MOVEMENT_TIMEOUT ({MOVEMENT_TIMEOUT}) must be positive") - if HOMING_TIMEOUT <= 0: - errors.append(f"HOMING_TIMEOUT ({HOMING_TIMEOUT}) must be positive") - - if errors: - raise ValueError( - "Configuration validation failed:\n" + "\n".join(f" - {e}" for e in errors) - ) - - return True - - -# Validate on import -validate_config() +DEVICE_INDEX = int(os.getenv("ZABER_DEVICE_INDEX", "0")) +AXIS_NUMBER = int(os.getenv("ZABER_AXIS_NUMBER", "1")) diff --git a/src/ac_training_lab/zaber_linear_rail/prefect_flows.py b/src/ac_training_lab/zaber_linear_rail/prefect_flows.py index 03de017b..75cb380a 100644 --- a/src/ac_training_lab/zaber_linear_rail/prefect_flows.py +++ b/src/ac_training_lab/zaber_linear_rail/prefect_flows.py @@ -1,395 +1,131 @@ """ Prefect flows for Zaber linear rail control. -Provides workflow orchestration using Prefect for remote rail control. +Simple wrappers around Zaber API calls with Prefect task/flow decorators. """ from typing import Optional from prefect import flow, task -from prefect.logging import get_run_logger from ac_training_lab.zaber_linear_rail.rail_controller import ( connect, disconnect, - get_device_info, get_position, - home_axis, + home, + move_absolute, move_relative, - move_to_position, - stop_movement, + stop, ) -# Wrap core functions as tasks +# Wrap functions as tasks connect_task = task(connect) disconnect_task = task(disconnect) -home_axis_task = task(home_axis) -move_to_position_task = task(move_to_position) +home_task = task(home) +move_absolute_task = task(move_absolute) move_relative_task = task(move_relative) get_position_task = task(get_position) -stop_movement_task = task(stop_movement) -get_device_info_task = task(get_device_info) +stop_task = task(stop) @flow -def move_to_position_flow( - target_position: float, - serial_port: Optional[str] = None, - velocity: Optional[float] = None, - home_first: bool = False, -) -> dict: - """ - Prefect flow for moving rail to a specific position. - - Args: - target_position: Target position in mm - serial_port: Optional serial port override - velocity: Optional velocity in mm/s - home_first: Whether to home before moving - - Returns: - dict with movement results - """ - logger = get_run_logger() - logger.info("=== MOVE TO POSITION FLOW ===") - logger.info(f"Target: {target_position} mm") - - # Connect to device - connection_info = connect_task(serial_port=serial_port) - logger.info(f"Connected to: {connection_info['device_name']}") - +def home_flow(serial_port: Optional[str] = None) -> float: + """Home the axis. Returns position in mm.""" + connect_task(serial_port=serial_port) try: - # Home if requested - if home_first: - logger.info("Homing axis first...") - home_result = home_axis_task() - logger.info(f"Homed. Current position: {home_result['position']:.2f} mm") - - # Get initial position - initial = get_position_task() - logger.info(f"Initial position: {initial['position']:.2f} mm") - - # Execute movement - result = move_to_position_task(target_position, velocity=velocity) - - start = result["start_position"] - final = result["final_position"] - logger.info(f"Movement complete: {start:.2f} -> {final:.2f} mm") - logger.info(f"Distance moved: {result['distance']:.2f} mm") - - return { - "success": True, - "movement": result, - "connection": connection_info, - } - + return home_task() finally: - # Always disconnect disconnect_task() - logger.info("Disconnected") @flow -def move_relative_flow( - distance: float, - serial_port: Optional[str] = None, - velocity: Optional[float] = None, -) -> dict: - """ - Prefect flow for moving rail by a relative distance. - - Args: - distance: Distance to move in mm (positive or negative) - serial_port: Optional serial port override - velocity: Optional velocity in mm/s - - Returns: - dict with movement results - """ - logger = get_run_logger() - logger.info("=== MOVE RELATIVE FLOW ===") - logger.info(f"Distance: {distance:+.2f} mm") - - # Connect to device - connection_info = connect_task(serial_port=serial_port) - logger.info(f"Connected to: {connection_info['device_name']}") - +def move_absolute_flow( + position: float, unit: str = "mm", serial_port: Optional[str] = None +) -> float: + """Move to absolute position. Returns final position.""" + connect_task(serial_port=serial_port) try: - # Get initial position - initial = get_position_task() - logger.info(f"Initial position: {initial['position']:.2f} mm") - - # Execute movement - result = move_relative_task(distance, velocity=velocity) - - start = result["start_position"] - final = result["final_position"] - logger.info(f"Movement complete: {start:.2f} -> {final:.2f} mm") - logger.info(f"Actual distance: {result['actual_distance']:.2f} mm") - - return { - "success": True, - "movement": result, - "connection": connection_info, - } - + return move_absolute_task(position, unit) finally: disconnect_task() - logger.info("Disconnected") @flow -def home_flow(serial_port: Optional[str] = None) -> dict: - """ - Prefect flow for homing the rail. - - Args: - serial_port: Optional serial port override - - Returns: - dict with homing results - """ - logger = get_run_logger() - logger.info("=== HOME FLOW ===") - - # Connect to device - connection_info = connect_task(serial_port=serial_port) - logger.info(f"Connected to: {connection_info['device_name']}") - +def move_relative_flow( + distance: float, unit: str = "mm", serial_port: Optional[str] = None +) -> float: + """Move by relative distance. Returns final position.""" + connect_task(serial_port=serial_port) try: - # Home the axis - result = home_axis_task() - logger.info(f"Homing complete. Position: {result['position']:.2f} mm") - - return { - "success": True, - "homing": result, - "connection": connection_info, - } - + return move_relative_task(distance, unit) finally: disconnect_task() - logger.info("Disconnected") @flow -def get_status_flow(serial_port: Optional[str] = None) -> dict: - """ - Prefect flow for getting rail status. - - Args: - serial_port: Optional serial port override - - Returns: - dict with status info - """ - logger = get_run_logger() - logger.info("=== STATUS FLOW ===") - - # Connect to device - connection_info = connect_task(serial_port=serial_port) - logger.info(f"Connected to: {connection_info['device_name']}") - +def get_position_flow(unit: str = "mm", serial_port: Optional[str] = None) -> float: + """Get current position.""" + connect_task(serial_port=serial_port) try: - # Get device info - device_info = get_device_info_task() - - logger.info(f"Device: {device_info['device_name']}") - logger.info(f"Position: {device_info['current_position']:.2f} mm") - logger.info(f"Max speed: {device_info['max_speed']:.2f} mm/s") - logger.info(f"Acceleration: {device_info['acceleration']:.2f} mm/s²") - - return { - "success": True, - "device_info": device_info, - "connection": connection_info, - } - + return get_position_task(unit) finally: disconnect_task() - logger.info("Disconnected") @flow def sequence_flow( positions: list[float], + unit: str = "mm", serial_port: Optional[str] = None, - velocity: Optional[float] = None, home_first: bool = True, -) -> dict: - """ - Prefect flow for moving through a sequence of positions. - - Args: - positions: List of positions to visit in mm - serial_port: Optional serial port override - velocity: Optional velocity in mm/s - home_first: Whether to home before starting sequence - - Returns: - dict with sequence results - """ - logger = get_run_logger() - logger.info("=== SEQUENCE FLOW ===") - logger.info(f"Positions: {positions}") - - # Connect to device - connection_info = connect_task(serial_port=serial_port) - logger.info(f"Connected to: {connection_info['device_name']}") - - results = [] - +) -> list[float]: + """Move through sequence of positions. Returns list of final positions.""" + connect_task(serial_port=serial_port) try: - # Home if requested if home_first: - logger.info("Homing axis first...") - home_result = home_axis_task() - logger.info(f"Homed. Position: {home_result['position']:.2f} mm") - - # Execute sequence - for i, position in enumerate(positions): - logger.info(f"Step {i+1}/{len(positions)}: Moving to {position} mm") - result = move_to_position_task(position, velocity=velocity) - results.append(result) - logger.info(f" Reached: {result['final_position']:.2f} mm") - - logger.info(f"Sequence complete. Visited {len(results)} positions.") - - return { - "success": True, - "steps_completed": len(results), - "results": results, - "connection": connection_info, - } - + home_task() + results = [] + for pos in positions: + results.append(move_absolute_task(pos, unit)) + return results finally: disconnect_task() - logger.info("Disconnected") - - -# ============================================================================= -# DEPLOYMENT FUNCTIONS -# ============================================================================= -def deploy_move_to_position( - work_pool_name: str = "zaber-linear-rail-pool", - deployment_name: str = "move-to-position", -) -> str: - """Deploy the move-to-position flow.""" - from pathlib import Path - - source_dir = Path(__file__).parent.parent.parent - - entrypoint = ( - "ac_training_lab/zaber_linear_rail/prefect_flows.py:move_to_position_flow" - ) - move_to_position_flow.from_source( - source=str(source_dir), - entrypoint=entrypoint, - ).deploy( - name=deployment_name, - work_pool_name=work_pool_name, - description="Move Zaber linear rail to an absolute position", - ) - - print(f"✅ Deployed '{deployment_name}'") - run_cmd = f"prefect deployment run 'move-to-position-flow/{deployment_name}'" - print(f"Run: {run_cmd} --param target_position=100.0") - return deployment_name - - -def deploy_home( - work_pool_name: str = "zaber-linear-rail-pool", - deployment_name: str = "home", -) -> str: - """Deploy the home flow.""" +# Deployment functions +def deploy_all_flows(work_pool_name: str = "zaber-linear-rail-pool") -> None: + """Deploy all flows to Prefect.""" from pathlib import Path source_dir = Path(__file__).parent.parent.parent + base = "ac_training_lab/zaber_linear_rail/prefect_flows.py" home_flow.from_source( - source=str(source_dir), - entrypoint="ac_training_lab/zaber_linear_rail/prefect_flows.py:home_flow", - ).deploy( - name=deployment_name, - work_pool_name=work_pool_name, - description="Home Zaber linear rail axis", - ) + source=str(source_dir), entrypoint=f"{base}:home_flow" + ).deploy(name="home", work_pool_name=work_pool_name) - print(f"✅ Deployed '{deployment_name}'") - print(f"Run: prefect deployment run 'home-flow/{deployment_name}'") - return deployment_name + move_absolute_flow.from_source( + source=str(source_dir), entrypoint=f"{base}:move_absolute_flow" + ).deploy(name="move-absolute", work_pool_name=work_pool_name) + move_relative_flow.from_source( + source=str(source_dir), entrypoint=f"{base}:move_relative_flow" + ).deploy(name="move-relative", work_pool_name=work_pool_name) -def deploy_sequence( - work_pool_name: str = "zaber-linear-rail-pool", - deployment_name: str = "sequence", -) -> str: - """Deploy the sequence flow.""" - from pathlib import Path - - source_dir = Path(__file__).parent.parent.parent + get_position_flow.from_source( + source=str(source_dir), entrypoint=f"{base}:get_position_flow" + ).deploy(name="get-position", work_pool_name=work_pool_name) sequence_flow.from_source( - source=str(source_dir), - entrypoint="ac_training_lab/zaber_linear_rail/prefect_flows.py:sequence_flow", - ).deploy( - name=deployment_name, - work_pool_name=work_pool_name, - description="Execute a sequence of positions on Zaber linear rail", - ) - - print(f"✅ Deployed '{deployment_name}'") - run_cmd = f"prefect deployment run 'sequence-flow/{deployment_name}'" - print(f"Run: {run_cmd} --param positions='[0, 100, 200, 100, 0]'") - return deployment_name - - -def deploy_all_flows(work_pool_name: str = "zaber-linear-rail-pool") -> bool: - """Deploy all Zaber linear rail flows.""" - print("=== DEPLOYING ZABER LINEAR RAIL FLOWS ===") - print(f"Work pool: {work_pool_name}") - - deploy_move_to_position(work_pool_name) - deploy_home(work_pool_name) - deploy_sequence(work_pool_name) - - print("\n🎉 All deployments created!") - print("\nAvailable flows:") - print(" 1. move-to-position - Move to absolute position") - print(" 2. home - Home the axis") - print(" 3. sequence - Execute position sequence") - print(f"\nStart worker: prefect worker start --pool {work_pool_name}") + source=str(source_dir), entrypoint=f"{base}:sequence_flow" + ).deploy(name="sequence", work_pool_name=work_pool_name) - return True + print(f"Deployed all flows to work pool: {work_pool_name}") if __name__ == "__main__": import sys - if len(sys.argv) > 1: - cmd = sys.argv[1] - if cmd == "deploy": - pool = sys.argv[2] if len(sys.argv) > 2 else "zaber-linear-rail-pool" - deploy_all_flows(pool) - elif cmd == "status": - get_status_flow() - elif cmd == "home": - home_flow() - elif cmd == "move": - if len(sys.argv) < 3: - print("Usage: python prefect_flows.py move ") - sys.exit(1) - position = float(sys.argv[2]) - move_to_position_flow(position) - else: - print("Usage: python prefect_flows.py [deploy|status|home|move ]") - else: - print("Usage: python prefect_flows.py [deploy|status|home|move ]") - print("\nExamples:") - print(" python prefect_flows.py deploy # Deploy all flows") - print(" python prefect_flows.py deploy my-pool # Deploy with custom pool") - print(" python prefect_flows.py status # Get device status") - print(" python prefect_flows.py home # Home the axis") - print(" python prefect_flows.py move 100 # Move to 100mm") + if len(sys.argv) > 1 and sys.argv[1] == "deploy": + pool = sys.argv[2] if len(sys.argv) > 2 else "zaber-linear-rail-pool" + deploy_all_flows(pool) diff --git a/src/ac_training_lab/zaber_linear_rail/rail_controller.py b/src/ac_training_lab/zaber_linear_rail/rail_controller.py index c2c502ce..a41fda1b 100644 --- a/src/ac_training_lab/zaber_linear_rail/rail_controller.py +++ b/src/ac_training_lab/zaber_linear_rail/rail_controller.py @@ -1,328 +1,85 @@ """ Core controller for Zaber linear rail. -Provides low-level functions for connecting to and controlling a Zaber linear rail. -Uses the zaber-motion library for communication. +Simple wrappers around zaber-motion library for Prefect task integration. """ -import json -from pathlib import Path from typing import Optional from zaber_motion import Library -from zaber_motion.ascii import Axis, Connection +from zaber_motion.ascii import Connection from ac_training_lab.zaber_linear_rail.config import ( AXIS_NUMBER, - DEFAULT_ACCELERATION, - DEFAULT_VELOCITY, DEVICE_INDEX, - IOT_DEVICE_ID, - IOT_TOKEN, - MAX_POSITION, - MIN_POSITION, SERIAL_PORT, - STATE_FILE, - TCP_HOST, - TCP_PORT, ) # Global connection and axis references _connection: Optional[Connection] = None -_axis: Optional[Axis] = None +_axis = None -def _load_state() -> dict: - """Load state from file.""" - state_path = Path(STATE_FILE) - if state_path.exists(): - with open(state_path, encoding="utf-8") as f: - return json.load(f) - return {"last_position": None, "is_homed": False} - - -def _save_state(state: dict) -> None: - """Save state to file.""" - with open(STATE_FILE, "w", encoding="utf-8") as f: - json.dump(state, f, indent=2) - - -def connect( - serial_port: Optional[str] = None, - tcp_host: Optional[str] = None, - tcp_port: Optional[int] = None, -) -> dict: - """ - Connect to Zaber device. - - Connection priority: - 1. TCP if tcp_host provided - 2. IoT if IOT_DEVICE_ID configured - 3. Serial port (default) - - Args: - serial_port: Override serial port (default: from config) - tcp_host: TCP host for network connection - tcp_port: TCP port for network connection - - Returns: - dict with connection info - """ +def connect(serial_port: Optional[str] = None) -> dict: + """Connect to Zaber device via USB serial.""" global _connection, _axis Library.enable_device_db_store() - - # Determine connection method - if tcp_host or TCP_HOST: - host = tcp_host or TCP_HOST - port = tcp_port or TCP_PORT - print(f"Connecting via TCP to {host}:{port}...") - _connection = Connection.open_tcp(host, port) - elif IOT_DEVICE_ID: - print(f"Connecting via IoT to device {IOT_DEVICE_ID}...") - _connection = Connection.open_iot(IOT_DEVICE_ID, IOT_TOKEN or "unauthenticated") - else: - port = serial_port or SERIAL_PORT - print(f"Connecting via serial port {port}...") - _connection = Connection.open_serial_port(port) - - # Detect devices + port = serial_port or SERIAL_PORT + _connection = Connection.open_serial_port(port) device_list = _connection.detect_devices() + if not device_list: _connection.close() _connection = None raise RuntimeError("No Zaber devices found") - print(f"Found {len(device_list)} device(s)") - - # Get the configured device - if DEVICE_INDEX >= len(device_list): - _connection.close() - _connection = None - raise RuntimeError( - f"Device index {DEVICE_INDEX} out of range " - f"(found {len(device_list)} devices)" - ) - device = device_list[DEVICE_INDEX] _axis = device.get_axis(AXIS_NUMBER) - # Apply default settings - _axis.settings.set("maxspeed", DEFAULT_VELOCITY, "mm/s") - _axis.settings.set("accel", DEFAULT_ACCELERATION, "mm/s^2") - return { - "connected": True, "device_name": device.name, "device_id": device.device_id, "axis_count": device.axis_count, } -def disconnect() -> dict: - """ - Disconnect from Zaber device. - - Returns: - dict with status - """ +def disconnect() -> None: + """Disconnect from Zaber device.""" global _connection, _axis - if _connection: _connection.close() _connection = None _axis = None - return {"disconnected": True} - return {"disconnected": False, "message": "No active connection"} - -def _ensure_connected() -> None: - """Ensure device is connected, raise if not.""" - if _connection is None or _axis is None: - raise RuntimeError("Not connected. Call connect() first.") - - -def home_axis() -> dict: - """ - Home the axis to find reference position. - - Returns: - dict with homing result - """ - _ensure_connected() - - print("Homing axis...") +def home() -> float: + """Home the axis. Returns position in mm.""" _axis.home() _axis.wait_until_idle() - - position = _axis.get_position("mm") - - # Update state - state = _load_state() - state["is_homed"] = True - state["last_position"] = position - _save_state(state) - - print(f"Homing complete. Position: {position:.2f} mm") - - return {"success": True, "position": position, "homed": True} + return _axis.get_position("mm") -def move_to_position(target_position: float, velocity: Optional[float] = None) -> dict: - """ - Move axis to absolute position. - - Args: - target_position: Target position in mm - velocity: Optional velocity override in mm/s - - Returns: - dict with movement result - """ - _ensure_connected() - - # Validate position - if not (MIN_POSITION <= target_position <= MAX_POSITION): - raise ValueError( - f"Target position {target_position} mm is out of range " - f"[{MIN_POSITION}, {MAX_POSITION}]" - ) - - # Get starting position - start_position = _axis.get_position("mm") - - # Set velocity if provided - if velocity: - _axis.settings.set("maxspeed", velocity, "mm/s") - - print(f"Moving from {start_position:.2f} mm to {target_position:.2f} mm...") - _axis.move_absolute(target_position, "mm") +def move_absolute(position: float, unit: str = "mm") -> float: + """Move to absolute position. Returns final position.""" + _axis.move_absolute(position, unit) _axis.wait_until_idle() + return _axis.get_position("mm") - # Get final position - final_position = _axis.get_position("mm") - - # Update state - state = _load_state() - state["last_position"] = final_position - _save_state(state) - - print(f"Movement complete. Final position: {final_position:.2f} mm") - return { - "success": True, - "start_position": start_position, - "target_position": target_position, - "final_position": final_position, - "distance": abs(final_position - start_position), - } - - -def move_relative(distance: float, velocity: Optional[float] = None) -> dict: - """ - Move axis by relative distance. - - Args: - distance: Distance to move in mm (positive or negative) - velocity: Optional velocity override in mm/s - - Returns: - dict with movement result - """ - _ensure_connected() - - # Get current position - current_position = _axis.get_position("mm") - target_position = current_position + distance - - # Validate target position - if not (MIN_POSITION <= target_position <= MAX_POSITION): - raise ValueError( - f"Target position {target_position} mm would be out of range " - f"[{MIN_POSITION}, {MAX_POSITION}]" - ) - - # Set velocity if provided - if velocity: - _axis.settings.set("maxspeed", velocity, "mm/s") - - print(f"Moving {distance:+.2f} mm from {current_position:.2f} mm...") - _axis.move_relative(distance, "mm") +def move_relative(distance: float, unit: str = "mm") -> float: + """Move by relative distance. Returns final position.""" + _axis.move_relative(distance, unit) _axis.wait_until_idle() + return _axis.get_position("mm") - # Get final position - final_position = _axis.get_position("mm") - - # Update state - state = _load_state() - state["last_position"] = final_position - _save_state(state) - - print(f"Movement complete. Final position: {final_position:.2f} mm") - - return { - "success": True, - "start_position": current_position, - "distance_requested": distance, - "final_position": final_position, - "actual_distance": final_position - current_position, - } - - -def get_position() -> dict: - """ - Get current axis position. - Returns: - dict with position info - """ - _ensure_connected() +def get_position(unit: str = "mm") -> float: + """Get current position.""" + return _axis.get_position(unit) - position = _axis.get_position("mm") - is_busy = _axis.is_busy() - - return {"position": position, "busy": is_busy} - - -def stop_movement() -> dict: - """ - Stop current movement immediately. - - Returns: - dict with stop result - """ - _ensure_connected() +def stop() -> float: + """Stop movement. Returns position.""" _axis.stop() - position = _axis.get_position("mm") - - return {"stopped": True, "position": position} - - -def get_device_info() -> dict: - """ - Get device information. - - Returns: - dict with device info - """ - _ensure_connected() - - device = _axis.device - - # Get current settings - max_speed = _axis.settings.get("maxspeed", "mm/s") - accel = _axis.settings.get("accel", "mm/s^2") - position = _axis.get_position("mm") - - return { - "device_name": device.name, - "device_id": device.device_id, - "axis_number": AXIS_NUMBER, - "axis_count": device.axis_count, - "current_position": position, - "max_speed": max_speed, - "acceleration": accel, - "position_limits": {"min": MIN_POSITION, "max": MAX_POSITION}, - } + return _axis.get_position("mm") From a0f7746ba2e2702a039cd5036add17bc817eb7cb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 26 Nov 2025 18:08:38 +0000 Subject: [PATCH 5/5] Add connection check to prevent AttributeError when not connected Co-authored-by: sgbaird <45469701+sgbaird@users.noreply.github.com> --- .../zaber_linear_rail/rail_controller.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/ac_training_lab/zaber_linear_rail/rail_controller.py b/src/ac_training_lab/zaber_linear_rail/rail_controller.py index a41fda1b..7ae183d8 100644 --- a/src/ac_training_lab/zaber_linear_rail/rail_controller.py +++ b/src/ac_training_lab/zaber_linear_rail/rail_controller.py @@ -20,6 +20,12 @@ _axis = None +def _check_connected(): + """Raise if not connected.""" + if _axis is None: + raise RuntimeError("Not connected. Call connect() first.") + + def connect(serial_port: Optional[str] = None) -> dict: """Connect to Zaber device via USB serial.""" global _connection, _axis @@ -55,6 +61,7 @@ def disconnect() -> None: def home() -> float: """Home the axis. Returns position in mm.""" + _check_connected() _axis.home() _axis.wait_until_idle() return _axis.get_position("mm") @@ -62,6 +69,7 @@ def home() -> float: def move_absolute(position: float, unit: str = "mm") -> float: """Move to absolute position. Returns final position.""" + _check_connected() _axis.move_absolute(position, unit) _axis.wait_until_idle() return _axis.get_position("mm") @@ -69,6 +77,7 @@ def move_absolute(position: float, unit: str = "mm") -> float: def move_relative(distance: float, unit: str = "mm") -> float: """Move by relative distance. Returns final position.""" + _check_connected() _axis.move_relative(distance, unit) _axis.wait_until_idle() return _axis.get_position("mm") @@ -76,10 +85,12 @@ def move_relative(distance: float, unit: str = "mm") -> float: def get_position(unit: str = "mm") -> float: """Get current position.""" + _check_connected() return _axis.get_position(unit) def stop() -> float: """Stop movement. Returns position.""" + _check_connected() _axis.stop() return _axis.get_position("mm")