diff --git a/.github/workflows/build-ubuntu.yml b/.github/workflows/build-ubuntu.yml index b112a371d..209b56a03 100644 --- a/.github/workflows/build-ubuntu.yml +++ b/.github/workflows/build-ubuntu.yml @@ -491,98 +491,42 @@ jobs: -t teleop_ros2_ref:${{ env.ROS_DISTRO }} . - name: Verify application modes - env: - ACCEPT_CLOUDXR_EULA: Y - CXR_BUILD_CONTEXT: ${{ github.workspace }} - CXR_RUNTIME_NETWORK_MODE: bridge - PYTHON_VERSION: ${{ matrix.python_version }} - ISAACTELEOP_PIP_DEBUG: "0" - ISAACTELEOP_PIP_FIND_LINKS: /workspace/install/wheels - ISAACTELEOP_PIP_SPEC: isaacteleop[cloudxr] run: | - source scripts/setup_cloudxr_env.sh + set -euo pipefail PROJECT_NAME="isaacteleop-test-${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}-${{ matrix.arch }}-${{ env.ROS_DISTRO }}" LOG_FILE_BASE="/tmp/ros2_test_output_${GITHUB_RUN_ID}_${GITHUB_RUN_ATTEMPT}_${{ matrix.arch }}_${{ env.ROS_DISTRO }}" + REPLAY_MCAP="/tmp/teleop_ros2_test_input_${GITHUB_RUN_ID}_${GITHUB_RUN_ATTEMPT}_${{ matrix.arch }}_${{ env.ROS_DISTRO }}.mcap" STARTUP_MARKER="TeleopSession started successfully" READINESS_TIMEOUT_SEC=30 READINESS_POLL_INTERVAL_SEC=1 # Cleanup - trap 'docker compose -p "$PROJECT_NAME" \ - --env-file "$ENV_DEFAULT" \ - ${ENV_LOCAL:+--env-file "$ENV_LOCAL"} \ - -f deps/cloudxr/docker-compose.runtime.yaml \ - -f deps/cloudxr/docker-compose.test.yaml \ - -f deps/cloudxr/docker-compose.override.yaml \ - down -v' EXIT - - # Start CloudXR runtime - cat < deps/cloudxr/docker-compose.override.yaml - services: - cloudxr-runtime: - container_name: "cloudxr-runtime-$PROJECT_NAME" - EOF - - docker compose -p "$PROJECT_NAME" \ - --env-file "$ENV_DEFAULT" \ - ${ENV_LOCAL:+--env-file "$ENV_LOCAL"} \ - -f deps/cloudxr/docker-compose.runtime.yaml \ - -f deps/cloudxr/docker-compose.test.yaml \ - -f deps/cloudxr/docker-compose.override.yaml \ - up --build -d cloudxr-runtime - - # Wait for healthy - healthy=false - for i in {1..30}; do - if docker compose -p "$PROJECT_NAME" \ - --env-file "$ENV_DEFAULT" \ - ${ENV_LOCAL:+--env-file "$ENV_LOCAL"} \ - -f deps/cloudxr/docker-compose.runtime.yaml \ - -f deps/cloudxr/docker-compose.test.yaml \ - -f deps/cloudxr/docker-compose.override.yaml \ - ps cloudxr-runtime | grep -q "healthy"; then - healthy=true - break - fi - sleep 2 - done + trap 'docker ps -aq --filter "name=teleop-ros2-$PROJECT_NAME" | xargs -r docker rm -f >/dev/null 2>&1 || true; rm -f "$REPLAY_MCAP"' EXIT - if [ "$healthy" = false ]; then - echo "Error: CloudXR runtime failed to become healthy" - docker compose -p "$PROJECT_NAME" \ - --env-file "$ENV_DEFAULT" \ - ${ENV_LOCAL:+--env-file "$ENV_LOCAL"} \ - -f deps/cloudxr/docker-compose.runtime.yaml \ - -f deps/cloudxr/docker-compose.test.yaml \ - -f deps/cloudxr/docker-compose.override.yaml \ - logs cloudxr-runtime - exit 1 - fi + docker run --rm \ + --user "$(id -u):$(id -g)" \ + -v /tmp:/tmp \ + --entrypoint bash teleop_ros2_ref:${{ env.ROS_DISTRO }} -c \ + "source /usr/local/bin/teleop-env-setup && exec /opt/isaacteleop/install/examples/teleop_ros2/cpp/integration_tests/teleop_ros2_mcap_generator '$REPLAY_MCAP'" - # Run tests for mode in controller_teleop hand_teleop controller_raw full_body; do echo "Testing mode: $mode" LOG_FILE="${LOG_FILE_BASE}_${mode}.log" RUN_NAME="teleop-ros2-${PROJECT_NAME}-${mode//_/-}" + verifier_rc=0 docker_rc=0 started=false rm -f "$LOG_FILE" - # This test overrides the image entrypoint, so keep uv's runtime - # resolver pointed at the locally built arm64 nlopt wheel. docker run -d \ --name "$RUN_NAME" \ - --gpus all \ --network host \ - -v "$CXR_HOST_VOLUME_PATH:$CXR_HOST_VOLUME_PATH" \ -v /tmp:/tmp \ - -e NV_CXR_RUNTIME_DIR="$CXR_HOST_VOLUME_PATH/.cloudxr/run" \ - -e XR_RUNTIME_JSON="$CXR_HOST_VOLUME_PATH/.cloudxr/openxr_cloudxr.json" \ -e PYTHONUNBUFFERED=1 \ --entrypoint bash teleop_ros2_ref:${{ env.ROS_DISTRO }} -c \ - "source /usr/local/bin/teleop-env-setup && exec uv run --find-links=/opt/nlopt-wheels teleop_ros2_node.py --ros-args -p mode:=$mode -p use_mock_operators:=true" >/dev/null + "source /usr/local/bin/teleop-env-setup && exec uv run --no-sync teleop_ros2_node.py --ros-args -p mode:=$mode -p mcap_replay_path:=$REPLAY_MCAP" >/dev/null deadline=$((SECONDS + READINESS_TIMEOUT_SEC)) while [ "$SECONDS" -lt "$deadline" ]; do @@ -605,6 +549,15 @@ jobs: docker logs "$RUN_NAME" > "$LOG_FILE" 2>&1 || true if [ "$started" = true ]; then + if docker exec \ + -e PYTHONUNBUFFERED=1 \ + "$RUN_NAME" bash -c \ + "source /usr/local/bin/teleop-env-setup && exec uv run --no-sync integration_tests/teleop_ros2_topic_verifier.py --mode $mode"; then + verifier_rc=0 + else + verifier_rc=$? + fi + docker logs "$RUN_NAME" > "$LOG_FILE" 2>&1 || true docker stop --time 5 "$RUN_NAME" >/dev/null || true docker wait "$RUN_NAME" >/dev/null || true else @@ -629,6 +582,10 @@ jobs: if [ "$started" != true ]; then exit 1 fi + if [ "$verifier_rc" -ne 0 ]; then + echo "Error: topic verifier failed in mode $mode (exit code $verifier_rc)" + exit "$verifier_rc" + fi done # These are the actual dependency jobs for publish-wheel. publish-wheel diff --git a/examples/teleop_ros2/AGENTS.md b/examples/teleop_ros2/AGENTS.md index ad1c610f7..a1c3939a0 100644 --- a/examples/teleop_ros2/AGENTS.md +++ b/examples/teleop_ros2/AGENTS.md @@ -12,3 +12,4 @@ SPDX-License-Identifier: Apache-2.0 ## Python Node Layout - In `python/teleop_ros2_node.py`, preserve the existing grouped/sorted organization for global non-member helpers and `TeleopRos2Node` member functions: scan the surrounding order before inserting, and do not place helpers near call sites when the existing section is sorted. +- In Python integration test verifier code, do not use bare `assert` for runtime validation; Python optimization can disable it, so raise explicit exceptions from validators. diff --git a/examples/teleop_ros2/CMakeLists.txt b/examples/teleop_ros2/CMakeLists.txt index 5fed30276..29dc30328 100644 --- a/examples/teleop_ros2/CMakeLists.txt +++ b/examples/teleop_ros2/CMakeLists.txt @@ -5,6 +5,9 @@ cmake_minimum_required(VERSION 3.20) project(teleop_ros2_examples) include(${CMAKE_SOURCE_DIR}/cmake/InstallPythonExample.cmake) + +add_subdirectory(cpp/integration_tests) + install_python_example(DESTINATION examples/teleop_ros2/python EXTRA_UV_EXTRA_BUILD_DEPS "nlopt = [\"numpy\"]") diff --git a/examples/teleop_ros2/README.md b/examples/teleop_ros2/README.md index 6ab8f1575..53cfd92d1 100644 --- a/examples/teleop_ros2/README.md +++ b/examples/teleop_ros2/README.md @@ -154,6 +154,24 @@ The `mode` parameter selects the teleoperation scenario and which topics are pub Example: `--ros-args -p mode:=controller_raw` +### MCAP Replay + +Set `mcap_replay_path` to run the same ROS 2 publisher from recorded DeviceIO +tracker data instead of live OpenXR/DeviceIO inputs: + +```bash +docker run --rm --net=host --ipc=host \ + -v /tmp:/tmp \ + --name teleop_ros2_ref \ + teleop_ros2_ref --ros-args -p mode:=controller_raw \ + -p mcap_replay_path:=/tmp/teleop_ros2_input.mcap +``` + +The installed integration test utility +`examples/teleop_ros2/cpp/integration_tests/teleop_ros2_mcap_generator` creates a +deterministic fixture with controller, hand, pedal, and full-body samples for CI +coverage. + ## Echo Topics ```bash diff --git a/examples/teleop_ros2/cpp/integration_tests/CMakeLists.txt b/examples/teleop_ros2/cpp/integration_tests/CMakeLists.txt new file mode 100644 index 000000000..069855491 --- /dev/null +++ b/examples/teleop_ros2/cpp/integration_tests/CMakeLists.txt @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20) + +add_executable(teleop_ros2_mcap_generator + mcap_generator.cpp +) + +target_link_libraries(teleop_ros2_mcap_generator + PRIVATE + deviceio::deviceio_session + mcap::mcap_core +) + +install(TARGETS teleop_ros2_mcap_generator + RUNTIME DESTINATION examples/teleop_ros2/cpp/integration_tests +) diff --git a/examples/teleop_ros2/cpp/integration_tests/mcap_generator.cpp b/examples/teleop_ros2/cpp/integration_tests/mcap_generator.cpp new file mode 100644 index 000000000..9a0e78665 --- /dev/null +++ b/examples/teleop_ros2/cpp/integration_tests/mcap_generator.cpp @@ -0,0 +1,188 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace +{ + +using ControllerChannels = core::McapTrackerChannels; +using HandChannels = core::McapTrackerChannels; +using PedalChannels = core::McapTrackerChannels; +using FullBodyChannels = core::McapTrackerChannels; + +constexpr int kDefaultFrameCount = 1800; +constexpr int64_t kFramePeriodNs = 16'666'667; + +std::vector to_strings(auto channels) +{ + std::vector result; + result.reserve(channels.size()); + for (std::string_view channel : channels) + { + result.emplace_back(channel); + } + return result; +} + +std::shared_ptr make_controller_sample(bool left, int frame) +{ + const float side = left ? -1.0f : 1.0f; + const float delta = 0.001f * static_cast(frame); + auto sample = std::make_shared(); + sample->grip_pose = std::make_shared( + core::Pose(core::Point(0.15f * side, 0.10f + delta, 1.10f), core::Quaternion(0.0f, 0.0f, 0.0f, 1.0f)), true); + sample->aim_pose = std::make_shared( + core::Pose(core::Point(0.20f * side, 0.15f + delta, 1.20f), core::Quaternion(0.0f, 0.0f, 0.0f, 1.0f)), true); + sample->inputs = std::make_shared( + true, !left, false, left, 0.25f * side, left ? 0.40f : -0.40f, 0.55f, 0.70f); + return sample; +} + +std::shared_ptr make_hand_sample(bool left, int frame) +{ + const float side = left ? -1.0f : 1.0f; + const float delta = 0.0005f * static_cast(frame); + auto sample = std::make_shared(); + sample->joints = std::make_unique(); + for (int joint = 0; joint < core::HandJoint_NUM_JOINTS; ++joint) + { + const float joint_f = static_cast(joint); + const float x = 0.05f * side + 0.003f * side * joint_f; + const float y = 0.03f + 0.006f * joint_f + delta; + const float z = 1.00f + 0.004f * joint_f; + const core::Point position(x, y, z); + const core::Pose pose(position, core::Quaternion(0.0f, 0.0f, 0.0f, 1.0f)); + sample->joints->mutable_poses()->Mutate(joint, core::HandJointPose(pose, true, 0.010f)); + } + return sample; +} + +std::shared_ptr make_pedal_sample(int frame) +{ + auto sample = std::make_shared(); + sample->left_pedal = 0.20f; + sample->right_pedal = 0.80f; + sample->rudder = (frame % 2 == 0) ? 0.15f : -0.15f; + return sample; +} + +std::shared_ptr make_full_body_sample(int frame) +{ + const float delta = 0.0005f * static_cast(frame); + auto sample = std::make_shared(); + sample->joints = std::make_unique(); + for (int joint = 0; joint < core::BodyJointPico_NUM_JOINTS; ++joint) + { + const float joint_f = static_cast(joint); + const float x = 0.01f * joint_f; + const float y = -0.02f + 0.002f * joint_f + delta; + const float z = 0.80f + 0.01f * joint_f; + const core::Point position(x, y, z); + const core::Pose pose(position, core::Quaternion(0.0f, 0.0f, 0.0f, 1.0f)); + sample->joints->mutable_joints()->Mutate(joint, core::BodyJointPose(pose, true)); + } + return sample; +} + +std::unique_ptr open_writer(const std::filesystem::path& path) +{ + if (path.has_parent_path()) + { + std::filesystem::create_directories(path.parent_path()); + } + + auto writer = std::make_unique(); + mcap::McapWriterOptions options("teleop-ros2-integration-test"); + options.compression = mcap::Compression::None; + const auto status = writer->open(path.string(), options); + if (!status.ok()) + { + throw std::runtime_error("failed to open MCAP writer for " + path.string() + ": " + status.message); + } + return writer; +} + +void write_fixture(const std::filesystem::path& output_path, int frame_count) +{ + auto writer = open_writer(output_path); + const auto controller_names = to_strings(core::ControllerRecordingTraits::recording_channels); + const auto hand_names = to_strings(core::HandRecordingTraits::recording_channels); + const auto pedal_names = to_strings(core::PedalRecordingTraits::recording_channels); + const auto full_body_names = to_strings(core::FullBodyPicoRecordingTraits::recording_channels); + + ControllerChannels controller_channels( + *writer, "controllers", core::ControllerRecordingTraits::schema_name, controller_names); + HandChannels hand_channels(*writer, "hands", core::HandRecordingTraits::schema_name, hand_names); + PedalChannels pedal_channels(*writer, "pedals", core::PedalRecordingTraits::schema_name, pedal_names); + FullBodyChannels full_body_channels( + *writer, "full_body", core::FullBodyPicoRecordingTraits::schema_name, full_body_names); + + for (int frame = 0; frame < frame_count; ++frame) + { + const int64_t time_ns = static_cast(frame + 1) * kFramePeriodNs; + const core::DeviceDataTimestamp timestamp(time_ns, time_ns, time_ns); + controller_channels.write(0, timestamp, make_controller_sample(true, frame)); + controller_channels.write(1, timestamp, make_controller_sample(false, frame)); + hand_channels.write(0, timestamp, make_hand_sample(true, frame)); + hand_channels.write(1, timestamp, make_hand_sample(false, frame)); + pedal_channels.write(0, timestamp, make_pedal_sample(frame)); + full_body_channels.write(0, timestamp, make_full_body_sample(frame)); + } + + writer->close(); +} + +int parse_frame_count(const char* value) +{ + const int frame_count = std::stoi(value); + if (frame_count <= 0) + { + throw std::invalid_argument("frame count must be positive"); + } + return frame_count; +} + +} // namespace + +int main(int argc, char** argv) +try +{ + if (argc < 2 || argc > 3) + { + std::cerr << "Usage: " << argv[0] << " [frame_count]\n"; + return 2; + } + + const std::filesystem::path output_path(argv[1]); + const int frame_count = argc == 3 ? parse_frame_count(argv[2]) : kDefaultFrameCount; + write_fixture(output_path, frame_count); + std::cout << "Wrote " << frame_count << " teleop ROS 2 replay frames to " << output_path << "\n"; + return 0; +} +catch (const std::exception& e) +{ + std::cerr << argv[0] << ": " << e.what() << "\n"; + return 1; +} +catch (...) +{ + std::cerr << argv[0] << ": Unknown error occurred\n"; + return 1; +} diff --git a/examples/teleop_ros2/python/integration_tests/teleop_ros2_topic_verifier.py b/examples/teleop_ros2/python/integration_tests/teleop_ros2_topic_verifier.py new file mode 100755 index 000000000..2695a65ed --- /dev/null +++ b/examples/teleop_ros2/python/integration_tests/teleop_ros2_topic_verifier.py @@ -0,0 +1,331 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Verify teleop_ros2_node.py publishes expected topics for one mode.""" + +from __future__ import annotations + +import argparse +import math +import sys +import time +from typing import Callable + +import msgpack +import rclpy +from geometry_msgs.msg import PoseArray, PoseStamped, TwistStamped +from rclpy.node import Node +from sensor_msgs.msg import JointState +from std_msgs.msg import ByteMultiArray +from tf2_msgs.msg import TFMessage + + +_MODES = ("controller_teleop", "hand_teleop", "controller_raw", "full_body") +_EXPECTED_TF_FRAMES = {"right_wrist", "left_wrist"} + + +def _is_finite_sequence(values) -> bool: + return all(math.isfinite(float(value)) for value in values) + + +def _byte_multi_array_to_bytes(msg: ByteMultiArray) -> bytes: + payload = bytearray() + for value in msg.data: + if isinstance(value, bytes): + payload.extend(value) + else: + payload.append((int(value) + 256) % 256) + return bytes(payload) + + +def _unpack_msgpack(msg: ByteMultiArray) -> dict: + return msgpack.unpackb( + _byte_multi_array_to_bytes(msg), raw=False, strict_map_key=False + ) + + +def _assert_pose_array(msg: PoseArray, *, expected_count: int) -> None: + if msg.header.frame_id != "world": + raise ValueError(f"unexpected frame_id {msg.header.frame_id!r}") + if len(msg.poses) != expected_count: + raise ValueError(f"expected {expected_count} poses, got {len(msg.poses)}") + positions = [] + for pose in msg.poses: + position = (pose.position.x, pose.position.y, pose.position.z) + orientation = ( + pose.orientation.x, + pose.orientation.y, + pose.orientation.z, + pose.orientation.w, + ) + if not _is_finite_sequence(position): + raise ValueError("pose position contains non-finite values") + if not _is_finite_sequence(orientation): + raise ValueError("pose orientation contains non-finite values") + positions.extend(position) + if not any(abs(value) > 1e-6 for value in positions): + raise ValueError("pose array positions are all zero") + + +def _assert_joint_state(msg: JointState) -> None: + if msg.header.frame_id != "world": + raise ValueError(f"unexpected frame_id {msg.header.frame_id!r}") + if not msg.name: + raise ValueError("JointState names are empty") + if len(msg.position) != len(msg.name): + raise ValueError("JointState names and positions differ in length") + if not _is_finite_sequence(msg.position): + raise ValueError("JointState positions contain non-finite values") + + +def _assert_twist(msg: TwistStamped) -> None: + if msg.header.frame_id != "world": + raise ValueError(f"unexpected frame_id {msg.header.frame_id!r}") + values = ( + msg.twist.linear.x, + msg.twist.linear.y, + msg.twist.linear.z, + msg.twist.angular.x, + msg.twist.angular.y, + msg.twist.angular.z, + ) + if not _is_finite_sequence(values): + raise ValueError("TwistStamped contains non-finite values") + + +def _assert_root_pose(msg: PoseStamped) -> None: + if msg.header.frame_id != "world": + raise ValueError(f"unexpected frame_id {msg.header.frame_id!r}") + position = (msg.pose.position.x, msg.pose.position.y, msg.pose.position.z) + orientation = ( + msg.pose.orientation.x, + msg.pose.orientation.y, + msg.pose.orientation.z, + msg.pose.orientation.w, + ) + if not _is_finite_sequence(position): + raise ValueError("PoseStamped position contains non-finite values") + if not _is_finite_sequence(orientation): + raise ValueError("PoseStamped orientation contains non-finite values") + + +def _assert_controller_payload(msg: ByteMultiArray) -> None: + payload = _unpack_msgpack(msg) + for side in ("left", "right"): + if payload[f"{side}_is_active"] is not True: + raise ValueError(f"{side} controller is not active") + for field in ("aim_position", "grip_position"): + values = payload[f"{side}_{field}"] + if len(values) != 3: + raise ValueError(f"{side}_{field} must have 3 values") + if not _is_finite_sequence(values): + raise ValueError(f"{side}_{field} contains non-finite values") + if not any(abs(float(value)) > 1e-6 for value in values): + raise ValueError(f"{side}_{field} is all zero") + for field in ("aim_orientation", "grip_orientation"): + values = payload[f"{side}_{field}"] + if len(values) != 4: + raise ValueError(f"{side}_{field} must have 4 values") + if not _is_finite_sequence(values): + raise ValueError(f"{side}_{field} contains non-finite values") + thumbstick = payload[f"{side}_thumbstick"] + if len(thumbstick) != 2: + raise ValueError(f"{side}_thumbstick must have 2 values") + if not _is_finite_sequence(thumbstick): + raise ValueError(f"{side}_thumbstick contains non-finite values") + if not 0.0 <= float(payload[f"{side}_trigger_value"]) <= 1.0: + raise ValueError(f"{side}_trigger_value must be in [0, 1]") + if not 0.0 <= float(payload[f"{side}_squeeze_value"]) <= 1.0: + raise ValueError(f"{side}_squeeze_value must be in [0, 1]") + + +def _assert_full_body_payload(msg: ByteMultiArray) -> None: + payload = _unpack_msgpack(msg) + if len(payload["joint_names"]) != 24: + raise ValueError("expected 24 full-body joint names") + if len(payload["joint_positions"]) != 24: + raise ValueError("expected 24 full-body joint positions") + if len(payload["joint_orientations"]) != 24: + raise ValueError("expected 24 full-body joint orientations") + if len(payload["joint_valid"]) != 24: + raise ValueError("expected 24 full-body joint valid flags") + if not any(bool(value) for value in payload["joint_valid"]): + raise ValueError("all full-body joints are invalid") + for values in payload["joint_positions"]: + if len(values) != 3: + raise ValueError("full-body joint position must have 3 values") + if not _is_finite_sequence(values): + raise ValueError("full-body joint position contains non-finite values") + for values in payload["joint_orientations"]: + if len(values) != 4: + raise ValueError("full-body joint orientation must have 4 values") + if not _is_finite_sequence(values): + raise ValueError("full-body joint orientation contains non-finite values") + + +class TopicVerifier(Node): + """Small ROS 2 node that waits for mode-specific verified messages.""" + + def __init__(self, mode: str) -> None: + super().__init__("teleop_ros2_topic_verifier") + self._pending: set[str] = set() + self._errors: dict[str, str] = {} + self._seen_tf_frames: set[str] = set() + + for name, topic, msg_type, validator in self._expected_subscriptions(mode): + self._pending.add(name) + self.create_subscription( + msg_type, topic, self._make_callback(name, validator), 10 + ) + + if mode in ("controller_teleop", "hand_teleop"): + self._pending.add("tf") + self.create_subscription(TFMessage, "/tf", self._tf_callback, 100) + + @property + def pending(self) -> set[str]: + return set(self._pending) + + @property + def errors(self) -> dict[str, str]: + return dict(self._errors) + + def _make_callback(self, name: str, validator: Callable) -> Callable: + def _callback(msg) -> None: + if name not in self._pending: + return + try: + validator(msg) + except ValueError as exc: + self._errors[name] = str(exc) + return + self._errors.pop(name, None) + self._pending.discard(name) + + return _callback + + def _tf_callback(self, msg: TFMessage) -> None: + if "tf" not in self._pending: + return + for transform in msg.transforms: + if transform.header.frame_id == "world": + self._seen_tf_frames.add(transform.child_frame_id) + missing = _EXPECTED_TF_FRAMES - self._seen_tf_frames + if missing: + self._errors["tf"] = f"missing TF frames: {sorted(missing)}" + return + self._errors.pop("tf", None) + self._pending.discard("tf") + + def _expected_subscriptions( + self, mode: str + ) -> list[tuple[str, str, type, Callable]]: + if mode == "controller_teleop": + return [ + ( + "ee_poses", + "xr_teleop/ee_poses", + PoseArray, + lambda msg: _assert_pose_array(msg, expected_count=2), + ), + ("root_twist", "xr_teleop/root_twist", TwistStamped, _assert_twist), + ("root_pose", "xr_teleop/root_pose", PoseStamped, _assert_root_pose), + ( + "finger_joints", + "xr_teleop/finger_joints", + JointState, + _assert_joint_state, + ), + ( + "controller_data", + "xr_teleop/controller_data", + ByteMultiArray, + _assert_controller_payload, + ), + ] + if mode == "hand_teleop": + return [ + ( + "hand", + "xr_teleop/hand", + PoseArray, + lambda msg: _assert_pose_array(msg, expected_count=48), + ), + ( + "ee_poses", + "xr_teleop/ee_poses", + PoseArray, + lambda msg: _assert_pose_array(msg, expected_count=2), + ), + ("root_twist", "xr_teleop/root_twist", TwistStamped, _assert_twist), + ("root_pose", "xr_teleop/root_pose", PoseStamped, _assert_root_pose), + ( + "finger_joints", + "xr_teleop/finger_joints", + JointState, + _assert_joint_state, + ), + ] + if mode == "controller_raw": + return [ + ( + "controller_data", + "xr_teleop/controller_data", + ByteMultiArray, + _assert_controller_payload, + ), + ] + if mode == "full_body": + return [ + ( + "full_body", + "xr_teleop/full_body", + ByteMultiArray, + _assert_full_body_payload, + ), + ( + "controller_data", + "xr_teleop/controller_data", + ByteMultiArray, + _assert_controller_payload, + ), + ] + raise ValueError(f"Unsupported mode: {mode}") + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--mode", choices=_MODES, required=True) + parser.add_argument("--timeout", type=float, default=20.0) + return parser.parse_args() + + +def main() -> int: + args = _parse_args() + rclpy.init() + verifier = TopicVerifier(args.mode) + try: + deadline = time.monotonic() + args.timeout + while verifier.pending and time.monotonic() < deadline: + rclpy.spin_once(verifier, timeout_sec=0.1) + + if verifier.pending: + print( + f"Timed out waiting for verified {args.mode} topics: {sorted(verifier.pending)}", + file=sys.stderr, + ) + for name, error in sorted(verifier.errors.items()): + print(f" {name}: {error}", file=sys.stderr) + return 1 + + print(f"Verified teleop_ros2 topics for mode {args.mode}") + return 0 + finally: + verifier.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/examples/teleop_ros2/python/teleop_ros2_node.py b/examples/teleop_ros2/python/teleop_ros2_node.py index 97aefae4a..776bfbee5 100755 --- a/examples/teleop_ros2/python/teleop_ros2_node.py +++ b/examples/teleop_ros2/python/teleop_ros2_node.py @@ -35,7 +35,6 @@ """ import math -import os import time from pathlib import Path from typing import Dict, List, Sequence, Union @@ -59,6 +58,7 @@ from std_msgs.msg import ByteMultiArray from tf2_ros import TransformBroadcaster +from isaacteleop.deviceio import McapReplayConfig from isaacteleop.retargeting_engine.deviceio_source_nodes import ( ControllersSource, FullBodySource, @@ -87,7 +87,7 @@ HandJointIndex, ) from isaacteleop.teleop_session_manager import ( - PluginConfig, + SessionMode, TeleopSession, TeleopSessionConfig, ) @@ -271,44 +271,11 @@ def _apply_transform_to_pose( return result -def _build_plugins( - use_mock_operators: bool, - *, - include_synthetic_hands: bool, - search_start: Path, -) -> List[PluginConfig]: - if not use_mock_operators or not include_synthetic_hands: - return [] - - plugin_paths = [] - env_paths = os.environ.get("ISAAC_TELEOP_PLUGIN_PATH") - if env_paths: - plugin_paths.extend([Path(p) for p in env_paths.split(os.pathsep) if p]) - plugin_paths.extend(_find_plugins_dirs(search_start)) - - return [ - PluginConfig( - plugin_name="controller_synthetic_hands", - plugin_root_id="synthetic_hands", - search_paths=plugin_paths, - ) - ] - - def _controller_aim_is_valid(ctrl: OptionalTensorGroup) -> bool: # DeviceIO's AIM_IS_VALID flag is the usability contract for aim poses. return not ctrl.is_none and bool(ctrl[ControllerInputIndex.AIM_IS_VALID]) -def _find_plugins_dirs(start: Path) -> List[Path]: - candidates: List[Path] = [] - for parent in [start, *start.parents]: - plugin_dir = parent / "plugins" - if plugin_dir.is_dir() and plugin_dir not in candidates: - candidates.append(plugin_dir) - return candidates - - def _hand_joint_is_valid(hand: OptionalTensorGroup, joint_idx: HandJointIndex) -> bool: if hand.is_none: return False @@ -727,7 +694,6 @@ def __init__(self) -> None: self.declare_parameter("mode", "controller_teleop") self.declare_parameter("rate_hz", 60.0) - self.declare_parameter("use_mock_operators", value=False) self.declare_parameter( "pedal_collection_id", DEFAULT_PEDAL_COLLECTION_ID, @@ -761,6 +727,17 @@ def __init__(self) -> None: ), ), ) + self.declare_parameter( + "mcap_replay_path", + "", + ParameterDescriptor( + type=ParameterType.PARAMETER_STRING, + description=( + "Optional MCAP file to replay through TeleopSession instead " + "of connecting to live OpenXR/DeviceIO inputs." + ), + ), + ) self.declare_parameter( "transform_translation", @@ -845,9 +822,6 @@ def __init__(self) -> None: if rate_hz <= 0 or not math.isfinite(rate_hz): raise ValueError("Parameter 'rate_hz' must be > 0") self._sleep_period_s: float = 1.0 / rate_hz - self._use_mock_operators: bool = ( - self.get_parameter("use_mock_operators").get_parameter_value().bool_value - ) mode = self.get_parameter("mode").get_parameter_value().string_value if mode not in _TELEOP_MODES: raise ValueError( @@ -880,6 +854,22 @@ def __init__(self) -> None: self.get_parameter("config_asset_root").get_parameter_value().string_value ) self.get_logger().info(f"Config/asset root: {self._config_asset_root}") + mcap_replay_path = ( + self.get_parameter("mcap_replay_path") + .get_parameter_value() + .string_value.strip() + ) + self._session_mode: SessionMode = SessionMode.LIVE + self._mcap_config: McapReplayConfig | None = None + if mcap_replay_path: + replay_path = Path(mcap_replay_path).expanduser().resolve() + if not replay_path.is_file(): + raise FileNotFoundError( + f"mcap_replay_path file not found: {replay_path}" + ) + self._session_mode = SessionMode.REPLAY + self._mcap_config = McapReplayConfig(str(replay_path)) + self.get_logger().info(f"Replaying MCAP input: {replay_path}") self._pedal_collection_id: str = ( self.get_parameter("pedal_collection_id").get_parameter_value().string_value @@ -998,11 +988,8 @@ def _build_controller_raw_config(self) -> TeleopSessionConfig: return TeleopSessionConfig( app_name="TeleopRos2Publisher", pipeline=pipeline, - plugins=_build_plugins( - self._use_mock_operators, - include_synthetic_hands=False, - search_start=Path(__file__).resolve(), - ), + mode=self._session_mode, + mcap_config=self._mcap_config, ) def _build_controller_teleop_config(self) -> TeleopSessionConfig: @@ -1104,11 +1091,8 @@ def _build_controller_teleop_config(self) -> TeleopSessionConfig: return TeleopSessionConfig( app_name="TeleopRos2Publisher", pipeline=pipeline, - plugins=_build_plugins( - self._use_mock_operators, - include_synthetic_hands=self._controller_uses_hands_source, - search_start=Path(__file__).resolve(), - ), + mode=self._session_mode, + mcap_config=self._mcap_config, ) def _build_full_body_config(self) -> TeleopSessionConfig: @@ -1125,11 +1109,8 @@ def _build_full_body_config(self) -> TeleopSessionConfig: return TeleopSessionConfig( app_name="TeleopRos2Publisher", pipeline=pipeline, - plugins=_build_plugins( - self._use_mock_operators, - include_synthetic_hands=False, - search_start=Path(__file__).resolve(), - ), + mode=self._session_mode, + mcap_config=self._mcap_config, ) def _build_hand_teleop_config(self) -> TeleopSessionConfig: @@ -1170,11 +1151,8 @@ def _build_hand_teleop_config(self) -> TeleopSessionConfig: return TeleopSessionConfig( app_name="TeleopRos2Publisher", pipeline=pipeline, - plugins=_build_plugins( - self._use_mock_operators, - include_synthetic_hands=True, - search_start=Path(__file__).resolve(), - ), + mode=self._session_mode, + mcap_config=self._mcap_config, ) def _build_session_config(self, mode: str) -> TeleopSessionConfig: