Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]
### Added
- Zaber linear rail control module with Prefect workflow orchestration in `src/ac_training_lab/zaber_linear_rail/`:
- `config.py` - Configuration settings for serial port, motion parameters, and safety limits
- `rail_controller.py` - Core controller with connect, home, move_to_position, move_relative, get_position, stop
- `prefect_flows.py` - Prefect flows for remote orchestration (move_to_position_flow, home_flow, sequence_flow)
- Support for both `rpicam-vid` (Raspberry Pi OS Trixie) and `libcamera-vid` (Raspberry Pi OS Bookworm) camera commands in `src/ac_training_lab/picam/device.py` to ensure compatibility across different OS versions.
- Comprehensive Unit Operations section in `docs/index.md` documenting all available capabilities including dispensing, synthesis, characterization, and robotics operations.
- Expanded Training Workflows section in `docs/index.md` with 10 educational workflows including RGB/RYB color matching, titration, yeast growth optimization, vision-enabled 3D printing optimization, microscopy image stitching, and AprilTag robot path planning.
Expand Down
34 changes: 34 additions & 0 deletions src/ac_training_lab/zaber_linear_rail/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""
Zaber Linear Rail Control Module.

Simple wrappers around zaber-motion library for Prefect integration.

Example:
from ac_training_lab.zaber_linear_rail import connect, home, move_absolute

connect()
home()
move_absolute(100.0) # Move to 100mm
"""

from ac_training_lab.zaber_linear_rail.config import SERIAL_PORT
from ac_training_lab.zaber_linear_rail.rail_controller import (
connect,
disconnect,
get_position,
home,
move_absolute,
move_relative,
stop,
)

__all__ = [
"connect",
"disconnect",
"home",
"move_absolute",
"move_relative",
"get_position",
"stop",
"SERIAL_PORT",
]
15 changes: 15 additions & 0 deletions src/ac_training_lab/zaber_linear_rail/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""
Configuration settings for Zaber linear rail control.

Edit these values during initial setup to match your hardware and environment.
"""

import os

# Serial port where Zaber device is connected (USB)
# On Raspberry Pi, typically /dev/ttyUSB0 or /dev/ttyACM0
SERIAL_PORT = os.getenv("ZABER_SERIAL_PORT", "/dev/ttyUSB0")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assume USB connection and no other form of communication except for Prefect

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified to USB-only in a0f7746. Removed TCP/IoT connection options - now only SERIAL_PORT config remains.


# Device index is 0-based (for device_list), axis number is 1-based (Zaber)
DEVICE_INDEX = int(os.getenv("ZABER_DEVICE_INDEX", "0"))
AXIS_NUMBER = int(os.getenv("ZABER_AXIS_NUMBER", "1"))
131 changes: 131 additions & 0 deletions src/ac_training_lab/zaber_linear_rail/prefect_flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""
Prefect flows for Zaber linear rail control.

Simple wrappers around Zaber API calls with Prefect task/flow decorators.
"""

from typing import Optional

from prefect import flow, task

from ac_training_lab.zaber_linear_rail.rail_controller import (
connect,
disconnect,
get_position,
home,
move_absolute,
move_relative,
stop,
)

# Wrap functions as tasks
connect_task = task(connect)
disconnect_task = task(disconnect)
home_task = task(home)
move_absolute_task = task(move_absolute)
move_relative_task = task(move_relative)
get_position_task = task(get_position)
stop_task = task(stop)


@flow
def home_flow(serial_port: Optional[str] = None) -> float:
"""Home the axis. Returns position in mm."""
connect_task(serial_port=serial_port)
try:
return home_task()
finally:
disconnect_task()


@flow
def move_absolute_flow(
position: float, unit: str = "mm", serial_port: Optional[str] = None
) -> float:
"""Move to absolute position. Returns final position."""
connect_task(serial_port=serial_port)
try:
return move_absolute_task(position, unit)
finally:
disconnect_task()


@flow
def move_relative_flow(
distance: float, unit: str = "mm", serial_port: Optional[str] = None
) -> float:
"""Move by relative distance. Returns final position."""
connect_task(serial_port=serial_port)
try:
return move_relative_task(distance, unit)
finally:
disconnect_task()


@flow
def get_position_flow(unit: str = "mm", serial_port: Optional[str] = None) -> float:
"""Get current position."""
connect_task(serial_port=serial_port)
try:
return get_position_task(unit)
finally:
disconnect_task()


@flow
def sequence_flow(
positions: list[float],
unit: str = "mm",
serial_port: Optional[str] = None,
home_first: bool = True,
) -> list[float]:
"""Move through sequence of positions. Returns list of final positions."""
connect_task(serial_port=serial_port)
try:
if home_first:
home_task()
results = []
for pos in positions:
results.append(move_absolute_task(pos, unit))
return results
finally:
disconnect_task()


# Deployment functions
def deploy_all_flows(work_pool_name: str = "zaber-linear-rail-pool") -> None:
"""Deploy all flows to Prefect."""
from pathlib import Path

source_dir = Path(__file__).parent.parent.parent
base = "ac_training_lab/zaber_linear_rail/prefect_flows.py"

home_flow.from_source(
source=str(source_dir), entrypoint=f"{base}:home_flow"
).deploy(name="home", work_pool_name=work_pool_name)

move_absolute_flow.from_source(
source=str(source_dir), entrypoint=f"{base}:move_absolute_flow"
).deploy(name="move-absolute", work_pool_name=work_pool_name)

move_relative_flow.from_source(
source=str(source_dir), entrypoint=f"{base}:move_relative_flow"
).deploy(name="move-relative", work_pool_name=work_pool_name)

get_position_flow.from_source(
source=str(source_dir), entrypoint=f"{base}:get_position_flow"
).deploy(name="get-position", work_pool_name=work_pool_name)

sequence_flow.from_source(
source=str(source_dir), entrypoint=f"{base}:sequence_flow"
).deploy(name="sequence", work_pool_name=work_pool_name)

print(f"Deployed all flows to work pool: {work_pool_name}")


if __name__ == "__main__":
import sys

if len(sys.argv) > 1 and sys.argv[1] == "deploy":
pool = sys.argv[2] if len(sys.argv) > 2 else "zaber-linear-rail-pool"
deploy_all_flows(pool)
96 changes: 96 additions & 0 deletions src/ac_training_lab/zaber_linear_rail/rail_controller.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of this seems overly complicated. Sure we can't just wrap zaber API calls with task and flow directly in most cases? Do we really need all this extra logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified in a0f7746. The controller now directly wraps Zaber API calls:

def home() -> float:
    _axis.home()
    _axis.wait_until_idle()
    return _axis.get_position("mm")

Removed state management, position validation, verbose logging. Functions simply call Zaber API and return results. Module went from 329 lines to 96 lines.

Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Core controller for Zaber linear rail.

Simple wrappers around zaber-motion library for Prefect task integration.
"""

from typing import Optional

from zaber_motion import Library
from zaber_motion.ascii import Connection

from ac_training_lab.zaber_linear_rail.config import (
AXIS_NUMBER,
DEVICE_INDEX,
SERIAL_PORT,
)

# Global connection and axis references
_connection: Optional[Connection] = None
_axis = None


def _check_connected():
"""Raise if not connected."""
if _axis is None:
raise RuntimeError("Not connected. Call connect() first.")


def connect(serial_port: Optional[str] = None) -> dict:
"""Connect to Zaber device via USB serial."""
global _connection, _axis

Library.enable_device_db_store()
port = serial_port or SERIAL_PORT
_connection = Connection.open_serial_port(port)
device_list = _connection.detect_devices()

if not device_list:
_connection.close()
_connection = None
raise RuntimeError("No Zaber devices found")

device = device_list[DEVICE_INDEX]
_axis = device.get_axis(AXIS_NUMBER)

return {
"device_name": device.name,
"device_id": device.device_id,
"axis_count": device.axis_count,
}


def disconnect() -> None:
"""Disconnect from Zaber device."""
global _connection, _axis
if _connection:
_connection.close()
_connection = None
_axis = None


def home() -> float:
"""Home the axis. Returns position in mm."""
_check_connected()
_axis.home()
_axis.wait_until_idle()
return _axis.get_position("mm")


def move_absolute(position: float, unit: str = "mm") -> float:
"""Move to absolute position. Returns final position."""
_check_connected()
_axis.move_absolute(position, unit)
_axis.wait_until_idle()
return _axis.get_position("mm")


def move_relative(distance: float, unit: str = "mm") -> float:
"""Move by relative distance. Returns final position."""
_check_connected()
_axis.move_relative(distance, unit)
_axis.wait_until_idle()
return _axis.get_position("mm")


def get_position(unit: str = "mm") -> float:
"""Get current position."""
_check_connected()
return _axis.get_position(unit)


def stop() -> float:
"""Stop movement. Returns position."""
_check_connected()
_axis.stop()
return _axis.get_position("mm")