Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/robocoop_backend/robocoop_backend.egg-info/SOURCES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ robocoop_backend/modules/robot/__init__.py
robocoop_backend/modules/robot/rgb_chenillard.py
robocoop_backend/modules/robot/state_store.py
robocoop_backend/modules/robot/telemetry_service.py
robocoop_backend/modules/robot/teleop.py
robocoop_backend/modules/robot/teleop_watchdog.py
robocoop_backend/scripts/__init__.py
robocoop_backend/scripts/rgb_chenillard_standalone.py
16 changes: 11 additions & 5 deletions src/robocoop_backend/robocoop_backend/adapters/base_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

logger = logging.getLogger(__name__)

from robocoop_backend.modules.robot.teleop import TeleopCommand

logger = logging.getLogger(__name__)


class RobotAdapter(ABC):
@abstractmethod
Expand All @@ -21,10 +25,12 @@ def is_connected(self) -> bool:
"""Check if currently connected."""
...

def send_velocity(self, data: Dict[str, Any]) -> None:
"""Publish cmd_vel from teleop.move data. Override in subclasses."""
logger.debug("send_velocity not implemented for %s", type(self).__name__)
@abstractmethod
def send_velocity(self, command: TeleopCommand) -> None:
"""Publish a validated teleop command."""
...

@abstractmethod
def emergency_stop(self) -> None:
"""Trigger emergency stop. Override in subclasses."""
logger.debug("emergency_stop not implemented for %s", type(self).__name__)
"""Trigger emergency stop."""
...
15 changes: 12 additions & 3 deletions src/robocoop_backend/robocoop_backend/adapters/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,16 @@ def create_adapter(
logger.info(f"Creating adapter: {adapter_type}")

if adapter_type == "mock":
teleop = config.get("teleop", {})
return MockRobotAdapter(telemetry_service=telemetry_service)

return MockRobotAdapter(
telemetry_service=telemetry_service,
max_linear_x=float(teleop.get("max_linear_x", 0.25)),
max_linear_y=float(teleop.get("max_linear_y", 0.25)),
max_angular_z=float(teleop.get("max_angular_z", 0.8)),
)

if adapter_type == "rosbridge":
rb = config.get("rosbridge", {})
topics = rb.get("topics", {})
Expand All @@ -53,8 +61,9 @@ def create_adapter(
emergency_stop_msg_type=msg_types.get("emergency_stop", "std_msgs/msg/Bool"),
odom_topic=topics.get("odom", "/odom"),
odom_msg_type=msg_types.get("odom", "nav_msgs/msg/Odometry"),
max_linear_x=float(teleop.get("max_linear_x", 0.5)),
max_angular_z=float(teleop.get("max_angular_z", 1.0)),
max_linear_x=float(teleop.get("max_linear_x", 0.25)),
max_linear_y=float(teleop.get("max_linear_y", 0.25)),
max_angular_z=float(teleop.get("max_angular_z", 0.8)),
battery_watchdog_timeout=float(watchdog_timeout),
telemetry_service=telemetry_service,
rgb_chenillard_enabled=_rgb_chenillard_enabled(rgb_cfg),
Expand All @@ -67,4 +76,4 @@ def create_adapter(
rgb_led_max=int(rgb_cfg.get("led_max", 7)),
)

raise ValueError(f"Unknown adapter type: '{adapter_type}'. Supported: mock, rosbridge")
raise ValueError(f"Unknown adapter type: '{adapter_type}'. Supported: mock, rosbridge")
31 changes: 23 additions & 8 deletions src/robocoop_backend/robocoop_backend/adapters/mock_adapter.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,46 @@
import logging
from typing import Any, Dict

from robocoop_backend.adapters.base_adapter import RobotAdapter
from robocoop_backend.modules.robot.teleop import TeleopCommand

logger = logging.getLogger(__name__)

logger = logging.getLogger(__name__)


class MockRobotAdapter(RobotAdapter):
def __init__(self, telemetry_service=None):
def __init__(
self,
telemetry_service=None,
max_linear_x: float = 0.25,
max_linear_y: float = 0.25,
max_angular_z: float = 0.8,
):
self._is_connected = True
self.telemetry_service = telemetry_service
self.max_linear_x = max_linear_x
self.max_linear_y = max_linear_y
self.max_angular_z = max_angular_z
self.last_velocity_command = None

async def connect(self) -> bool:
"""Mock connect always succeeds."""
return True

async def disconnect(self) -> None:
"""Mock disconnect is a no-op."""
pass

def is_connected(self) -> bool:
return self._is_connected

def send_velocity(self, data: Dict[str, Any]) -> None:
linear_x = data.get("linear_x", data.get("linear", 0.0))
angular_z = data.get("angular_z", data.get("angular", 0.0))
logger.info("[MOCK] teleop.move linear_x=%s angular_z=%s", linear_x, angular_z)
def send_velocity(self, command: TeleopCommand) -> None:
self.last_velocity_command = command.to_dict()

logger.info(
"[MOCK] teleop.move linear_x=%s linear_y=%s angular_z=%s",
command.linear_x,
command.linear_y,
command.angular_z,
)

def emergency_stop(self) -> None:
logger.warning("[MOCK] emergency_stop")
62 changes: 37 additions & 25 deletions src/robocoop_backend/robocoop_backend/adapters/rosbridge_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from robocoop_backend.adapters.base_adapter import RobotAdapter
from robocoop_backend.adapters.rosbridge_client import RosbridgeClient
from robocoop_backend.modules.robot.rgb_chenillard import RgbChenillard
from robocoop_backend.modules.robot.teleop import TeleopCommand

logger = logging.getLogger(__name__)

Expand All @@ -26,8 +27,9 @@ def __init__(
emergency_stop_msg_type: str = "std_msgs/msg/Bool",
odom_topic: str = "/odom",
odom_msg_type: str = "nav_msgs/msg/Odometry",
max_linear_x: float = 0.5,
max_angular_z: float = 1.0,
max_linear_x: float = 0.25,
max_linear_y: float = 0.25,
max_angular_z: float = 0.8,
battery_watchdog_timeout: float = 15.0,
ping_interval: float = 5.0,
telemetry_service=None,
Expand All @@ -42,13 +44,18 @@ def __init__(
):
self.battery_topic = battery_topic
self.battery_msg_type = battery_msg_type
self._V_MIN = 9.0
self._V_MAX = 12.6
self._BATTERY_ALPHA = 0.05
self._filtered_voltage: Optional[float] = None
self.cmd_vel_topic = cmd_vel_topic
self.cmd_vel_msg_type = cmd_vel_msg_type
self.emergency_stop_topic = emergency_stop_topic
self.emergency_stop_msg_type = emergency_stop_msg_type
self.odom_topic = odom_topic
self.odom_msg_type = odom_msg_type
self.max_linear_x = max_linear_x
self.max_linear_y = max_linear_y
self.max_angular_z = max_angular_z
self.battery_watchdog_timeout = battery_watchdog_timeout
self.ping_interval = ping_interval
Expand Down Expand Up @@ -134,22 +141,19 @@ def _on_odom_received(self, msg_data: Dict[str, Any]) -> None:
except (TypeError, ValueError) as e:
logger.error(f"[ODOM] Error: {e}")

def send_velocity(self, data: Dict[str, Any]) -> None:
try:
linear_x = self._clamp_axis(
float(data.get("linear_x", data.get("linear", 0.0))), self.max_linear_x
)
angular_z = self._clamp_axis(
float(data.get("angular_z", data.get("angular", 0.0))), self.max_angular_z
)
msg = {
"linear": {"x": linear_x, "y": 0.0, "z": 0.0},
"angular": {"x": 0.0, "y": 0.0, "z": angular_z},
}
self._schedule_publish(self.cmd_vel_topic, self.cmd_vel_msg_type, msg)
logger.debug("[TELEOP] cmd_vel linear_x=%.3f angular_z=%.3f", linear_x, angular_z)
except (TypeError, ValueError) as e:
logger.error(f"[TELEOP] Invalid velocity data: {e}")
def send_velocity(self, command: TeleopCommand) -> None:
self._schedule_publish(
self.cmd_vel_topic,
self.cmd_vel_msg_type,
command.to_twist_msg(),
)

logger.debug(
"[TELEOP] cmd_vel linear_x=%.3f linear_y=%.3f angular_z=%.3f",
command.linear_x,
command.linear_y,
command.angular_z,
)

def emergency_stop(self) -> None:
logger.warning("[EMERGENCY] Publishing emergency stop")
Expand All @@ -159,10 +163,6 @@ def emergency_stop(self) -> None:
{"data": True},
)

@staticmethod
def _clamp_axis(value: float, limit: float) -> float:
return max(-limit, min(limit, value))

async def _client_publish(self, topic: str, msg_type: str, msg: Dict[str, Any]) -> None:
await self._client.publish(topic, msg_type, msg)

Expand All @@ -178,9 +178,21 @@ def _on_battery_received(self, msg_data: Dict[str, Any]) -> None:
battery_level = None
if "data" in msg_data:
voltage = float(msg_data["data"])
# battery_level = max(0, min(100, (voltage - 9.0) / (12.6 - 9.0) * 100))
battery_level = max(0, min(100, voltage / 12.0 * 100))
logger.info(f"[BATTERY] {voltage:.2f}V -> {battery_level:.1f}%")

if self._filtered_voltage is None:
self._filtered_voltage = voltage
else:
self._filtered_voltage = (
self._BATTERY_ALPHA * voltage
+ (1 - self._BATTERY_ALPHA) * self._filtered_voltage
)

battery_level = max(0, min(100,
(self._filtered_voltage - self._V_MIN) / (self._V_MAX - self._V_MIN) * 100
))
logger.info(
f"[BATTERY] raw={voltage:.2f}V filtered={self._filtered_voltage:.2f}V -> {battery_level:.1f}%"
)
elif "percentage" in msg_data:
battery_level = float(msg_data["percentage"])
logger.info(f"[BATTERY] {battery_level:.1f}%")
Expand Down
Loading
Loading