From d70570c070b3529d6b8c25cf14eea266c6a31bf3 Mon Sep 17 00:00:00 2001 From: Sergey Grizan Date: Tue, 12 May 2026 03:26:41 -0700 Subject: [PATCH] ROS: Refactor node --- examples/teleop_ros2/python/assets.py | 72 + examples/teleop_ros2/python/constants.py | 97 ++ examples/teleop_ros2/python/geometry.py | 167 +++ examples/teleop_ros2/python/messages.py | 309 ++++ examples/teleop_ros2/python/session_config.py | 397 ++++++ .../teleop_ros2/python/teleop_ros2_node.py | 1248 +++-------------- 6 files changed, 1232 insertions(+), 1058 deletions(-) create mode 100644 examples/teleop_ros2/python/assets.py create mode 100644 examples/teleop_ros2/python/constants.py create mode 100644 examples/teleop_ros2/python/geometry.py create mode 100644 examples/teleop_ros2/python/messages.py create mode 100644 examples/teleop_ros2/python/session_config.py diff --git a/examples/teleop_ros2/python/assets.py b/examples/teleop_ros2/python/assets.py new file mode 100644 index 000000000..481e5ed80 --- /dev/null +++ b/examples/teleop_ros2/python/assets.py @@ -0,0 +1,72 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Asset and configuration path resolution for teleop_ros2_node.""" + +from pathlib import Path + + +def resolve_config_asset_root(raw_root: str, default_root: Path) -> Path: + raw_root = raw_root.strip() + if not raw_root: + return default_root + + root = Path(raw_root).expanduser().resolve() + if not root.is_dir(): + raise FileNotFoundError(f"config_asset_root directory not found: {root}") + return root + + +def resolve_dex_sharpa_config(root: Path, filename: str) -> str: + return resolve_teleop_ros2_file( + root, + "DexPilot Sharpa Wave retargeting config", + "configs", + filename, + ) + + +def resolve_dex_sharpa_urdf(root: Path, filename: str) -> str: + try: + return resolve_teleop_ros2_file( + root, + "Standalone Sharpa Wave URDF", + "assets", + "urdf", + "sharpa_standalone", + filename, + ) + except FileNotFoundError as exc: + raise FileNotFoundError( + f"{exc}. Populate the official Sharpa Wave URDFs with " + "examples/teleop_ros2/scripts/fetch_sharpa_wave_urdfs.py before using " + "hand_retargeter:=dexpilot, or use a Docker image built from " + "examples/teleop_ros2/Dockerfile." + ) from exc + + +def resolve_sharpa_mjcf(filename: str) -> str: + try: + from importlib.resources import files + except ImportError as exc: # pragma: no cover - Python 3.10+ has this. + raise ModuleNotFoundError( + "Sharpa hand retargeting requires importlib.resources support" + ) from exc + + try: + return str( + files("robotic_grounding") / "assets" / "xmls" / "sharpawave" / filename + ) + except ModuleNotFoundError as exc: + raise ModuleNotFoundError( + "Sharpa hand retargeting requires robotic_grounding assets. " + "Install/use a build with isaacteleop[grounding] and bundled robotic_grounding." + ) from exc + + +def resolve_teleop_ros2_file(root: Path, description: str, *parts: str) -> str: + path = root.joinpath(*parts) + if not path.is_file(): + raise FileNotFoundError(f"{description} not found at: {path}") + return str(path) diff --git a/examples/teleop_ros2/python/constants.py b/examples/teleop_ros2/python/constants.py new file mode 100644 index 000000000..668b59559 --- /dev/null +++ b/examples/teleop_ros2/python/constants.py @@ -0,0 +1,97 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Constants and enum values shared by the Teleop ROS 2 node.""" + +from enum import Enum + +from isaacteleop.retargeting_engine.tensor_types.indices import BodyJointPicoIndex + +try: + from enum import StrEnum +except ImportError: # pragma: no cover - Python 3.10 compatibility for ROS Humble. + + class StrEnum(str, Enum): + def __str__(self) -> str: + return self.value + + +class HandRetargeter(StrEnum): + MODE_DEFAULT = "mode_default" + TRIHAND = "trihand" + PINK_IK = "pink_ik" + DEXPILOT = "dexpilot" + + +BODY_JOINT_NAMES = [e.name for e in BodyJointPicoIndex] +HAND_RETARGETERS = tuple(retargeter.value for retargeter in HandRetargeter) +SHARPA_HAND_RETARGETERS = (HandRetargeter.PINK_IK, HandRetargeter.DEXPILOT) +TELEOP_MODES = ("controller_teleop", "hand_teleop", "controller_raw", "full_body") + +TRIHAND_JOINT_NAMES = [ + "thumb_rotation", + "thumb_proximal", + "thumb_distal", + "index_proximal", + "index_distal", + "middle_proximal", + "middle_distal", +] +LEFT_FINGER_JOINT_NAMES = [f"left_{n}" for n in TRIHAND_JOINT_NAMES] +RIGHT_FINGER_JOINT_NAMES = [f"right_{n}" for n in TRIHAND_JOINT_NAMES] + +SHARPA_WAVE_JOINT_NAMES = [ + "thumb_CMC_FE", + "thumb_CMC_AA", + "thumb_MCP_FE", + "thumb_MCP_AA", + "thumb_IP", + "index_MCP_FE", + "index_MCP_AA", + "index_PIP", + "index_DIP", + "middle_MCP_FE", + "middle_MCP_AA", + "middle_PIP", + "middle_DIP", + "ring_MCP_FE", + "ring_MCP_AA", + "ring_PIP", + "ring_DIP", + "pinky_CMC", + "pinky_MCP_FE", + "pinky_MCP_AA", + "pinky_PIP", + "pinky_DIP", +] +SHARPA_FINGER_JOINT_COUNT = len(SHARPA_WAVE_JOINT_NAMES) +LEFT_SHARPA_WAVE_JOINT_NAMES = [f"left_{n}" for n in SHARPA_WAVE_JOINT_NAMES] +RIGHT_SHARPA_WAVE_JOINT_NAMES = [f"right_{n}" for n in SHARPA_WAVE_JOINT_NAMES] + +DEX_HANDTRACKING_TO_BASELINK_FRAME_TRANSFORM = (0, -1, 0, -1, 0, 0, 0, 0, -1) + + +def resolve_hand_retargeter( + mode: str, hand_retargeter: HandRetargeter +) -> HandRetargeter: + if hand_retargeter == HandRetargeter.MODE_DEFAULT: + if mode == "controller_teleop": + return HandRetargeter.TRIHAND + if mode == "hand_teleop": + return HandRetargeter.DEXPILOT + return hand_retargeter + + if mode == "hand_teleop" and hand_retargeter == HandRetargeter.TRIHAND: + raise ValueError( + "Parameter 'hand_retargeter:=trihand' is only valid with " + "mode:=controller_teleop" + ) + + return hand_retargeter + + +def uses_hands_source_for_controller( + mode: str, hand_retargeter: HandRetargeter +) -> bool: + return mode == "controller_teleop" and hand_retargeter in SHARPA_HAND_RETARGETERS diff --git a/examples/teleop_ros2/python/geometry.py b/examples/teleop_ros2/python/geometry.py new file mode 100644 index 000000000..252ce9b51 --- /dev/null +++ b/examples/teleop_ros2/python/geometry.py @@ -0,0 +1,167 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Geometry helpers for ROS message construction.""" + +from typing import Sequence, Union + +import numpy as np +from geometry_msgs.msg import Pose, TransformStamped +from scipy.spatial.transform import Rotation + +from isaacteleop.retargeting_engine.tensor_types.indices import HandJointIndex + + +def append_hand_poses( + poses: list[Pose], + joint_positions: np.ndarray, + joint_orientations: np.ndarray, + joint_valid: np.ndarray, + transform_rot: Rotation | None = None, + transform_trans: Sequence[float] | None = None, +) -> None: + for joint_idx in range( + HandJointIndex.THUMB_METACARPAL, HandJointIndex.LITTLE_TIP + 1 + ): + if joint_valid[joint_idx]: + pose = to_pose(joint_positions[joint_idx], joint_orientations[joint_idx]) + if transform_rot is not None or transform_trans is not None: + pose = apply_transform_to_pose(pose, transform_rot, transform_trans) + else: + pose = to_pose([0.0, 0.0, 0.0]) + poses.append(pose) + + +def apply_manus_controller_to_hand_pose(pose: Pose, side: str) -> Pose: + """ + Apply MANUS controller-to-hand calibration in the pose's current frame. + + This is equivalent to: + + T_world_hand = T_world_controller @ T_controller_hand + """ + if side not in ("left", "right"): + raise ValueError(f"side must be 'left' or 'right', got {side!r}") + + # All MANUS calibration data is intentionally kept in this one function. + hand_left_pico_rotation = np.array( + [ + [-0.91777945, -0.18672461, -0.35044942], + [0.37550315, -0.69513369, -0.61301431], + [-0.12914434, -0.6942068, 0.70809509], + ], + dtype=float, + ) + hand_pico_translation = np.array([0.0, 0.0, 0.08], dtype=float) + + if side == "left": + controller_to_hand_rot_mat = hand_left_pico_rotation.T + else: + mirror_y = np.diag([1.0, -1.0, 1.0]) + hand_right_pico_rotation = mirror_y @ hand_left_pico_rotation @ mirror_y + controller_to_hand_rot_mat = hand_right_pico_rotation.T + + controller_to_hand_trans = -controller_to_hand_rot_mat @ hand_pico_translation + + world_controller_pos = np.array( + [pose.position.x, pose.position.y, pose.position.z], + dtype=float, + ) + world_controller_rot = Rotation.from_quat( + [ + pose.orientation.x, + pose.orientation.y, + pose.orientation.z, + pose.orientation.w, + ] + ) + + controller_to_hand_rot = Rotation.from_matrix(controller_to_hand_rot_mat) + + world_hand_pos = world_controller_pos + world_controller_rot.apply( + controller_to_hand_trans + ) + world_hand_rot = world_controller_rot * controller_to_hand_rot + + return to_pose(world_hand_pos, world_hand_rot.as_quat()) + + +def apply_transform_to_pose( + pose: Pose, + rotation: Rotation | None = None, + translation: Sequence[float] | None = None, +) -> Pose: + """ + Return a new Pose with world-frame position transform and orientation + basis change applied. + """ + p = [pose.position.x, pose.position.y, pose.position.z] + orientation = Rotation.from_quat( + [ + pose.orientation.x, + pose.orientation.y, + pose.orientation.z, + pose.orientation.w, + ] + ) + + if rotation is not None: + p = rotation.apply(p) + # Conjugation keeps the same physical orientation while expressing it + # in the rotated basis used for published EE and hand poses. + orientation = rotation * orientation * rotation.inv() + + q = orientation.as_quat() + + result = Pose() + if translation is not None: + result.position.x = float(p[0]) + translation[0] + result.position.y = float(p[1]) + translation[1] + result.position.z = float(p[2]) + translation[2] + else: + result.position.x = float(p[0]) + result.position.y = float(p[1]) + result.position.z = float(p[2]) + + result.orientation.x = float(q[0]) + result.orientation.y = float(q[1]) + result.orientation.z = float(q[2]) + result.orientation.w = float(q[3]) + return result + + +def make_transform( + stamp, + parent_frame: str, + child_frame: str, + position: Union[np.ndarray, Sequence[float]], + orientation: Union[np.ndarray, Sequence[float]], +) -> TransformStamped: + tf = TransformStamped() + tf.header.stamp = stamp + tf.header.frame_id = parent_frame + tf.child_frame_id = child_frame + tf.transform.translation.x = float(position[0]) + tf.transform.translation.y = float(position[1]) + tf.transform.translation.z = float(position[2]) + tf.transform.rotation.x = float(orientation[0]) + tf.transform.rotation.y = float(orientation[1]) + tf.transform.rotation.z = float(orientation[2]) + tf.transform.rotation.w = float(orientation[3]) + return tf + + +def to_pose(position, orientation=None) -> Pose: + pose = Pose() + pose.position.x = float(position[0]) + pose.position.y = float(position[1]) + pose.position.z = float(position[2]) + if orientation is None: + pose.orientation.w = 1.0 + else: + pose.orientation.x = float(orientation[0]) + pose.orientation.y = float(orientation[1]) + pose.orientation.z = float(orientation[2]) + pose.orientation.w = float(orientation[3]) + return pose diff --git a/examples/teleop_ros2/python/messages.py b/examples/teleop_ros2/python/messages.py new file mode 100644 index 000000000..ee70e4600 --- /dev/null +++ b/examples/teleop_ros2/python/messages.py @@ -0,0 +1,309 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""ROS message and msgpack payload builders for teleop_ros2_node.""" + +import time +from typing import Dict, Sequence + +import numpy as np +from geometry_msgs.msg import PoseArray +from scipy.spatial.transform import Rotation +from sensor_msgs.msg import JointState + +from isaacteleop.retargeting_engine.interface import OptionalTensorGroup +from isaacteleop.retargeting_engine.tensor_types.indices import ( + ControllerInputIndex, + FullBodyInputIndex, + HandInputIndex, + HandJointIndex, +) + +from constants import BODY_JOINT_NAMES +from geometry import ( + append_hand_poses, + apply_manus_controller_to_hand_pose, + apply_transform_to_pose, + to_pose, +) + + +def build_controller_payload( + left_ctrl: OptionalTensorGroup, right_ctrl: OptionalTensorGroup +) -> Dict: + def _as_list(ctrl, index): + if ctrl.is_none: + return [0.0, 0.0, 0.0] + return [float(x) for x in ctrl[index]] + + def _as_quat(ctrl, index): + if ctrl.is_none: + return [1.0, 0.0, 0.0, 0.0] + return [float(x) for x in ctrl[index]] + + def _as_float(ctrl, index): + if ctrl.is_none: + return 0.0 + return float(ctrl[index]) + + return { + "timestamp": time.time_ns(), + "left_thumbstick": [ + _as_float(left_ctrl, ControllerInputIndex.THUMBSTICK_X), + _as_float(left_ctrl, ControllerInputIndex.THUMBSTICK_Y), + ], + "right_thumbstick": [ + _as_float(right_ctrl, ControllerInputIndex.THUMBSTICK_X), + _as_float(right_ctrl, ControllerInputIndex.THUMBSTICK_Y), + ], + "left_trigger_value": _as_float(left_ctrl, ControllerInputIndex.TRIGGER_VALUE), + "right_trigger_value": _as_float( + right_ctrl, ControllerInputIndex.TRIGGER_VALUE + ), + "left_squeeze_value": _as_float(left_ctrl, ControllerInputIndex.SQUEEZE_VALUE), + "right_squeeze_value": _as_float( + right_ctrl, ControllerInputIndex.SQUEEZE_VALUE + ), + "left_aim_position": _as_list(left_ctrl, ControllerInputIndex.AIM_POSITION), + "right_aim_position": _as_list(right_ctrl, ControllerInputIndex.AIM_POSITION), + "left_grip_position": _as_list(left_ctrl, ControllerInputIndex.GRIP_POSITION), + "right_grip_position": _as_list(right_ctrl, ControllerInputIndex.GRIP_POSITION), + "left_aim_orientation": _as_quat( + left_ctrl, ControllerInputIndex.AIM_ORIENTATION + ), + "right_aim_orientation": _as_quat( + right_ctrl, ControllerInputIndex.AIM_ORIENTATION + ), + "left_grip_orientation": _as_quat( + left_ctrl, ControllerInputIndex.GRIP_ORIENTATION + ), + "right_grip_orientation": _as_quat( + right_ctrl, ControllerInputIndex.GRIP_ORIENTATION + ), + "left_primary_click": _as_float(left_ctrl, ControllerInputIndex.PRIMARY_CLICK), + "right_primary_click": _as_float( + right_ctrl, ControllerInputIndex.PRIMARY_CLICK + ), + "left_secondary_click": _as_float( + left_ctrl, ControllerInputIndex.SECONDARY_CLICK + ), + "right_secondary_click": _as_float( + right_ctrl, ControllerInputIndex.SECONDARY_CLICK + ), + "left_thumbstick_click": _as_float( + left_ctrl, ControllerInputIndex.THUMBSTICK_CLICK + ), + "right_thumbstick_click": _as_float( + right_ctrl, ControllerInputIndex.THUMBSTICK_CLICK + ), + "left_menu_click": _as_float(left_ctrl, ControllerInputIndex.MENU_CLICK), + "right_menu_click": _as_float(right_ctrl, ControllerInputIndex.MENU_CLICK), + "left_is_active": not left_ctrl.is_none, + "right_is_active": not right_ctrl.is_none, + } + + +def build_ee_msg_from_controllers( + left_ctrl: OptionalTensorGroup, + right_ctrl: OptionalTensorGroup, + now, + frame_id: str, + transform_rot: Rotation | None = None, + transform_trans: Sequence[float] | None = None, + controller_uses_hands_source: bool = False, +) -> PoseArray: + """Build a PoseArray with left then right controller/hand wrist poses.""" + msg = PoseArray() + msg.header.stamp = now + msg.header.frame_id = frame_id + + if controller_aim_is_valid(left_ctrl): + pos = [float(x) for x in left_ctrl[ControllerInputIndex.AIM_POSITION]] + ori = [float(x) for x in left_ctrl[ControllerInputIndex.AIM_ORIENTATION]] + pose = to_pose(pos, ori) + + if transform_rot is not None or transform_trans is not None: + pose = apply_transform_to_pose(pose, transform_rot, transform_trans) + + if controller_uses_hands_source: + pose = apply_manus_controller_to_hand_pose(pose, "left") + + msg.poses.append(pose) + else: + msg.poses.append(to_pose([0.0, 0.0, 0.0])) + + if controller_aim_is_valid(right_ctrl): + pos = [float(x) for x in right_ctrl[ControllerInputIndex.AIM_POSITION]] + ori = [float(x) for x in right_ctrl[ControllerInputIndex.AIM_ORIENTATION]] + pose = to_pose(pos, ori) + + if transform_rot is not None or transform_trans is not None: + pose = apply_transform_to_pose(pose, transform_rot, transform_trans) + + if controller_uses_hands_source: + pose = apply_manus_controller_to_hand_pose(pose, "right") + + msg.poses.append(pose) + else: + msg.poses.append(to_pose([0.0, 0.0, 0.0])) + + return msg + + +def build_ee_msg_from_hands( + left_hand: OptionalTensorGroup, + right_hand: OptionalTensorGroup, + now, + frame_id: str, + transform_rot: Rotation | None = None, + transform_trans: Sequence[float] | None = None, +) -> PoseArray: + """Build a PoseArray with left then right hand wrist poses (EE proxy).""" + msg = PoseArray() + msg.header.stamp = now + msg.header.frame_id = frame_id + + if hand_joint_is_valid(left_hand, HandJointIndex.WRIST): + left_positions = np.asarray(left_hand[HandInputIndex.JOINT_POSITIONS]) + left_orientations = np.asarray(left_hand[HandInputIndex.JOINT_ORIENTATIONS]) + pose = to_pose( + left_positions[HandJointIndex.WRIST], + left_orientations[HandJointIndex.WRIST], + ) + if transform_rot is not None or transform_trans is not None: + pose = apply_transform_to_pose(pose, transform_rot, transform_trans) + msg.poses.append(pose) + else: + msg.poses.append(to_pose([0.0, 0.0, 0.0])) + + if hand_joint_is_valid(right_hand, HandJointIndex.WRIST): + right_positions = np.asarray(right_hand[HandInputIndex.JOINT_POSITIONS]) + right_orientations = np.asarray(right_hand[HandInputIndex.JOINT_ORIENTATIONS]) + pose = to_pose( + right_positions[HandJointIndex.WRIST], + right_orientations[HandJointIndex.WRIST], + ) + if transform_rot is not None or transform_trans is not None: + pose = apply_transform_to_pose(pose, transform_rot, transform_trans) + msg.poses.append(pose) + else: + msg.poses.append(to_pose([0.0, 0.0, 0.0])) + + return msg + + +def build_finger_joints_msg( + left_joints: OptionalTensorGroup, + right_joints: OptionalTensorGroup, + now, + frame_id: str, +) -> JointState | None: + if left_joints.is_none and right_joints.is_none: + return None + + finger_joints_msg = JointState() + finger_joints_msg.header.stamp = now + finger_joints_msg.header.frame_id = frame_id + left_arr = ( + np.asarray(list(left_joints), dtype=np.float32) + if not left_joints.is_none + else np.array([], dtype=np.float32) + ) + right_arr = ( + np.asarray(list(right_joints), dtype=np.float32) + if not right_joints.is_none + else np.array([], dtype=np.float32) + ) + finger_joints_msg.name = ( + joint_names_from_group_type(left_joints.group_type) + if not left_joints.is_none + else [] + ) + ( + joint_names_from_group_type(right_joints.group_type) + if not right_joints.is_none + else [] + ) + finger_joints_msg.position = np.concatenate([left_arr, right_arr]).tolist() + return finger_joints_msg + + +def build_full_body_payload(full_body: OptionalTensorGroup) -> Dict: + positions = np.asarray(full_body[FullBodyInputIndex.JOINT_POSITIONS]) + orientations = np.asarray(full_body[FullBodyInputIndex.JOINT_ORIENTATIONS]) + valid = np.asarray(full_body[FullBodyInputIndex.JOINT_VALID]) + + return { + "timestamp": time.time_ns(), + "joint_names": BODY_JOINT_NAMES, + "joint_positions": [[float(v) for v in pos] for pos in positions], + "joint_orientations": [[float(v) for v in ori] for ori in orientations], + "joint_valid": [bool(v) for v in valid], + } + + +def build_hand_msg_from_hands( + left_hand: OptionalTensorGroup, + right_hand: OptionalTensorGroup, + now, + frame_id: str, + transform_rot: Rotation | None = None, + transform_trans: Sequence[float] | None = None, +) -> PoseArray: + """Build a PoseArray with finger joint poses, right hand then left hand.""" + msg = PoseArray() + msg.header.stamp = now + msg.header.frame_id = frame_id + + if not right_hand.is_none: + right_positions = np.asarray(right_hand[HandInputIndex.JOINT_POSITIONS]) + right_orientations = np.asarray(right_hand[HandInputIndex.JOINT_ORIENTATIONS]) + right_valid = np.asarray(right_hand[HandInputIndex.JOINT_VALID]) + append_hand_poses( + msg.poses, + right_positions, + right_orientations, + right_valid, + transform_rot, + transform_trans, + ) + else: + for _ in range(HandJointIndex.THUMB_METACARPAL, HandJointIndex.LITTLE_TIP + 1): + msg.poses.append(to_pose([0.0, 0.0, 0.0])) + + if not left_hand.is_none: + left_positions = np.asarray(left_hand[HandInputIndex.JOINT_POSITIONS]) + left_orientations = np.asarray(left_hand[HandInputIndex.JOINT_ORIENTATIONS]) + left_valid = np.asarray(left_hand[HandInputIndex.JOINT_VALID]) + append_hand_poses( + msg.poses, + left_positions, + left_orientations, + left_valid, + transform_rot, + transform_trans, + ) + else: + for _ in range(HandJointIndex.THUMB_METACARPAL, HandJointIndex.LITTLE_TIP + 1): + msg.poses.append(to_pose([0.0, 0.0, 0.0])) + + return msg + + +def controller_aim_is_valid(ctrl: OptionalTensorGroup) -> bool: + # DeviceIO's AIM_IS_VALID flag is the usability contract for aim poses. + return not ctrl.is_none and bool(ctrl[ControllerInputIndex.AIM_IS_VALID]) + + +def hand_joint_is_valid(hand: OptionalTensorGroup, joint_idx: HandJointIndex) -> bool: + if hand.is_none: + return False + return bool(hand[HandInputIndex.JOINT_VALID][joint_idx]) + + +def hand_wrist_is_valid(hand: OptionalTensorGroup) -> bool: + return hand_joint_is_valid(hand, HandJointIndex.WRIST) + + +def joint_names_from_group_type(group_type) -> list[str]: + return [tensor_type.name for tensor_type in group_type.types] diff --git a/examples/teleop_ros2/python/session_config.py b/examples/teleop_ros2/python/session_config.py new file mode 100644 index 000000000..9235913b1 --- /dev/null +++ b/examples/teleop_ros2/python/session_config.py @@ -0,0 +1,397 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""TeleopSession graph assembly for teleop_ros2_node.""" + +from dataclasses import dataclass +from pathlib import Path +from typing import Sequence + +from isaacteleop.deviceio import McapReplayConfig +from isaacteleop.retargeting_engine.deviceio_source_nodes import ( + ControllersSource, + FullBodySource, + Generic3AxisPedalSource, + HandsSource, +) +from isaacteleop.retargeting_engine.interface import OutputCombiner +from isaacteleop.retargeters import ( + DexHandRetargeter, + DexHandRetargeterConfig, + FootPedalRootCmdRetargeter, + FootPedalRootCmdRetargeterConfig, + LocomotionRootCmdRetargeter, + LocomotionRootCmdRetargeterConfig, + TriHandMotionControllerConfig, + TriHandMotionControllerRetargeter, +) +from isaacteleop.teleop_session_manager import SessionMode, TeleopSessionConfig +from teleop_ros2_retargeters import JointNameAliasRetargeter + +from assets import ( + resolve_dex_sharpa_config, + resolve_dex_sharpa_urdf, + resolve_sharpa_mjcf, +) +from constants import ( + DEX_HANDTRACKING_TO_BASELINK_FRAME_TRANSFORM, + HandRetargeter, + LEFT_FINGER_JOINT_NAMES, + LEFT_SHARPA_WAVE_JOINT_NAMES, + RIGHT_FINGER_JOINT_NAMES, + RIGHT_SHARPA_WAVE_JOINT_NAMES, + SHARPA_FINGER_JOINT_COUNT, + SHARPA_HAND_RETARGETERS, +) +from messages import joint_names_from_group_type + + +@dataclass(frozen=True) +class TeleopRos2SessionConfigContext: + config_asset_root: Path + left_finger_joint_name_aliases: Sequence[str] | None + mcap_config: McapReplayConfig | None + pedal_collection_id: str + resolved_hand_retargeter: HandRetargeter + right_finger_joint_name_aliases: Sequence[str] | None + session_mode: SessionMode + + +def build_controller_raw_config( + context: TeleopRos2SessionConfigContext, +) -> TeleopSessionConfig: + controllers = ControllersSource(name="controllers") + pipeline = OutputCombiner( + { + "controller_left": controllers.output(ControllersSource.LEFT), + "controller_right": controllers.output(ControllersSource.RIGHT), + } + ) + + return TeleopSessionConfig( + app_name="TeleopRos2Publisher", + pipeline=pipeline, + mode=context.session_mode, + mcap_config=context.mcap_config, + ) + + +def build_controller_teleop_config( + context: TeleopRos2SessionConfigContext, +) -> TeleopSessionConfig: + controllers = ControllersSource(name="controllers") + locomotion = LocomotionRootCmdRetargeter( + LocomotionRootCmdRetargeterConfig(), name="locomotion" + ) + locomotion_connected = locomotion.connect( + { + "controller_left": controllers.output(ControllersSource.LEFT), + "controller_right": controllers.output(ControllersSource.RIGHT), + } + ) + + pipeline_outputs = { + "controller_left": controllers.output(ControllersSource.LEFT), + "controller_right": controllers.output(ControllersSource.RIGHT), + "root_command": locomotion_connected.output("root_command"), + } + + if context.resolved_hand_retargeter == HandRetargeter.TRIHAND: + validate_joint_name_alias_count( + "left_finger_joint_names", + context.left_finger_joint_name_aliases, + len(LEFT_FINGER_JOINT_NAMES), + ) + validate_joint_name_alias_count( + "right_finger_joint_names", + context.right_finger_joint_name_aliases, + len(RIGHT_FINGER_JOINT_NAMES), + ) + left_finger_joint_names = ( + list(context.left_finger_joint_name_aliases) + if context.left_finger_joint_name_aliases is not None + else list(LEFT_FINGER_JOINT_NAMES) + ) + right_finger_joint_names = ( + list(context.right_finger_joint_name_aliases) + if context.right_finger_joint_name_aliases is not None + else list(RIGHT_FINGER_JOINT_NAMES) + ) + + left_hand_retargeter = TriHandMotionControllerRetargeter( + TriHandMotionControllerConfig( + hand_joint_names=left_finger_joint_names, controller_side="left" + ), + name="trihand_left", + ) + right_hand_retargeter = TriHandMotionControllerRetargeter( + TriHandMotionControllerConfig( + hand_joint_names=right_finger_joint_names, controller_side="right" + ), + name="trihand_right", + ) + left_hand_connected = left_hand_retargeter.connect( + {ControllersSource.LEFT: controllers.output(ControllersSource.LEFT)} + ) + right_hand_connected = right_hand_retargeter.connect( + {ControllersSource.RIGHT: controllers.output(ControllersSource.RIGHT)} + ) + pipeline_outputs.update( + { + "finger_joints_left": left_hand_connected.output("hand_joints"), + "finger_joints_right": right_hand_connected.output("hand_joints"), + } + ) + elif context.resolved_hand_retargeter in SHARPA_HAND_RETARGETERS: + validate_joint_name_alias_count( + "left_finger_joint_names", + context.left_finger_joint_name_aliases, + SHARPA_FINGER_JOINT_COUNT, + ) + validate_joint_name_alias_count( + "right_finger_joint_names", + context.right_finger_joint_name_aliases, + SHARPA_FINGER_JOINT_COUNT, + ) + hands = HandsSource(name="hands") + left_finger_joints, right_finger_joints = build_sharpa_finger_joint_outputs( + hands, context, "controller_teleop" + ) + pipeline_outputs.update( + { + "hand_left": hands.output(HandsSource.LEFT), + "hand_right": hands.output(HandsSource.RIGHT), + "finger_joints_left": left_finger_joints, + "finger_joints_right": right_finger_joints, + } + ) + else: + raise ValueError( + "controller_teleop requires hand_retargeter to resolve to " + f"'trihand', 'dexpilot', or 'pink_ik', got " + f"{context.resolved_hand_retargeter!r}" + ) + + pipeline = OutputCombiner(pipeline_outputs) + + return TeleopSessionConfig( + app_name="TeleopRos2Publisher", + pipeline=pipeline, + mode=context.session_mode, + mcap_config=context.mcap_config, + ) + + +def build_full_body_config( + context: TeleopRos2SessionConfigContext, +) -> TeleopSessionConfig: + controllers = ControllersSource(name="controllers") + full_body = FullBodySource(name="full_body") + pipeline = OutputCombiner( + { + "controller_left": controllers.output(ControllersSource.LEFT), + "controller_right": controllers.output(ControllersSource.RIGHT), + "full_body": full_body.output(FullBodySource.FULL_BODY), + } + ) + + return TeleopSessionConfig( + app_name="TeleopRos2Publisher", + pipeline=pipeline, + mode=context.session_mode, + mcap_config=context.mcap_config, + ) + + +def build_hand_teleop_config( + context: TeleopRos2SessionConfigContext, +) -> TeleopSessionConfig: + validate_joint_name_alias_count( + "left_finger_joint_names", + context.left_finger_joint_name_aliases, + SHARPA_FINGER_JOINT_COUNT, + ) + validate_joint_name_alias_count( + "right_finger_joint_names", + context.right_finger_joint_name_aliases, + SHARPA_FINGER_JOINT_COUNT, + ) + + hands = HandsSource(name="hands") + pedals = Generic3AxisPedalSource( + name="pedals", collection_id=context.pedal_collection_id + ) + locomotion = FootPedalRootCmdRetargeter( + FootPedalRootCmdRetargeterConfig(), + name="foot_pedal", + ) + locomotion_connected = locomotion.connect({"pedals": pedals.output("pedals")}) + left_finger_joints, right_finger_joints = build_sharpa_finger_joint_outputs( + hands, context, "hand_teleop" + ) + + pipeline = OutputCombiner( + { + "hand_left": hands.output(HandsSource.LEFT), + "hand_right": hands.output(HandsSource.RIGHT), + "root_command": locomotion_connected.output("root_command"), + "finger_joints_left": left_finger_joints, + "finger_joints_right": right_finger_joints, + } + ) + + return TeleopSessionConfig( + app_name="TeleopRos2Publisher", + pipeline=pipeline, + mode=context.session_mode, + mcap_config=context.mcap_config, + ) + + +def build_session_config( + mode: str, context: TeleopRos2SessionConfigContext +) -> TeleopSessionConfig: + if mode == "controller_teleop": + return build_controller_teleop_config(context) + if mode == "hand_teleop": + return build_hand_teleop_config(context) + if mode == "controller_raw": + return build_controller_raw_config(context) + if mode == "full_body": + return build_full_body_config(context) + raise ValueError(f"Unsupported mode {mode!r}") + + +def build_sharpa_finger_joint_outputs( + hands: HandsSource, + context: TeleopRos2SessionConfigContext, + mode_name: str, +): + if context.resolved_hand_retargeter == HandRetargeter.PINK_IK: + try: + from isaacteleop.retargeters import ( + SharpaHandRetargeter, + SharpaHandRetargeterConfig, + ) + except ModuleNotFoundError as exc: + raise ModuleNotFoundError( + f"{mode_name} with hand_retargeter:=pink_ik requires Sharpa " + "retargeting dependencies. Install/use a build with " + "isaacteleop[grounding] and bundled robotic_grounding." + ) from exc + + left_hand_retargeter = SharpaHandRetargeter( + SharpaHandRetargeterConfig( + robot_asset_path=resolve_sharpa_mjcf("left_sharpawave_nomesh.xml"), + hand_side="left", + ), + name="sharpa_left", + ) + right_hand_retargeter = SharpaHandRetargeter( + SharpaHandRetargeterConfig( + robot_asset_path=resolve_sharpa_mjcf("right_sharpawave_nomesh.xml"), + hand_side="right", + ), + name="sharpa_right", + ) + left_alias_name = "sharpa_left_joint_aliases" + right_alias_name = "sharpa_right_joint_aliases" + elif context.resolved_hand_retargeter == HandRetargeter.DEXPILOT: + left_hand_retargeter = DexHandRetargeter( + DexHandRetargeterConfig( + hand_retargeting_config=resolve_dex_sharpa_config( + context.config_asset_root, + "sharpa_wave_left_dexpilot.yml", + ), + hand_urdf=resolve_dex_sharpa_urdf( + context.config_asset_root, + "left_sharpa_wave.urdf", + ), + hand_joint_names=LEFT_SHARPA_WAVE_JOINT_NAMES, + handtracking_to_baselink_frame_transform=( + DEX_HANDTRACKING_TO_BASELINK_FRAME_TRANSFORM + ), + hand_side="left", + ), + name="dex_sharpa_left", + ) + right_hand_retargeter = DexHandRetargeter( + DexHandRetargeterConfig( + hand_retargeting_config=resolve_dex_sharpa_config( + context.config_asset_root, + "sharpa_wave_right_dexpilot.yml", + ), + hand_urdf=resolve_dex_sharpa_urdf( + context.config_asset_root, + "right_sharpa_wave.urdf", + ), + hand_joint_names=RIGHT_SHARPA_WAVE_JOINT_NAMES, + handtracking_to_baselink_frame_transform=( + DEX_HANDTRACKING_TO_BASELINK_FRAME_TRANSFORM + ), + hand_side="right", + ), + name="dex_sharpa_right", + ) + left_alias_name = "dex_sharpa_left_joint_aliases" + right_alias_name = "dex_sharpa_right_joint_aliases" + else: + raise ValueError( + f"Sharpa hand retargeting requires one of {SHARPA_HAND_RETARGETERS}, " + f"got {context.resolved_hand_retargeter!r}" + ) + + left_hand_connected = left_hand_retargeter.connect( + {HandsSource.LEFT: hands.output(HandsSource.LEFT)} + ) + right_hand_connected = right_hand_retargeter.connect( + {HandsSource.RIGHT: hands.output(HandsSource.RIGHT)} + ) + left_finger_joints = maybe_alias_hand_joints( + left_hand_connected, + joint_names_from_group_type(left_hand_retargeter.output_spec()["hand_joints"]), + context.left_finger_joint_name_aliases, + left_alias_name, + ) + right_finger_joints = maybe_alias_hand_joints( + right_hand_connected, + joint_names_from_group_type(right_hand_retargeter.output_spec()["hand_joints"]), + context.right_finger_joint_name_aliases, + right_alias_name, + ) + return left_finger_joints, right_finger_joints + + +def maybe_alias_hand_joints( + connected_hand_retargeter, + input_joint_names: Sequence[str], + output_joint_names: Sequence[str] | None, + name: str, +): + if output_joint_names is None: + return connected_hand_retargeter.output("hand_joints") + + alias_retargeter = JointNameAliasRetargeter( + input_joint_names=input_joint_names, + output_joint_names=output_joint_names, + name=name, + ) + alias_connected = alias_retargeter.connect( + {"hand_joints": connected_hand_retargeter.output("hand_joints")} + ) + return alias_connected.output("hand_joints") + + +def validate_joint_name_alias_count( + parameter_name: str, + aliases: Sequence[str] | None, + expected_count: int, +) -> None: + if aliases is None: + return + if len(aliases) != expected_count: + raise ValueError( + f"Parameter '{parameter_name}' must contain exactly {expected_count} " + f"joint name aliases, got {len(aliases)}" + ) diff --git a/examples/teleop_ros2/python/teleop_ros2_node.py b/examples/teleop_ros2/python/teleop_ros2_node.py index 776bfbee5..d9047edb7 100755 --- a/examples/teleop_ros2/python/teleop_ros2_node.py +++ b/examples/teleop_ros2/python/teleop_ros2_node.py @@ -37,7 +37,7 @@ import math import time from pathlib import Path -from typing import Dict, List, Sequence, Union +from typing import List import msgpack import msgpack_numpy as mnp @@ -59,280 +59,44 @@ from tf2_ros import TransformBroadcaster from isaacteleop.deviceio import McapReplayConfig -from isaacteleop.retargeting_engine.deviceio_source_nodes import ( - ControllersSource, - FullBodySource, - Generic3AxisPedalSource, - HandsSource, -) from isaacteleop.retargeting_engine.deviceio_source_nodes.pedals_source import ( DEFAULT_PEDAL_COLLECTION_ID, ) -from isaacteleop.retargeting_engine.interface import OptionalTensorGroup, OutputCombiner -from isaacteleop.retargeters import ( - DexHandRetargeter, - DexHandRetargeterConfig, - FootPedalRootCmdRetargeter, - FootPedalRootCmdRetargeterConfig, - LocomotionRootCmdRetargeter, - LocomotionRootCmdRetargeterConfig, - TriHandMotionControllerRetargeter, - TriHandMotionControllerConfig, -) -from isaacteleop.retargeting_engine.tensor_types.indices import ( - BodyJointPicoIndex, - ControllerInputIndex, - FullBodyInputIndex, - HandInputIndex, - HandJointIndex, +from isaacteleop.teleop_session_manager import SessionMode, TeleopSession +from assets import resolve_config_asset_root +from constants import ( + HAND_RETARGETERS, + TELEOP_MODES, + HandRetargeter, + resolve_hand_retargeter, + uses_hands_source_for_controller, ) -from isaacteleop.teleop_session_manager import ( - SessionMode, - TeleopSession, - TeleopSessionConfig, +from geometry import make_transform +from messages import ( + build_controller_payload, + build_ee_msg_from_controllers, + build_ee_msg_from_hands, + build_finger_joints_msg, + build_full_body_payload, + build_hand_msg_from_hands, + controller_aim_is_valid, + hand_wrist_is_valid, ) -from teleop_ros2_retargeters import JointNameAliasRetargeter - - -_BODY_JOINT_NAMES = [e.name for e in BodyJointPicoIndex] -_HAND_RETARGETER_MODE_DEFAULT = "mode_default" -_HAND_RETARGETER_TRIHAND = "trihand" -_HAND_RETARGETER_PINK_IK = "pink_ik" -_HAND_RETARGETER_DEXPILOT = "dexpilot" -_HAND_RETARGETERS = ( - _HAND_RETARGETER_MODE_DEFAULT, - _HAND_RETARGETER_TRIHAND, - _HAND_RETARGETER_PINK_IK, - _HAND_RETARGETER_DEXPILOT, +from session_config import ( + TeleopRos2SessionConfigContext, + build_session_config, ) -_SHARPA_HAND_RETARGETERS = (_HAND_RETARGETER_PINK_IK, _HAND_RETARGETER_DEXPILOT) -_TELEOP_MODES = ("controller_teleop", "hand_teleop", "controller_raw", "full_body") - -_TRIHAND_JOINT_NAMES = [ - "thumb_rotation", - "thumb_proximal", - "thumb_distal", - "index_proximal", - "index_distal", - "middle_proximal", - "middle_distal", -] -_LEFT_FINGER_JOINT_NAMES = [f"left_{n}" for n in _TRIHAND_JOINT_NAMES] -_RIGHT_FINGER_JOINT_NAMES = [f"right_{n}" for n in _TRIHAND_JOINT_NAMES] - -_SHARPA_WAVE_JOINT_NAMES = [ - "thumb_CMC_FE", - "thumb_CMC_AA", - "thumb_MCP_FE", - "thumb_MCP_AA", - "thumb_IP", - "index_MCP_FE", - "index_MCP_AA", - "index_PIP", - "index_DIP", - "middle_MCP_FE", - "middle_MCP_AA", - "middle_PIP", - "middle_DIP", - "ring_MCP_FE", - "ring_MCP_AA", - "ring_PIP", - "ring_DIP", - "pinky_CMC", - "pinky_MCP_FE", - "pinky_MCP_AA", - "pinky_PIP", - "pinky_DIP", -] -_SHARPA_FINGER_JOINT_COUNT = len(_SHARPA_WAVE_JOINT_NAMES) -_LEFT_SHARPA_WAVE_JOINT_NAMES = [f"left_{n}" for n in _SHARPA_WAVE_JOINT_NAMES] -_RIGHT_SHARPA_WAVE_JOINT_NAMES = [f"right_{n}" for n in _SHARPA_WAVE_JOINT_NAMES] -_DEX_HANDTRACKING_TO_BASELINK_FRAME_TRANSFORM = (0, -1, 0, -1, 0, 0, 0, 0, -1) # Helper functions -def _append_hand_poses( - poses: List[Pose], - joint_positions: np.ndarray, - joint_orientations: np.ndarray, - joint_valid: np.ndarray, - transform_rot: Rotation | None = None, - transform_trans: Sequence[float] | None = None, -) -> None: - for joint_idx in range( - HandJointIndex.THUMB_METACARPAL, HandJointIndex.LITTLE_TIP + 1 - ): - if joint_valid[joint_idx]: - pose = _to_pose(joint_positions[joint_idx], joint_orientations[joint_idx]) - if transform_rot is not None or transform_trans is not None: - pose = _apply_transform_to_pose(pose, transform_rot, transform_trans) - else: - pose = _to_pose([0.0, 0.0, 0.0]) - poses.append(pose) - - -def _apply_manus_controller_to_hand_pose(pose: Pose, side: str) -> Pose: - """ - Apply MANUS controller-to-hand calibration in the pose's current frame. - - This is equivalent to: - - T_world_hand = T_world_controller @ T_controller_hand - """ - if side not in ("left", "right"): - raise ValueError(f"side must be 'left' or 'right', got {side!r}") - - # All MANUS calibration data is intentionally kept in this one function. - hand_left_pico_rotation = np.array( - [ - [-0.91777945, -0.18672461, -0.35044942], - [0.37550315, -0.69513369, -0.61301431], - [-0.12914434, -0.6942068, 0.70809509], - ], - dtype=float, - ) - hand_pico_translation = np.array([0.0, 0.0, 0.08], dtype=float) - - if side == "left": - controller_to_hand_rot_mat = hand_left_pico_rotation.T - else: - mirror_y = np.diag([1.0, -1.0, 1.0]) - hand_right_pico_rotation = mirror_y @ hand_left_pico_rotation @ mirror_y - controller_to_hand_rot_mat = hand_right_pico_rotation.T - - controller_to_hand_trans = -controller_to_hand_rot_mat @ hand_pico_translation - - world_controller_pos = np.array( - [pose.position.x, pose.position.y, pose.position.z], - dtype=float, - ) - world_controller_rot = Rotation.from_quat( - [ - pose.orientation.x, - pose.orientation.y, - pose.orientation.z, - pose.orientation.w, - ] - ) - - controller_to_hand_rot = Rotation.from_matrix(controller_to_hand_rot_mat) - - world_hand_pos = world_controller_pos + world_controller_rot.apply( - controller_to_hand_trans - ) - world_hand_rot = world_controller_rot * controller_to_hand_rot - - return _to_pose(world_hand_pos, world_hand_rot.as_quat()) - - -def _apply_transform_to_pose( - pose: Pose, - rotation: Rotation | None = None, - translation: Sequence[float] | None = None, -) -> Pose: - """ - Return a new Pose with world-frame position transform and orientation - basis change applied. - """ - p = [pose.position.x, pose.position.y, pose.position.z] - orientation = Rotation.from_quat( - [ - pose.orientation.x, - pose.orientation.y, - pose.orientation.z, - pose.orientation.w, - ] - ) - - if rotation is not None: - p = rotation.apply(p) - # Conjugation keeps the same physical orientation while expressing it - # in the rotated basis used for published EE and hand poses. - orientation = rotation * orientation * rotation.inv() - - q = orientation.as_quat() - - result = Pose() - if translation is not None: - result.position.x = float(p[0]) + translation[0] - result.position.y = float(p[1]) + translation[1] - result.position.z = float(p[2]) + translation[2] - else: - result.position.x = float(p[0]) - result.position.y = float(p[1]) - result.position.z = float(p[2]) - - result.orientation.x = float(q[0]) - result.orientation.y = float(q[1]) - result.orientation.z = float(q[2]) - result.orientation.w = float(q[3]) - return result - - -def _controller_aim_is_valid(ctrl: OptionalTensorGroup) -> bool: - # DeviceIO's AIM_IS_VALID flag is the usability contract for aim poses. - return not ctrl.is_none and bool(ctrl[ControllerInputIndex.AIM_IS_VALID]) - - -def _hand_joint_is_valid(hand: OptionalTensorGroup, joint_idx: HandJointIndex) -> bool: - if hand.is_none: - return False - return bool(hand[HandInputIndex.JOINT_VALID][joint_idx]) - - -def _joint_names_from_group_type(group_type) -> List[str]: - return [tensor_type.name for tensor_type in group_type.types] - - -def _make_transform( - stamp, - parent_frame: str, - child_frame: str, - position: Union[np.ndarray, Sequence[float]], - orientation: Union[np.ndarray, Sequence[float]], -) -> TransformStamped: - tf = TransformStamped() - tf.header.stamp = stamp - tf.header.frame_id = parent_frame - tf.child_frame_id = child_frame - tf.transform.translation.x = float(position[0]) - tf.transform.translation.y = float(position[1]) - tf.transform.translation.z = float(position[2]) - tf.transform.rotation.x = float(orientation[0]) - tf.transform.rotation.y = float(orientation[1]) - tf.transform.rotation.z = float(orientation[2]) - tf.transform.rotation.w = float(orientation[3]) - return tf - - def _make_unset_string_array_parameter_empty(node: Node, parameter: Parameter) -> None: if parameter.type_ != Parameter.Type.NOT_SET: return node.set_parameters([Parameter(parameter.name, Parameter.Type.STRING_ARRAY, [])]) -def _maybe_alias_hand_joints( - connected_hand_retargeter, - input_joint_names: Sequence[str], - output_joint_names: Sequence[str] | None, - name: str, -): - if output_joint_names is None: - return connected_hand_retargeter.output("hand_joints") - - alias_retargeter = JointNameAliasRetargeter( - input_joint_names=input_joint_names, - output_joint_names=output_joint_names, - name=name, - ) - alias_connected = alias_retargeter.connect( - {"hand_joints": connected_hand_retargeter.output("hand_joints")} - ) - return alias_connected.output("hand_joints") - - def _resolve_finger_joint_name_aliases( parameter_name: str, names: List[str], @@ -346,346 +110,6 @@ def _resolve_finger_joint_name_aliases( return names or None -def _resolve_hand_retargeter(mode: str, hand_retargeter: str) -> str: - if hand_retargeter == _HAND_RETARGETER_MODE_DEFAULT: - if mode == "controller_teleop": - return _HAND_RETARGETER_TRIHAND - if mode == "hand_teleop": - return _HAND_RETARGETER_DEXPILOT - return hand_retargeter - - if mode == "hand_teleop" and hand_retargeter == _HAND_RETARGETER_TRIHAND: - raise ValueError( - "Parameter 'hand_retargeter:=trihand' is only valid with " - "mode:=controller_teleop" - ) - - return hand_retargeter - - -def _resolve_sharpa_mjcf(filename: str) -> str: - try: - from importlib.resources import files - except ImportError as exc: # pragma: no cover - Python 3.10+ has this. - raise ModuleNotFoundError( - "Sharpa hand retargeting requires importlib.resources support" - ) from exc - - try: - return str( - files("robotic_grounding") / "assets" / "xmls" / "sharpawave" / filename - ) - except ModuleNotFoundError as exc: - raise ModuleNotFoundError( - "Sharpa hand retargeting requires robotic_grounding assets. " - "Install/use a build with isaacteleop[grounding] and bundled robotic_grounding." - ) from exc - - -def _resolve_config_asset_root(raw_root: str) -> Path: - raw_root = raw_root.strip() - if not raw_root: - return Path(__file__).resolve().parents[1] - - root = Path(raw_root).expanduser().resolve() - if not root.is_dir(): - raise FileNotFoundError(f"config_asset_root directory not found: {root}") - return root - - -def _resolve_teleop_ros2_file(root: Path, description: str, *parts: str) -> str: - path = root.joinpath(*parts) - if not path.is_file(): - raise FileNotFoundError(f"{description} not found at: {path}") - return str(path) - - -def _resolve_dex_sharpa_config(root: Path, filename: str) -> str: - return _resolve_teleop_ros2_file( - root, - "DexPilot Sharpa Wave retargeting config", - "configs", - filename, - ) - - -def _resolve_dex_sharpa_urdf(root: Path, filename: str) -> str: - try: - return _resolve_teleop_ros2_file( - root, - "Standalone Sharpa Wave URDF", - "assets", - "urdf", - "sharpa_standalone", - filename, - ) - except FileNotFoundError as exc: - raise FileNotFoundError( - f"{exc}. Populate the official Sharpa Wave URDFs with " - "examples/teleop_ros2/scripts/fetch_sharpa_wave_urdfs.py before using " - "hand_retargeter:=dexpilot, or use a Docker image built from " - "examples/teleop_ros2/Dockerfile." - ) from exc - - -def _to_pose(position, orientation=None) -> Pose: - pose = Pose() - pose.position.x = float(position[0]) - pose.position.y = float(position[1]) - pose.position.z = float(position[2]) - if orientation is None: - pose.orientation.w = 1.0 - else: - pose.orientation.x = float(orientation[0]) - pose.orientation.y = float(orientation[1]) - pose.orientation.z = float(orientation[2]) - pose.orientation.w = float(orientation[3]) - return pose - - -def _validate_joint_name_alias_count( - parameter_name: str, - aliases: Sequence[str] | None, - expected_count: int, -) -> None: - if aliases is None: - return - if len(aliases) != expected_count: - raise ValueError( - f"Parameter '{parameter_name}' must contain exactly {expected_count} " - f"joint name aliases, got {len(aliases)}" - ) - - -# Message builders - - -def _build_ee_msg_from_controllers( - left_ctrl: OptionalTensorGroup, - right_ctrl: OptionalTensorGroup, - now, - frame_id: str, - transform_rot: Rotation | None = None, - transform_trans: Sequence[float] | None = None, - controller_uses_hands_source: bool = False, -) -> PoseArray: - """Build a PoseArray with left then right controller/hand wrist poses.""" - msg = PoseArray() - msg.header.stamp = now - msg.header.frame_id = frame_id - - if _controller_aim_is_valid(left_ctrl): - pos = [float(x) for x in left_ctrl[ControllerInputIndex.AIM_POSITION]] - ori = [float(x) for x in left_ctrl[ControllerInputIndex.AIM_ORIENTATION]] - pose = _to_pose(pos, ori) - - if transform_rot is not None or transform_trans is not None: - pose = _apply_transform_to_pose(pose, transform_rot, transform_trans) - - if controller_uses_hands_source: - pose = _apply_manus_controller_to_hand_pose(pose, "left") - - msg.poses.append(pose) - else: - msg.poses.append(_to_pose([0.0, 0.0, 0.0])) - - if _controller_aim_is_valid(right_ctrl): - pos = [float(x) for x in right_ctrl[ControllerInputIndex.AIM_POSITION]] - ori = [float(x) for x in right_ctrl[ControllerInputIndex.AIM_ORIENTATION]] - pose = _to_pose(pos, ori) - - if transform_rot is not None or transform_trans is not None: - pose = _apply_transform_to_pose(pose, transform_rot, transform_trans) - - if controller_uses_hands_source: - pose = _apply_manus_controller_to_hand_pose(pose, "right") - - msg.poses.append(pose) - else: - msg.poses.append(_to_pose([0.0, 0.0, 0.0])) - - return msg - - -def _build_ee_msg_from_hands( - left_hand: OptionalTensorGroup, - right_hand: OptionalTensorGroup, - now, - frame_id: str, - transform_rot: Rotation | None = None, - transform_trans: Sequence[float] | None = None, -) -> PoseArray: - """Build a PoseArray with left then right hand wrist poses (EE proxy).""" - msg = PoseArray() - msg.header.stamp = now - msg.header.frame_id = frame_id - - if _hand_joint_is_valid(left_hand, HandJointIndex.WRIST): - left_positions = np.asarray(left_hand[HandInputIndex.JOINT_POSITIONS]) - left_orientations = np.asarray(left_hand[HandInputIndex.JOINT_ORIENTATIONS]) - pose = _to_pose( - left_positions[HandJointIndex.WRIST], - left_orientations[HandJointIndex.WRIST], - ) - if transform_rot is not None or transform_trans is not None: - pose = _apply_transform_to_pose(pose, transform_rot, transform_trans) - msg.poses.append(pose) - else: - msg.poses.append(_to_pose([0.0, 0.0, 0.0])) - - if _hand_joint_is_valid(right_hand, HandJointIndex.WRIST): - right_positions = np.asarray(right_hand[HandInputIndex.JOINT_POSITIONS]) - right_orientations = np.asarray(right_hand[HandInputIndex.JOINT_ORIENTATIONS]) - pose = _to_pose( - right_positions[HandJointIndex.WRIST], - right_orientations[HandJointIndex.WRIST], - ) - if transform_rot is not None or transform_trans is not None: - pose = _apply_transform_to_pose(pose, transform_rot, transform_trans) - msg.poses.append(pose) - else: - msg.poses.append(_to_pose([0.0, 0.0, 0.0])) - - return msg - - -def _build_hand_msg_from_hands( - left_hand: OptionalTensorGroup, - right_hand: OptionalTensorGroup, - now, - frame_id: str, - transform_rot: Rotation | None = None, - transform_trans: Sequence[float] | None = None, -) -> PoseArray: - """Build a PoseArray with right then left hand finger joints.""" - msg = PoseArray() - msg.header.stamp = now - msg.header.frame_id = frame_id - - if not right_hand.is_none: - right_positions = np.asarray(right_hand[HandInputIndex.JOINT_POSITIONS]) - right_orientations = np.asarray(right_hand[HandInputIndex.JOINT_ORIENTATIONS]) - right_valid = np.asarray(right_hand[HandInputIndex.JOINT_VALID]) - _append_hand_poses( - msg.poses, - right_positions, - right_orientations, - right_valid, - transform_rot, - transform_trans, - ) - else: - for _ in range(HandJointIndex.THUMB_METACARPAL, HandJointIndex.LITTLE_TIP + 1): - msg.poses.append(_to_pose([0.0, 0.0, 0.0])) - - if not left_hand.is_none: - left_positions = np.asarray(left_hand[HandInputIndex.JOINT_POSITIONS]) - left_orientations = np.asarray(left_hand[HandInputIndex.JOINT_ORIENTATIONS]) - left_valid = np.asarray(left_hand[HandInputIndex.JOINT_VALID]) - _append_hand_poses( - msg.poses, - left_positions, - left_orientations, - left_valid, - transform_rot, - transform_trans, - ) - else: - for _ in range(HandJointIndex.THUMB_METACARPAL, HandJointIndex.LITTLE_TIP + 1): - msg.poses.append(_to_pose([0.0, 0.0, 0.0])) - - return msg - - -def _build_controller_payload( - left_ctrl: OptionalTensorGroup, right_ctrl: OptionalTensorGroup -) -> Dict: - def _as_list(ctrl, index): - if ctrl.is_none: - return [0.0, 0.0, 0.0] - return [float(x) for x in ctrl[index]] - - def _as_quat(ctrl, index): - if ctrl.is_none: - return [1.0, 0.0, 0.0, 0.0] - return [float(x) for x in ctrl[index]] - - def _as_float(ctrl, index): - if ctrl.is_none: - return 0.0 - return float(ctrl[index]) - - return { - "timestamp": time.time_ns(), - "left_thumbstick": [ - _as_float(left_ctrl, ControllerInputIndex.THUMBSTICK_X), - _as_float(left_ctrl, ControllerInputIndex.THUMBSTICK_Y), - ], - "right_thumbstick": [ - _as_float(right_ctrl, ControllerInputIndex.THUMBSTICK_X), - _as_float(right_ctrl, ControllerInputIndex.THUMBSTICK_Y), - ], - "left_trigger_value": _as_float(left_ctrl, ControllerInputIndex.TRIGGER_VALUE), - "right_trigger_value": _as_float( - right_ctrl, ControllerInputIndex.TRIGGER_VALUE - ), - "left_squeeze_value": _as_float(left_ctrl, ControllerInputIndex.SQUEEZE_VALUE), - "right_squeeze_value": _as_float( - right_ctrl, ControllerInputIndex.SQUEEZE_VALUE - ), - "left_aim_position": _as_list(left_ctrl, ControllerInputIndex.AIM_POSITION), - "right_aim_position": _as_list(right_ctrl, ControllerInputIndex.AIM_POSITION), - "left_grip_position": _as_list(left_ctrl, ControllerInputIndex.GRIP_POSITION), - "right_grip_position": _as_list(right_ctrl, ControllerInputIndex.GRIP_POSITION), - "left_aim_orientation": _as_quat( - left_ctrl, ControllerInputIndex.AIM_ORIENTATION - ), - "right_aim_orientation": _as_quat( - right_ctrl, ControllerInputIndex.AIM_ORIENTATION - ), - "left_grip_orientation": _as_quat( - left_ctrl, ControllerInputIndex.GRIP_ORIENTATION - ), - "right_grip_orientation": _as_quat( - right_ctrl, ControllerInputIndex.GRIP_ORIENTATION - ), - "left_primary_click": _as_float(left_ctrl, ControllerInputIndex.PRIMARY_CLICK), - "right_primary_click": _as_float( - right_ctrl, ControllerInputIndex.PRIMARY_CLICK - ), - "left_secondary_click": _as_float( - left_ctrl, ControllerInputIndex.SECONDARY_CLICK - ), - "right_secondary_click": _as_float( - right_ctrl, ControllerInputIndex.SECONDARY_CLICK - ), - "left_thumbstick_click": _as_float( - left_ctrl, ControllerInputIndex.THUMBSTICK_CLICK - ), - "right_thumbstick_click": _as_float( - right_ctrl, ControllerInputIndex.THUMBSTICK_CLICK - ), - "left_menu_click": _as_float(left_ctrl, ControllerInputIndex.MENU_CLICK), - "right_menu_click": _as_float(right_ctrl, ControllerInputIndex.MENU_CLICK), - "left_is_active": not left_ctrl.is_none, - "right_is_active": not right_ctrl.is_none, - } - - -def _build_full_body_payload(full_body: OptionalTensorGroup) -> Dict: - positions = np.asarray(full_body[FullBodyInputIndex.JOINT_POSITIONS]) - orientations = np.asarray(full_body[FullBodyInputIndex.JOINT_ORIENTATIONS]) - valid = np.asarray(full_body[FullBodyInputIndex.JOINT_VALID]) - - return { - "timestamp": time.time_ns(), - "joint_names": _BODY_JOINT_NAMES, - "joint_positions": [[float(v) for v in pos] for pos in positions], - "joint_orientations": [[float(v) for v in ori] for ori in orientations], - "joint_valid": [bool(v) for v in valid], - } - - class TeleopRos2Node(Node): """ROS 2 node that publishes teleop data.""" @@ -706,7 +130,7 @@ def __init__(self) -> None: ) self.declare_parameter( "hand_retargeter", - _HAND_RETARGETER_MODE_DEFAULT, + HandRetargeter.MODE_DEFAULT.value, ParameterDescriptor( description=( "Hand retargeter backend. 'mode_default' resolves to " @@ -823,26 +247,27 @@ def __init__(self) -> None: raise ValueError("Parameter 'rate_hz' must be > 0") self._sleep_period_s: float = 1.0 / rate_hz mode = self.get_parameter("mode").get_parameter_value().string_value - if mode not in _TELEOP_MODES: + if mode not in TELEOP_MODES: raise ValueError( - f"Parameter 'mode' must be one of {_TELEOP_MODES}, got {mode!r}" + f"Parameter 'mode' must be one of {TELEOP_MODES}, got {mode!r}" ) self.get_logger().info(f"Mode: {mode}") self._mode: str = mode - self._hand_retargeter: str = ( + raw_hand_retargeter = ( self.get_parameter("hand_retargeter").get_parameter_value().string_value ) - if self._hand_retargeter not in _HAND_RETARGETERS: + try: + self._hand_retargeter: HandRetargeter = HandRetargeter(raw_hand_retargeter) + except ValueError as exc: raise ValueError( - f"Parameter 'hand_retargeter' must be one of {_HAND_RETARGETERS}, " - f"got {self._hand_retargeter!r}" - ) - self._resolved_hand_retargeter: str = _resolve_hand_retargeter( + f"Parameter 'hand_retargeter' must be one of {HAND_RETARGETERS}, " + f"got {raw_hand_retargeter!r}" + ) from exc + self._resolved_hand_retargeter: HandRetargeter = resolve_hand_retargeter( self._mode, self._hand_retargeter ) - self._controller_uses_hands_source: bool = ( - self._mode == "controller_teleop" - and self._resolved_hand_retargeter in _SHARPA_HAND_RETARGETERS + self._controller_uses_hands_source: bool = uses_hands_source_for_controller( + self._mode, self._resolved_hand_retargeter ) if self._mode in ("hand_teleop", "controller_teleop"): self.get_logger().info(f"Hand retargeter: {self._resolved_hand_retargeter}") @@ -850,8 +275,9 @@ def __init__(self) -> None: self.get_logger().info( "Applying MANUS controller-to-hand transform after pose transform." ) - self._config_asset_root: Path = _resolve_config_asset_root( - self.get_parameter("config_asset_root").get_parameter_value().string_value + self._config_asset_root: Path = resolve_config_asset_root( + self.get_parameter("config_asset_root").get_parameter_value().string_value, + Path(__file__).resolve().parents[1], ) self.get_logger().info(f"Config/asset root: {self._config_asset_root}") mcap_replay_path = ( @@ -974,298 +400,18 @@ def __init__(self) -> None: JointState, "xr_teleop/finger_joints", 10 ) - self._config = self._build_session_config(self._mode) - - def _build_controller_raw_config(self) -> TeleopSessionConfig: - controllers = ControllersSource(name="controllers") - pipeline = OutputCombiner( - { - "controller_left": controllers.output(ControllersSource.LEFT), - "controller_right": controllers.output(ControllersSource.RIGHT), - } - ) - - return TeleopSessionConfig( - app_name="TeleopRos2Publisher", - pipeline=pipeline, - mode=self._session_mode, - mcap_config=self._mcap_config, - ) - - def _build_controller_teleop_config(self) -> TeleopSessionConfig: - controllers = ControllersSource(name="controllers") - locomotion = LocomotionRootCmdRetargeter( - LocomotionRootCmdRetargeterConfig(), name="locomotion" - ) - locomotion_connected = locomotion.connect( - { - "controller_left": controllers.output(ControllersSource.LEFT), - "controller_right": controllers.output(ControllersSource.RIGHT), - } - ) - - pipeline_outputs = { - "controller_left": controllers.output(ControllersSource.LEFT), - "controller_right": controllers.output(ControllersSource.RIGHT), - "root_command": locomotion_connected.output("root_command"), - } - - if self._resolved_hand_retargeter == _HAND_RETARGETER_TRIHAND: - _validate_joint_name_alias_count( - "left_finger_joint_names", - self._left_finger_joint_name_aliases, - len(_LEFT_FINGER_JOINT_NAMES), - ) - _validate_joint_name_alias_count( - "right_finger_joint_names", - self._right_finger_joint_name_aliases, - len(_RIGHT_FINGER_JOINT_NAMES), - ) - left_finger_joint_names = ( - list(self._left_finger_joint_name_aliases) - if self._left_finger_joint_name_aliases is not None - else list(_LEFT_FINGER_JOINT_NAMES) - ) - right_finger_joint_names = ( - list(self._right_finger_joint_name_aliases) - if self._right_finger_joint_name_aliases is not None - else list(_RIGHT_FINGER_JOINT_NAMES) - ) - - left_hand_retargeter = TriHandMotionControllerRetargeter( - TriHandMotionControllerConfig( - hand_joint_names=left_finger_joint_names, controller_side="left" - ), - name="trihand_left", - ) - right_hand_retargeter = TriHandMotionControllerRetargeter( - TriHandMotionControllerConfig( - hand_joint_names=right_finger_joint_names, controller_side="right" - ), - name="trihand_right", - ) - left_hand_connected = left_hand_retargeter.connect( - {ControllersSource.LEFT: controllers.output(ControllersSource.LEFT)} - ) - right_hand_connected = right_hand_retargeter.connect( - {ControllersSource.RIGHT: controllers.output(ControllersSource.RIGHT)} - ) - pipeline_outputs.update( - { - "finger_joints_left": left_hand_connected.output("hand_joints"), - "finger_joints_right": right_hand_connected.output("hand_joints"), - } - ) - elif self._resolved_hand_retargeter in _SHARPA_HAND_RETARGETERS: - _validate_joint_name_alias_count( - "left_finger_joint_names", - self._left_finger_joint_name_aliases, - _SHARPA_FINGER_JOINT_COUNT, - ) - _validate_joint_name_alias_count( - "right_finger_joint_names", - self._right_finger_joint_name_aliases, - _SHARPA_FINGER_JOINT_COUNT, - ) - hands = HandsSource(name="hands") - left_finger_joints, right_finger_joints = ( - self._build_sharpa_finger_joint_outputs(hands, "controller_teleop") - ) - pipeline_outputs.update( - { - "hand_left": hands.output(HandsSource.LEFT), - "hand_right": hands.output(HandsSource.RIGHT), - "finger_joints_left": left_finger_joints, - "finger_joints_right": right_finger_joints, - } - ) - else: - raise ValueError( - "controller_teleop requires hand_retargeter to resolve to " - f"'trihand', 'dexpilot', or 'pink_ik', got " - f"{self._resolved_hand_retargeter!r}" - ) - - pipeline = OutputCombiner(pipeline_outputs) - - return TeleopSessionConfig( - app_name="TeleopRos2Publisher", - pipeline=pipeline, - mode=self._session_mode, - mcap_config=self._mcap_config, - ) - - def _build_full_body_config(self) -> TeleopSessionConfig: - controllers = ControllersSource(name="controllers") - full_body = FullBodySource(name="full_body") - pipeline = OutputCombiner( - { - "controller_left": controllers.output(ControllersSource.LEFT), - "controller_right": controllers.output(ControllersSource.RIGHT), - "full_body": full_body.output(FullBodySource.FULL_BODY), - } - ) - - return TeleopSessionConfig( - app_name="TeleopRos2Publisher", - pipeline=pipeline, - mode=self._session_mode, - mcap_config=self._mcap_config, - ) - - def _build_hand_teleop_config(self) -> TeleopSessionConfig: - _validate_joint_name_alias_count( - "left_finger_joint_names", - self._left_finger_joint_name_aliases, - _SHARPA_FINGER_JOINT_COUNT, - ) - _validate_joint_name_alias_count( - "right_finger_joint_names", - self._right_finger_joint_name_aliases, - _SHARPA_FINGER_JOINT_COUNT, - ) - - hands = HandsSource(name="hands") - pedals = Generic3AxisPedalSource( - name="pedals", collection_id=self._pedal_collection_id - ) - locomotion = FootPedalRootCmdRetargeter( - FootPedalRootCmdRetargeterConfig(), - name="foot_pedal", - ) - locomotion_connected = locomotion.connect({"pedals": pedals.output("pedals")}) - left_finger_joints, right_finger_joints = ( - self._build_sharpa_finger_joint_outputs(hands, "hand_teleop") - ) - - pipeline = OutputCombiner( - { - "hand_left": hands.output(HandsSource.LEFT), - "hand_right": hands.output(HandsSource.RIGHT), - "root_command": locomotion_connected.output("root_command"), - "finger_joints_left": left_finger_joints, - "finger_joints_right": right_finger_joints, - } - ) - - return TeleopSessionConfig( - app_name="TeleopRos2Publisher", - pipeline=pipeline, - mode=self._session_mode, - mcap_config=self._mcap_config, - ) - - def _build_session_config(self, mode: str) -> TeleopSessionConfig: - if mode == "controller_teleop": - return self._build_controller_teleop_config() - if mode == "hand_teleop": - return self._build_hand_teleop_config() - if mode == "controller_raw": - return self._build_controller_raw_config() - if mode == "full_body": - return self._build_full_body_config() - raise ValueError(f"Unsupported mode {mode!r}") - - def _build_sharpa_finger_joint_outputs(self, hands: HandsSource, context: str): - if self._resolved_hand_retargeter == _HAND_RETARGETER_PINK_IK: - try: - from isaacteleop.retargeters import ( - SharpaHandRetargeter, - SharpaHandRetargeterConfig, - ) - except ModuleNotFoundError as exc: - raise ModuleNotFoundError( - f"{context} with hand_retargeter:=pink_ik requires Sharpa " - "retargeting dependencies. Install/use a build with " - "isaacteleop[grounding] and bundled robotic_grounding." - ) from exc - - left_hand_retargeter = SharpaHandRetargeter( - SharpaHandRetargeterConfig( - robot_asset_path=_resolve_sharpa_mjcf("left_sharpawave_nomesh.xml"), - hand_side="left", - ), - name="sharpa_left", - ) - right_hand_retargeter = SharpaHandRetargeter( - SharpaHandRetargeterConfig( - robot_asset_path=_resolve_sharpa_mjcf( - "right_sharpawave_nomesh.xml" - ), - hand_side="right", - ), - name="sharpa_right", - ) - left_alias_name = "sharpa_left_joint_aliases" - right_alias_name = "sharpa_right_joint_aliases" - elif self._resolved_hand_retargeter == _HAND_RETARGETER_DEXPILOT: - left_hand_retargeter = DexHandRetargeter( - DexHandRetargeterConfig( - hand_retargeting_config=_resolve_dex_sharpa_config( - self._config_asset_root, - "sharpa_wave_left_dexpilot.yml", - ), - hand_urdf=_resolve_dex_sharpa_urdf( - self._config_asset_root, - "left_sharpa_wave.urdf", - ), - hand_joint_names=_LEFT_SHARPA_WAVE_JOINT_NAMES, - handtracking_to_baselink_frame_transform=( - _DEX_HANDTRACKING_TO_BASELINK_FRAME_TRANSFORM - ), - hand_side="left", - ), - name="dex_sharpa_left", - ) - right_hand_retargeter = DexHandRetargeter( - DexHandRetargeterConfig( - hand_retargeting_config=_resolve_dex_sharpa_config( - self._config_asset_root, - "sharpa_wave_right_dexpilot.yml", - ), - hand_urdf=_resolve_dex_sharpa_urdf( - self._config_asset_root, - "right_sharpa_wave.urdf", - ), - hand_joint_names=_RIGHT_SHARPA_WAVE_JOINT_NAMES, - handtracking_to_baselink_frame_transform=( - _DEX_HANDTRACKING_TO_BASELINK_FRAME_TRANSFORM - ), - hand_side="right", - ), - name="dex_sharpa_right", - ) - left_alias_name = "dex_sharpa_left_joint_aliases" - right_alias_name = "dex_sharpa_right_joint_aliases" - else: - raise ValueError( - f"Sharpa hand retargeting requires one of {_SHARPA_HAND_RETARGETERS}, " - f"got {self._resolved_hand_retargeter!r}" - ) - - left_hand_connected = left_hand_retargeter.connect( - {HandsSource.LEFT: hands.output(HandsSource.LEFT)} - ) - right_hand_connected = right_hand_retargeter.connect( - {HandsSource.RIGHT: hands.output(HandsSource.RIGHT)} - ) - left_finger_joints = _maybe_alias_hand_joints( - left_hand_connected, - _joint_names_from_group_type( - left_hand_retargeter.output_spec()["hand_joints"] - ), - self._left_finger_joint_name_aliases, - left_alias_name, - ) - right_finger_joints = _maybe_alias_hand_joints( - right_hand_connected, - _joint_names_from_group_type( - right_hand_retargeter.output_spec()["hand_joints"] + self._config = build_session_config( + self._mode, + TeleopRos2SessionConfigContext( + config_asset_root=self._config_asset_root, + left_finger_joint_name_aliases=self._left_finger_joint_name_aliases, + mcap_config=self._mcap_config, + pedal_collection_id=self._pedal_collection_id, + resolved_hand_retargeter=self._resolved_hand_retargeter, + right_finger_joint_name_aliases=self._right_finger_joint_name_aliases, + session_mode=self._session_mode, ), - self._right_finger_joint_name_aliases, - right_alias_name, ) - return left_finger_joints, right_finger_joints def _build_wrist_tfs( self, @@ -1289,7 +435,7 @@ def _get_orientation(pose: Pose) -> List[float]: if left_available: pose = ee_msg.poses[0] tfs.append( - _make_transform( + make_transform( now, self._world_frame, self._left_wrist_frame, @@ -1300,7 +446,7 @@ def _get_orientation(pose: Pose) -> List[float]: if right_available: pose = ee_msg.poses[1] tfs.append( - _make_transform( + make_transform( now, self._world_frame, self._right_wrist_frame, @@ -1310,6 +456,140 @@ def _get_orientation(pose: Pose) -> List[float]: ) return tfs + def _publish_controller_outputs(self, result: dict, now) -> None: + left_ctrl = result["controller_left"] + right_ctrl = result["controller_right"] + ee_msg = build_ee_msg_from_controllers( + left_ctrl, + right_ctrl, + now, + self._world_frame, + self._transform_rot, + self._transform_trans, + self._controller_uses_hands_source, + ) + if ee_msg.poses: + self._pub_ee_pose.publish(ee_msg) + wrist_tfs = self._build_wrist_tfs( + ee_msg, + right_available=controller_aim_is_valid(right_ctrl), + left_available=controller_aim_is_valid(left_ctrl), + now=now, + ) + if wrist_tfs: + self._tf_broadcaster.sendTransform(wrist_tfs) + if self._controller_uses_hands_source: + hand_msg = build_hand_msg_from_hands( + result["hand_left"], + result["hand_right"], + now, + self._world_frame, + self._transform_rot, + self._transform_trans, + ) + if hand_msg.poses: + self._pub_hand.publish(hand_msg) + + def _publish_controller_payload(self, result: dict) -> None: + if self._mode not in ("controller_raw", "controller_teleop", "full_body"): + return + + left_ctrl = result["controller_left"] + right_ctrl = result["controller_right"] + if left_ctrl.is_none and right_ctrl.is_none: + return + + controller_payload = build_controller_payload(left_ctrl, right_ctrl) + payload = msgpack.packb(controller_payload, default=mnp.encode) + controller_msg = ByteMultiArray() + controller_msg.data = tuple(bytes([a]) for a in payload) + self._pub_controller.publish(controller_msg) + + def _publish_finger_joints(self, result: dict, now) -> None: + if self._mode not in ("controller_teleop", "hand_teleop"): + return + + finger_joints_msg = build_finger_joints_msg( + result["finger_joints_left"], + result["finger_joints_right"], + now, + self._world_frame, + ) + if finger_joints_msg is not None: + self._pub_finger_joints.publish(finger_joints_msg) + + def _publish_full_body_payload(self, result: dict) -> None: + if self._mode != "full_body": + return + + full_body_data = result["full_body"] + if full_body_data.is_none: + return + + body_payload = build_full_body_payload(full_body_data) + payload = msgpack.packb(body_payload, default=mnp.encode) + body_msg = ByteMultiArray() + body_msg.data = tuple(bytes([a]) for a in payload) + self._pub_full_body.publish(body_msg) + + def _publish_hand_tracking_outputs(self, result: dict, now) -> None: + left_hand = result["hand_left"] + right_hand = result["hand_right"] + hand_msg = build_hand_msg_from_hands( + left_hand, + right_hand, + now, + self._world_frame, + self._transform_rot, + self._transform_trans, + ) + if hand_msg.poses: + self._pub_hand.publish(hand_msg) + + ee_msg = build_ee_msg_from_hands( + left_hand, + right_hand, + now, + self._world_frame, + self._transform_rot, + self._transform_trans, + ) + if ee_msg.poses: + self._pub_ee_pose.publish(ee_msg) + wrist_tfs = self._build_wrist_tfs( + ee_msg, + right_available=hand_wrist_is_valid(right_hand), + left_available=hand_wrist_is_valid(left_hand), + now=now, + ) + if wrist_tfs: + self._tf_broadcaster.sendTransform(wrist_tfs) + + def _publish_root_command(self, result: dict, now) -> None: + if self._mode not in ("hand_teleop", "controller_teleop"): + return + + root_command = result["root_command"] + if root_command.is_none: + return + + cmd = np.asarray(root_command[0]) + twist_msg = TwistStamped() + twist_msg.header.stamp = now + twist_msg.header.frame_id = self._world_frame + twist_msg.twist.linear.x = float(cmd[0]) + twist_msg.twist.linear.y = float(cmd[1]) + twist_msg.twist.linear.z = 0.0 + twist_msg.twist.angular.z = float(cmd[2]) + self._pub_root_twist.publish(twist_msg) + + pose_msg = PoseStamped() + pose_msg.header.stamp = now + pose_msg.header.frame_id = self._world_frame + pose_msg.pose.position.z = float(cmd[3]) + pose_msg.pose.orientation.w = 1.0 + self._pub_root_pose.publish(pose_msg) + def run(self) -> int: while rclpy.ok(): try: @@ -1325,162 +605,14 @@ def run(self) -> int: now = self.get_clock().now().to_msg() if self._mode == "hand_teleop": - left_hand = result["hand_left"] - right_hand = result["hand_right"] - # Build hand poses from hands - hand_msg = _build_hand_msg_from_hands( - left_hand, - right_hand, - now, - self._world_frame, - self._transform_rot, - self._transform_trans, - ) - if hand_msg.poses: - self._pub_hand.publish(hand_msg) - # Build EE poses from hands - ee_msg = _build_ee_msg_from_hands( - left_hand, - right_hand, - now, - self._world_frame, - self._transform_rot, - self._transform_trans, - ) - if ee_msg.poses: - self._pub_ee_pose.publish(ee_msg) - wrist_tfs = self._build_wrist_tfs( - ee_msg, - right_available=_hand_joint_is_valid( - right_hand, HandJointIndex.WRIST - ), - left_available=_hand_joint_is_valid( - left_hand, HandJointIndex.WRIST - ), - now=now, - ) - if wrist_tfs: - self._tf_broadcaster.sendTransform(wrist_tfs) + self._publish_hand_tracking_outputs(result, now) elif self._mode == "controller_teleop": - left_ctrl = result["controller_left"] - right_ctrl = result["controller_right"] - # Build EE poses from controllers - ee_msg = _build_ee_msg_from_controllers( - left_ctrl, - right_ctrl, - now, - self._world_frame, - self._transform_rot, - self._transform_trans, - self._controller_uses_hands_source, - ) - if ee_msg.poses: - self._pub_ee_pose.publish(ee_msg) - wrist_tfs = self._build_wrist_tfs( - ee_msg, - right_available=_controller_aim_is_valid(right_ctrl), - left_available=_controller_aim_is_valid(left_ctrl), - now=now, - ) - if wrist_tfs: - self._tf_broadcaster.sendTransform(wrist_tfs) - if self._controller_uses_hands_source: - left_hand = result["hand_left"] - right_hand = result["hand_right"] - hand_msg = _build_hand_msg_from_hands( - left_hand, - right_hand, - now, - self._world_frame, - self._transform_rot, - self._transform_trans, - ) - if hand_msg.poses: - self._pub_hand.publish(hand_msg) - - if self._mode in ("hand_teleop", "controller_teleop"): - root_command = result["root_command"] - if not root_command.is_none: - cmd = np.asarray(root_command[0]) - twist_msg = TwistStamped() - twist_msg.header.stamp = now - twist_msg.header.frame_id = self._world_frame - twist_msg.twist.linear.x = float(cmd[0]) - twist_msg.twist.linear.y = float(cmd[1]) - twist_msg.twist.linear.z = 0.0 - twist_msg.twist.angular.z = float(cmd[2]) - self._pub_root_twist.publish(twist_msg) - - pose_msg = PoseStamped() - pose_msg.header.stamp = now - pose_msg.header.frame_id = self._world_frame - pose_msg.pose.position.z = float(cmd[3]) - pose_msg.pose.orientation.w = 1.0 - self._pub_root_pose.publish(pose_msg) - - if self._mode in ("controller_teleop", "hand_teleop"): - left_joints = result["finger_joints_left"] - right_joints = result["finger_joints_right"] - if not left_joints.is_none or not right_joints.is_none: - finger_joints_msg = JointState() - finger_joints_msg.header.stamp = now - finger_joints_msg.header.frame_id = self._world_frame - left_arr = ( - np.asarray(list(left_joints), dtype=np.float32) - if not left_joints.is_none - else np.array([], dtype=np.float32) - ) - right_arr = ( - np.asarray(list(right_joints), dtype=np.float32) - if not right_joints.is_none - else np.array([], dtype=np.float32) - ) - finger_joints_msg.name = ( - _joint_names_from_group_type(left_joints.group_type) - if not left_joints.is_none - else [] - ) + ( - _joint_names_from_group_type( - right_joints.group_type - ) - if not right_joints.is_none - else [] - ) - finger_joints_msg.position = np.concatenate( - [left_arr, right_arr] - ).tolist() - self._pub_finger_joints.publish(finger_joints_msg) - - if self._mode in ( - "controller_raw", - "controller_teleop", - "full_body", - ): - left_ctrl = result["controller_left"] - right_ctrl = result["controller_right"] - if not left_ctrl.is_none or not right_ctrl.is_none: - controller_payload = _build_controller_payload( - left_ctrl, right_ctrl - ) - payload = msgpack.packb( - controller_payload, default=mnp.encode - ) - payload = tuple(bytes([a]) for a in payload) - controller_msg = ByteMultiArray() - controller_msg.data = payload - self._pub_controller.publish(controller_msg) + self._publish_controller_outputs(result, now) - if self._mode == "full_body": - full_body_data = result["full_body"] - if not full_body_data.is_none: - body_payload = _build_full_body_payload(full_body_data) - payload = msgpack.packb( - body_payload, default=mnp.encode - ) - payload = tuple(bytes([a]) for a in payload) - body_msg = ByteMultiArray() - body_msg.data = payload - self._pub_full_body.publish(body_msg) + self._publish_root_command(result, now) + self._publish_finger_joints(result, now) + self._publish_controller_payload(result) + self._publish_full_body_payload(result) time.sleep(self._sleep_period_s) except RuntimeError as e: