Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions config/hsfei/hsfei_filterwheel.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Single Daemon Configuration Example
# For standalone daemon deployment or development/testing
#
# Usage:
# daemon = Atcfwheel.from_config_file("config/hsfei_atcfwheel.yaml")
# ATC FilterWheel — Thor labs fw 102c contoller instance
#
# Usage:
# daemons/generic/filterwheel -c config/hsfei/hsfei_filterwheel.yaml

peer_id: hsfei_atcfwheel
group_id: hsfei

hardware:
ip_address: 192.168.29.100
tcp_port: 10010
units: filter_pos
timeout_s: 30.0
retry_count: 3

limits:
soft_min: 1
soft_max: 6
hard_min: 1
hard_max: 6

named_positions:
empty: 1
OD1: 2
OD2: 3
OD4: 4
OD5: 5
empty2: 6

logging:
level: INFO
file: /tmp/atcfwheel.log
285 changes: 285 additions & 0 deletions daemons/generic/filterwheel
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
#!/usr/bin/env python3
'''Module for the Filter Wheel Daemon'''
import argparse
import sys
from typing import Dict, Any, Optional #pylint: disable = W0611

from hispec.daemon import HispecDaemon #pylint: disable = E0611
from hispec.driver.thorlabs.fw102c import FilterWheelController #pylint: disable = E0611

class Filterwheel(HispecDaemon): #pylint: disable = W0223
'''Daemon for controlling the Filter Wheel via Thorlabs FW102C controller'''

def __init__(self):
"""Initialize the Filter Wheel daemon.

Args: come from the hsfei configuration file
"""
super().__init__()

self.host = None
self.port = None
self.dev = FilterWheelController(log=True)
self._soft_min = None
self._soft_max = None
self._hard_min = None
self._hard_max = None
self.named_positions = None
self.daemon_desc = "Filter Wheel Daemon"
self.units = "position units" # Set appropriate units for your filter wheel

# Daemon state
self.state = {
'connected': False,
'error': ''
}

def on_start(self, libby):
'''Starts up daemon and initializies the hardware device'''
self.host = self.get_config("hardware.ip_address")
self.port = self.get_config("hardware.tcp_port")
self.daemon_desc = self.get_config("peer_id")
self.units = self.get_config("units")
self._soft_min = self.get_config("limits.soft_min")
self._soft_max = self.get_config("limits.soft_max")
self._hard_min = self.get_config("limits.hard_min")
self._hard_max = self.get_config("limits.hard_max")
self.named_positions = self.get_config("named_positions")

# Initialize hardware connection
if not(self.host and self.port):
self.logger.error("No IP address or port specified for Filter Wheel controller")
self.state['error'] = 'No IP address or port specified'
return
try:
connection = self.connect(True)
if not connection.get("ok"):
raise ConnectionError(connection.get("Error"))
self.state['connected'] = True
self.logger.info("Daemon started successfully and connected to hardware")
self.initialize()
self.logger.info("Initialized %s", self.daemon_desc)
except ConnectionRefusedError as e:
self.logger.error("Failed to connect to hardware: %s", e)
self.logger.warning("Daemon will start but hardware is not available")
self.state['error'] = str(e)
self.state['connected'] = False

def initialize(self):
"""handles initialization"""
if not self.state['connected']:
return {"ok": False, "error": "Not connected to hardware"}

try:
self.dev.initialize()
self.keyword_registry.bool("isconnected",
getter=self.dev.is_connected,
setter=self.keyword_wrapper(self.connect, key="isconnected"),
description="Check if daemon can talk to the FW controller.")
self.keyword_registry.string("error",
getter=lambda: self.state['error'],
description="Get the current error message.")
self.keyword_registry.int("positionvalue",
getter=self.keyword_wrapper(self.get_pos, key="position"),
setter=self.keyword_wrapper(self.set_pos, key="position"),
validator=self._check_soft_limits,
units=self.units,
description="Set and get current position of FilterWheel.")
self.keyword_registry.string("positionnamed",
getter=self.keyword_wrapper(self.cur_named_position, key="named_pos"),
setter=self.keyword_wrapper(self.goto_named_pos, key="named_pos"),
validator=self._check_named,
description="Set and get named position of FilterWheel.")
self.keyword_registry.int("softmin",
getter=lambda: self._soft_min,
setter=lambda v: setattr(self, "_soft_min", int(v)),
units=self.units,
description="Software lower limit for filter wheel position.")
self.keyword_registry.int("softmax",
getter=lambda: self._soft_max,
setter=lambda v: setattr(self, "_soft_max", int(v)),
units=self.units,
description="Software upper limit for filter wheel position.")
self.keyword_registry.int("hardmin",
getter=lambda: self._hard_min,
units=self.units,
description="Hardware lower limit for filter wheel position.")
self.keyword_registry.int("hardmax",
getter=lambda: self._hard_max,
units=self.units,
description="Hardware upper limit for filter wheel position.")

except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
self.state['error'] = str(e)
return {"ok":False , "error": str(e)}
return {"ok": True}

def get_named_positions(self):
"""Get named positions from config (e.g., home, deployed, science)."""
return self._config.get("named_positions", {})

def get_named_position(self, name: str):
"""Get a specific named position value, or None if not found."""
return self.get_named_positions().get(name)

def cur_named_position(self):
"""Get the name of the current position, if it matches a named position."""
current_pos = self.dev.get_pos()
for name, pos in self.get_named_positions().items():
if str(pos) == str(current_pos):
return {"ok": True, "named_pos": name, "position": current_pos}
return {"ok": False, "error": "Current position does not match any named position", "position": current_pos} #pylint: disable = C0301

def on_stop(self, libby) -> None: #pylint: disable=W0222
'''Stops the daemon and disconnects from hardware device'''
try:
self.connect(False)
self.logger.info("Disconnected %s", self.daemon_desc)
except Exception as e: # pylint: disable=W0718
self.logger.error("Disconnect %s:: Failed ", self.daemon_desc)
self.logger.error("Error: %s",e)

def connect(self, connect):
"""handles connection"""
try:
if connect:
self.dev.connect(host = self.host, port = self.port)
else:
self.dev.disconnect()
result = self.dev.is_connected()
if result != connect:
raise ConnectionError("Failed to Handle Connection Request")
self.logger.info("isconnected: %s", result)
except Exception as e: # pylint: disable=W0718
self.logger.error("Failed to execute: %s",e)
return {"ok": False, "error": str(e)}
return {"ok": True, "isconnected": result}

def status(self):
"""handles status"""
try:
limits = self.dev.get_limits()
position = self.cur_named_position()
status = {
"connected": self.dev.is_connected(),
"position": position.get("position"),
"named_pos": position.get("named_pos"),
"min_limit": limits.get("1")[0],
"max_limit": limits.get("1")[1],
}
self.logger.debug("status: %s",status)
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
self.state['error'] = str(e)
return {"ok": False, "error": str(e)}
return {"ok": True, "status": status}

def get_pos(self):
'''gets current position'''
if not self.state['connected']:
return {"ok": False, "error": "Not connected to hardware"}

try:
position = self.dev.get_pos()
self.logger.debug("get_pos: %s",position)
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
self.state['error'] = str(e)
return {"ok": False, "error": str(e)}
return {"ok":True, "position": int(position)}

def set_pos(self, pos):
'''sets current position'''
if not self.state['connected']:
return {"ok": False, "error": "Not connected to hardware"}

try:
pos = int(pos)
self._check_soft_limits(pos)
self.dev.set_pos(pos)
self.logger.debug("set_pos: %d",pos)
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
self.state['error'] = str(e)
return {"ok": False, "error": str(e)}
return {"ok": True, "position": int(pos)}

def goto_named_pos(self, name):
'''moves to named position'''
if not self.state['connected']:
return {"ok": False, "error": "Not connected to hardware"}

try:
goal = self.get_named_position(name.lower())
if goal is not None:
self.dev.set_pos(int(goal))
self.logger.debug("goto_named_pos: %s -> %s",name,goal)
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
self.state['error'] = str(e)
return {"ok": False, "error": str(e)}
return {"ok": True, "named_pos": name, "position": goal}

def _check_soft_limits(self, pos: int) -> bool:
"""Returns True if position is within soft limits."""
if self._soft_min is not None and pos < self._soft_min:
raise ValueError(f"Position {pos} below soft min {self._soft_min}")
if self._soft_max is not None and pos > self._soft_max:
raise ValueError(f"Position {pos} above soft max {self._soft_max}")
return None

def _check_named(self, name: str) -> Optional[str]:
if not name:
return "value must be a non-empty string"
if name not in self.named_positions:
return (f"unknown named position '{name}'; "
f"available: {list(self.named_positions)}")
return None

@staticmethod
def keyword_wrapper(func, key=None):
"""Wrap a daemon method for use as a keyword getter/setter."""
def wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
except Exception as e:
print(f"DEBUG keyword_wrapper [{func.__name__}]: exception={e}")
raise
if not result.get("ok"):
raise RuntimeError(result.get("error", f"Unknown error in {func.__name__}"))
if key:
return result[key]
return {k: v for k, v in result.items() if k != "ok"}
return wrapper

def main():
"""Main entry point for the daemon."""
parser = argparse.ArgumentParser(
description='FilterWheel Daemon'
)
parser.add_argument('-c', '--config', type=str,
help='Path to config file (YAML or JSON)')
parser.add_argument('-d', '--daemon-id', type=str,
help='Daemon ID (required for subsystem configs with multiple daemons)')

args = parser.parse_args()

if not args.config:
print("--config is required", file=sys.stderr)
sys.exit(2)

try:
print(f"Starting FilterWheel Daemon with config: {args.config} and daemon ID: {args}")
daemon = Filterwheel.from_config_file(args.config, daemon_id=args.daemon_id)
daemon.serve()
except KeyboardInterrupt:
print("\nDaemon interrupted by user")
sys.exit(0)
except Exception as e: #pylint: disable=W0718
print(f"Error running daemon: {e}", file=sys.stderr)
sys.exit(1)


if __name__ == '__main__':
main()
2 changes: 1 addition & 1 deletion src/hispec/driver/thorlabs
Submodule thorlabs updated 1 files
+6 −5 fw102c.py
Loading