diff --git a/limb/agents/teleoperation/yam_viser_agent.py b/limb/agents/teleoperation/yam_viser_agent.py index ea205a2..57ae2ee 100644 --- a/limb/agents/teleoperation/yam_viser_agent.py +++ b/limb/agents/teleoperation/yam_viser_agent.py @@ -7,11 +7,10 @@ import viser import viser.extras from dm_env.specs import Array -from loguru import logger from limb.agents.agent import Agent from limb.utils.portal_utils import remote -from limb.visualization.viser_monitor import ViserMonitor +from limb.visualization.panels.camera_panel import CameraPanel def _create_ik_solver(solver_name: str, ik_params: Optional[Dict[str, Any]] = None, **kwargs): @@ -43,8 +42,9 @@ def __init__( self.viser_server = viser.ViserServer() self.ik = _create_ik_solver(ik_solver, ik_params=ik_params, viser_server=self.viser_server, bimanual=bimanual) - # Shared monitor handles camera feeds + recording on the same server - self._monitor = ViserMonitor(self.viser_server) + # Camera panel handles feed thumbnails on the same server + self._camera_panel = CameraPanel() + self._camera_panel.attach(self.viser_server) self.ik_thread = threading.Thread(target=self.ik.run) self.ik_thread.start() @@ -99,8 +99,8 @@ def _update_visualization(self): def act(self, obs: Dict[str, Any]) -> Any: self.obs = deepcopy(obs) - # Feed camera images to monitor - self._monitor.update(obs) + # Feed camera images to camera panel + self._camera_panel.update(obs) action = { "left": { @@ -118,7 +118,7 @@ def act(self, obs: Dict[str, Any]) -> Any: return action def close(self) -> None: - self._monitor.close() + self._camera_panel.detach() @remote(serialization_needed=True) def action_spec(self) -> Dict[str, Dict[str, Array]]: diff --git a/limb/cli.py b/limb/cli.py new file mode 100644 index 0000000..e13da97 --- /dev/null +++ b/limb/cli.py @@ -0,0 +1,208 @@ +"""Unified CLI entry point for limb. + +Provides a single `limb` command with subcommands for all common operations: + + limb teleop — Launch teleoperation + limb record — Launch data collection session + limb devices — Discover connected hardware + limb replay — Replay a recorded episode on hardware + limb convert-lerobot — Convert raw recordings to LeRobot format + limb convert-webdataset — Convert raw recordings to WebDataset tar shards + limb visualize — Visualize a recorded episode with Rerun + limb upload — Upload dataset to S3/GCS/HuggingFace + +Usage: + uv run limb [options] +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from typing import Optional, Tuple, Union + +import tyro + +# ── Subcommands ────────────────────────────────────────────────────────── + + +@dataclass +class TeleopCommand: + """Launch teleoperation (no recording).""" + + config_path: Tuple[str, ...] = ("configs/yam_viser_bimanual.yaml",) + log_level: str = "INFO" + + def run(self) -> None: + from limb.envs.launch import Args, main + + main(Args(config_path=self.config_path, log_level=self.log_level)) + + +@dataclass +class RecordCommand: + """Launch a data collection session (teleop + recording).""" + + config_path: Tuple[str, ...] = ("configs/yam_gello_bimanual.yaml", "configs/collection.yaml") + log_level: str = "INFO" + + def run(self) -> None: + from limb.envs.launch import Args, main + + main(Args(config_path=self.config_path, log_level=self.log_level)) + + +@dataclass +class DevicesCommand: + """Discover connected cameras, robot arms, and input devices.""" + + verbose: bool = False + + def run(self) -> None: + from limb.discovery import discover_devices + + discover_devices(verbose=self.verbose) + + +@dataclass +class ReplayCommand: + """Replay a recorded episode on hardware for verification. + + Streams joint commands from a recorded episode to the physical robot. + Useful for checking recording quality before conversion. + """ + + episode_dir: str = "" + config_path: Tuple[str, ...] = ("configs/yam_gello_bimanual.yaml",) + speed: float = 1.0 + log_level: str = "INFO" + + def run(self) -> None: + from limb.replay import replay_episode + + replay_episode( + episode_dir=self.episode_dir, + config_path=[os.path.expanduser(x) for x in self.config_path], + speed=self.speed, + log_level=self.log_level, + ) + + +@dataclass +class ConvertLerobotCommand: + """Convert raw recordings to LeRobot v2.1 dataset format.""" + + input_dir: str = "" + output_dir: str = "" + task: Optional[str] = None + robot_type: str = "yam" + fps: int = 30 + success_only: bool = False + push_to_hub: Optional[str] = None + + def run(self) -> None: + from limb.data.convert_lerobot import Args, main + + main( + Args( + input_dir=self.input_dir, + output_dir=self.output_dir, + task=self.task, + robot_type=self.robot_type, + fps=self.fps, + success_only=self.success_only, + push_to_hub=self.push_to_hub, + ) + ) + + +@dataclass +class ConvertWebdatasetCommand: + """Convert raw recordings to WebDataset .tar shards for streaming training.""" + + input_dir: str = "" + output_dir: str = "" + task: Optional[str] = None + samples_per_shard: int = 1000 + image_size: Optional[int] = None + jpeg_quality: int = 90 + fps: int = 30 + success_only: bool = False + camera: Optional[str] = None + + def run(self) -> None: + from limb.data.convert_webdataset import Args, main + + main( + Args( + input_dir=self.input_dir, + output_dir=self.output_dir, + task=self.task, + samples_per_shard=self.samples_per_shard, + image_size=self.image_size, + jpeg_quality=self.jpeg_quality, + fps=self.fps, + success_only=self.success_only, + camera=self.camera, + ) + ) + + +@dataclass +class VisualizeCommand: + """Visualize a recorded episode with Rerun.""" + + episode_dir: str = "" + + def run(self) -> None: + from limb.data.visualize_episode import Args, main + + main(Args(episode_dir=self.episode_dir)) + + +@dataclass +class UploadCommand: + """Upload a dataset to cloud storage (S3, GCS, or HuggingFace Hub). + + Target URI format: + s3://bucket/prefix — Amazon S3 (uses AWS SDK credential chain) + gs://bucket/prefix — Google Cloud Storage (uses gcloud credentials) + hf://username/repo — HuggingFace Hub (uses HF_TOKEN or huggingface-cli login) + + Or configure a default in ~/.config/limb/storage.yaml + """ + + source: str = "" + target: Optional[str] = None + task: Optional[str] = None + + def run(self) -> None: + from limb.data.upload import Args, main + + main(Args(source=self.source, target=self.target, task=self.task)) + + +Command = Union[ + TeleopCommand, + RecordCommand, + DevicesCommand, + ReplayCommand, + ConvertLerobotCommand, + ConvertWebdatasetCommand, + VisualizeCommand, + UploadCommand, +] + + +def cli_main() -> None: + """Entry point for the `limb` CLI.""" + cmd = tyro.cli( + Command, + prog="limb", + description="limb — minimal, high-frequency control stack for YAM bimanual arms", + ) + cmd.run() + + +if __name__ == "__main__": + cli_main() diff --git a/limb/data/__init__.py b/limb/data/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/limb/data/convert_lerobot.py b/limb/data/convert_lerobot.py new file mode 100644 index 0000000..2ba4330 --- /dev/null +++ b/limb/data/convert_lerobot.py @@ -0,0 +1,227 @@ +"""Convert limb raw recordings to LeRobot v2.1 dataset format. + +No lerobot dependency required — only uses pyarrow and standard lib. + +Usage: + uv run limb convert-lerobot --input-dir recordings/task --output-dir datasets/task + uv run scripts/data/convert_to_lerobot.py --input-dir recordings/task --output-dir datasets/task + +LeRobot v2.1 output structure:: + + datasets/task/ + meta/ + info.json + stats.json + episodes.jsonl + tasks.jsonl + data/ + chunk-000/ + episode_000000.parquet + videos/ + observation.images.left_wrist_camera/ + episode_000000.mp4 +""" + +from __future__ import annotations + +import json +import shutil +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Optional + +import numpy as np +import tyro +from loguru import logger + +from limb.data.episode_utils import ( + build_action_names, + build_action_vector, + build_state_names, + build_state_vector, + compute_stats, + find_episodes, + load_episode, +) + + +@dataclass +class Args: + input_dir: str + output_dir: str + task: Optional[str] = None + robot_type: str = "yam" + fps: int = 30 + success_only: bool = False + push_to_hub: Optional[str] = None + + +def main(args: Args) -> None: + try: + import pyarrow as pa + import pyarrow.parquet as pq + except ImportError: + logger.error("pyarrow not installed. Run: uv add pyarrow") + raise SystemExit(1) from None + + input_dir = Path(args.input_dir) + output_dir = Path(args.output_dir) + + episodes = find_episodes(input_dir, args.success_only) + if not episodes: + logger.error("No episodes found in {}", input_dir) + raise SystemExit(1) + + logger.info("Found {} episodes in {}", len(episodes), input_dir) + + first_ep = load_episode(episodes[0]) + arm_names = sorted(first_ep["arms"].keys()) + cam_names = [c["name"] for c in first_ep["cameras"]] + task = args.task or first_ep["metadata"].get("task_instruction", "") + + state_names = build_state_names(arm_names, first_ep) + action_names = build_action_names(arm_names, first_ep) + state_dim = len(state_names) + action_dim = len(action_names) + + logger.info("Arms: {}, Cameras: {}", arm_names, cam_names) + logger.info("State dim: {} ({})", state_dim, state_names) + logger.info("Action dim: {} ({})", action_dim, action_names) + + meta_dir = output_dir / "meta" + data_dir = output_dir / "data" / "chunk-000" + meta_dir.mkdir(parents=True, exist_ok=True) + data_dir.mkdir(parents=True, exist_ok=True) + + for cam_name in cam_names: + video_dir = output_dir / "videos" / f"observation.images.{cam_name}" / "chunk-000" + video_dir.mkdir(parents=True, exist_ok=True) + + all_states: List[np.ndarray] = [] + all_actions: List[np.ndarray] = [] + episodes_meta: List[Dict] = [] + total_frames = 0 + + for ep_idx, ep_path in enumerate(episodes): + episode = load_episode(ep_path) + n_steps = len(episode["timestamps"]) if episode["timestamps"] is not None else 0 + + if n_steps == 0: + logger.warning("Skipping empty episode: {}", ep_path.name) + continue + + states = build_state_vector(episode, arm_names) + actions = build_action_vector(episode, arm_names) + n_steps = min(len(states), len(actions)) if len(actions) > 0 else len(states) + states = states[:n_steps] + actions = actions[:n_steps] if len(actions) > 0 else np.zeros((n_steps, action_dim), dtype=np.float32) + + timestamps = ( + episode["timestamps"][:n_steps] + if episode["timestamps"] is not None + else np.arange(n_steps, dtype=np.float64) / args.fps + ) + rel_timestamps = (timestamps - timestamps[0]).astype(np.float32) + + all_states.append(states) + all_actions.append(actions) + + table_data = { + "index": pa.array(np.arange(total_frames, total_frames + n_steps, dtype=np.int64)), + "episode_index": pa.array(np.full(n_steps, ep_idx, dtype=np.int64)), + "frame_index": pa.array(np.arange(n_steps, dtype=np.int64)), + "timestamp": pa.array(rel_timestamps), + "task_index": pa.array(np.zeros(n_steps, dtype=np.int64)), + } + + for i in range(state_dim): + table_data[f"observation.state.{i}"] = pa.array(states[:, i]) + for i in range(action_dim): + table_data[f"action.{i}"] = pa.array(actions[:, i]) + + table = pa.table(table_data) + pq.write_table(table, str(data_dir / f"episode_{ep_idx:06d}.parquet")) + + for cam in episode["cameras"]: + src = cam["video_path"] + dst = ( + output_dir / "videos" / f"observation.images.{cam['name']}" / "chunk-000" / f"episode_{ep_idx:06d}.mp4" + ) + shutil.copy2(str(src), str(dst)) + + episodes_meta.append({"episode_index": ep_idx, "tasks": [task], "length": n_steps}) + total_frames += n_steps + logger.info(" Episode {}: {} steps from {}", ep_idx, n_steps, ep_path.name) + + # Write metadata files + with open(meta_dir / "tasks.jsonl", "w") as f: + f.write(json.dumps({"task_index": 0, "task": task}) + "\n") + + with open(meta_dir / "episodes.jsonl", "w") as f: + for ep in episodes_meta: + f.write(json.dumps(ep) + "\n") + + stats = compute_stats(all_states, all_actions) + with open(meta_dir / "stats.json", "w") as f: + json.dump(stats, f, indent=2) + + features = { + "observation.state": {"dtype": "float32", "shape": [state_dim], "names": state_names}, + "action": {"dtype": "float32", "shape": [action_dim], "names": action_names}, + "timestamp": {"dtype": "float32", "shape": [1], "names": None}, + "frame_index": {"dtype": "int64", "shape": [1], "names": None}, + "episode_index": {"dtype": "int64", "shape": [1], "names": None}, + "index": {"dtype": "int64", "shape": [1], "names": None}, + "task_index": {"dtype": "int64", "shape": [1], "names": None}, + } + for cam_name in cam_names: + features[f"observation.images.{cam_name}"] = { + "dtype": "video", + "shape": [480, 640, 3], + "names": ["height", "width", "channels"], + "info": {"video.fps": args.fps, "video.codec": "mp4v"}, + } + + info = { + "codebase_version": "v2.1", + "robot_type": args.robot_type, + "total_episodes": len(episodes_meta), + "total_frames": total_frames, + "total_tasks": 1, + "fps": args.fps, + "splits": {"train": f"0:{len(episodes_meta)}"}, + "data_path": "data/chunk-{chunk_index:03d}/episode_{episode_index:06d}.parquet", + "video_path": "videos/{video_key}/chunk-{chunk_index:03d}/episode_{episode_index:06d}.mp4", + "chunks_size": 1000, + "features": features, + } + with open(meta_dir / "info.json", "w") as f: + json.dump(info, f, indent=2) + + logger.info("=" * 50) + logger.info("LeRobot dataset written to: {}", output_dir) + logger.info(" Episodes: {}, Total frames: {}", len(episodes_meta), total_frames) + logger.info(" State dim: {}, Action dim: {}", state_dim, action_dim) + logger.info(" Cameras: {}", cam_names) + + if args.push_to_hub: + _push_to_hub(output_dir, args.push_to_hub) + + +def _push_to_hub(dataset_dir: Path, repo_id: str) -> None: + """Upload dataset to HuggingFace Hub.""" + try: + from huggingface_hub import HfApi + except ImportError: + logger.error("huggingface_hub not installed. Run: uv pip install huggingface-hub") + raise SystemExit(1) from None + + api = HfApi() + logger.info("Uploading to HuggingFace Hub: {}", repo_id) + api.create_repo(repo_id, repo_type="dataset", exist_ok=True) + api.upload_folder(folder_path=str(dataset_dir), repo_id=repo_id, repo_type="dataset") + logger.info("Uploaded: https://huggingface.co/datasets/{}", repo_id) + + +if __name__ == "__main__": + main(tyro.cli(Args)) diff --git a/limb/data/convert_webdataset.py b/limb/data/convert_webdataset.py new file mode 100644 index 0000000..303dcfc --- /dev/null +++ b/limb/data/convert_webdataset.py @@ -0,0 +1,311 @@ +"""Convert limb raw recordings to WebDataset tar format. + +WebDataset stores training samples as consecutive files inside .tar archives, +designed for efficient streaming training (no random access overhead). +Each sample is a group of files sharing a key prefix: + + sample_000000.state.npy # (state_dim,) float32 + sample_000000.action.npy # (action_dim,) float32 + sample_000000.jpg # RGB image from primary camera + sample_000000.json # metadata (timestamp, episode_idx, frame_idx) + +Shards are split into configurable sizes (default 1000 samples per tar). +Stats (mean/std/min/max) are computed with Welford's online algorithm and +written alongside the shards. + +Usage: + uv run limb convert-webdataset --input-dir recordings/task --output-dir datasets/task_wds + uv run limb convert-webdataset --input-dir recordings/task --output-dir datasets/task_wds --image-size 224 + +Output structure:: + + datasets/task_wds/ + shard-000000.tar + shard-000001.tar + stats.json + meta.json +""" + +from __future__ import annotations + +import io +import json +import tarfile +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Optional + +import numpy as np +import tyro +from loguru import logger + +from limb.data.episode_utils import ( + build_action_names, + build_action_vector, + build_state_names, + build_state_vector, + find_episodes, + load_episode, +) + + +@dataclass +class Args: + """Convert raw limb recordings to WebDataset .tar shards.""" + + input_dir: str + output_dir: str + task: Optional[str] = None + samples_per_shard: int = 1000 + image_size: Optional[int] = None # resize images to NxN (None = original) + jpeg_quality: int = 90 + fps: int = 30 + success_only: bool = False + include_depth: bool = False + camera: Optional[str] = None # primary camera name (None = first found) + + +class WelfordStats: + """Welford's online algorithm for numerically stable mean/std.""" + + def __init__(self, dim: int) -> None: + self.n = 0 + self.mean = np.zeros(dim, dtype=np.float64) + self.m2 = np.zeros(dim, dtype=np.float64) + self.min_val = np.full(dim, np.inf, dtype=np.float64) + self.max_val = np.full(dim, -np.inf, dtype=np.float64) + + def update(self, x: np.ndarray) -> None: + self.n += 1 + delta = x - self.mean + self.mean += delta / self.n + delta2 = x - self.mean + self.m2 += delta * delta2 + self.min_val = np.minimum(self.min_val, x) + self.max_val = np.maximum(self.max_val, x) + + def finalize(self) -> Dict: + std = np.sqrt(self.m2 / max(self.n, 1)) + return { + "mean": self.mean.tolist(), + "std": std.tolist(), + "min": self.min_val.tolist(), + "max": self.max_val.tolist(), + "count": self.n, + } + + +def _encode_jpeg(rgb: np.ndarray, quality: int = 90, size: Optional[int] = None) -> bytes: + """Encode RGB numpy array to JPEG bytes, optionally resizing.""" + import cv2 + + if size is not None: + rgb = cv2.resize(rgb, (size, size), interpolation=cv2.INTER_AREA) + bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR) + _, buf = cv2.imencode(".jpg", bgr, [cv2.IMWRITE_JPEG_QUALITY, quality]) + return buf.tobytes() + + +def _add_to_tar(tar: tarfile.TarFile, name: str, data: bytes) -> None: + """Add bytes to a tar archive under the given name.""" + info = tarfile.TarInfo(name=name) + info.size = len(data) + tar.addfile(info, io.BytesIO(data)) + + +def _extract_frames(video_path: Path) -> List[np.ndarray]: + """Extract all frames from a video file as RGB numpy arrays.""" + import cv2 + + frames = [] + cap = cv2.VideoCapture(str(video_path)) + while cap.isOpened(): + ret, frame = cap.read() + if not ret: + break + frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) + cap.release() + return frames + + +def main(args: Args) -> None: + input_dir = Path(args.input_dir) + output_dir = Path(args.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + episodes = find_episodes(input_dir, args.success_only) + if not episodes: + logger.error("No episodes found in {}", input_dir) + raise SystemExit(1) + + logger.info("Found {} episodes in {}", len(episodes), input_dir) + + # Determine structure from first episode + first_ep = load_episode(episodes[0]) + arm_names = sorted(first_ep["arms"].keys()) + cam_names = [c["name"] for c in first_ep["cameras"]] + task = args.task or first_ep["metadata"].get("task_instruction", "") + + # Pick primary camera for image samples + primary_cam = args.camera + if primary_cam is None and cam_names: + primary_cam = cam_names[0] + logger.info("Using primary camera: {}", primary_cam) + + state_names = build_state_names(arm_names, first_ep) + action_names = build_action_names(arm_names, first_ep) + state_dim = len(state_names) + action_dim = len(action_names) + + logger.info("State dim: {}, Action dim: {}, Cameras: {}", state_dim, action_dim, cam_names) + + # Online stats + state_stats = WelfordStats(state_dim) if state_dim > 0 else None + action_stats = WelfordStats(action_dim) if action_dim > 0 else None + + # Sharding state + shard_idx = 0 + sample_in_shard = 0 + global_sample_idx = 0 + total_samples = 0 + tar: Optional[tarfile.TarFile] = None + + def _open_shard() -> tarfile.TarFile: + nonlocal shard_idx + path = output_dir / f"shard-{shard_idx:06d}.tar" + return tarfile.open(str(path), "w") + + tar = _open_shard() + + for ep_idx, ep_path in enumerate(episodes): + episode = load_episode(ep_path) + n_steps = len(episode["timestamps"]) if episode["timestamps"] is not None else 0 + if n_steps == 0: + logger.warning("Skipping empty episode: {}", ep_path.name) + continue + + states = build_state_vector(episode, arm_names) + actions = build_action_vector(episode, arm_names) + n_steps = min(len(states), len(actions)) if len(actions) > 0 else len(states) + states = states[:n_steps] + actions = actions[:n_steps] if len(actions) > 0 else np.zeros((n_steps, action_dim), dtype=np.float32) + + timestamps = episode["timestamps"][:n_steps] if episode["timestamps"] is not None else None + + # Extract video frames for the primary camera + frames: Optional[List[np.ndarray]] = None + if primary_cam: + video_path = ep_path / f"{primary_cam}.mp4" + if video_path.exists(): + frames = _extract_frames(video_path) + + # Also extract frames for all other cameras + other_cam_frames: Dict[str, List[np.ndarray]] = {} + for cname in cam_names: + if cname == primary_cam: + continue + vid = ep_path / f"{cname}.mp4" + if vid.exists(): + other_cam_frames[cname] = _extract_frames(vid) + + for step in range(n_steps): + key = f"sample_{global_sample_idx:08d}" + + # State and action as .npy + state_buf = io.BytesIO() + np.save(state_buf, states[step]) + _add_to_tar(tar, f"{key}.state.npy", state_buf.getvalue()) + + action_buf = io.BytesIO() + np.save(action_buf, actions[step]) + _add_to_tar(tar, f"{key}.action.npy", action_buf.getvalue()) + + # Primary camera image as JPEG + if frames is not None and step < len(frames): + jpg = _encode_jpeg(frames[step], args.jpeg_quality, args.image_size) + _add_to_tar(tar, f"{key}.jpg", jpg) + + # Additional camera images + for cname, cframes in other_cam_frames.items(): + if step < len(cframes): + jpg = _encode_jpeg(cframes[step], args.jpeg_quality, args.image_size) + _add_to_tar(tar, f"{key}.{cname}.jpg", jpg) + + # Metadata JSON + meta = { + "episode_index": ep_idx, + "frame_index": step, + "task": task, + } + if timestamps is not None: + meta["timestamp"] = float(timestamps[step]) + meta_bytes = json.dumps(meta).encode() + _add_to_tar(tar, f"{key}.json", meta_bytes) + + # Update stats + if state_stats: + state_stats.update(states[step].astype(np.float64)) + if action_stats: + action_stats.update(actions[step].astype(np.float64)) + + global_sample_idx += 1 + sample_in_shard += 1 + total_samples += 1 + + # Rotate shard + if sample_in_shard >= args.samples_per_shard: + tar.close() + shard_idx += 1 + sample_in_shard = 0 + tar = _open_shard() + + logger.info(" Episode {}: {} samples", ep_idx, n_steps) + + if tar is not None: + tar.close() + # Remove empty last shard + if sample_in_shard == 0 and shard_idx > 0: + empty_shard = output_dir / f"shard-{shard_idx:06d}.tar" + if empty_shard.exists(): + empty_shard.unlink() + shard_idx -= 1 + + # Write stats + stats = {} + if state_stats: + stats["observation.state"] = state_stats.finalize() + if action_stats: + stats["action"] = action_stats.finalize() + + with open(output_dir / "stats.json", "w") as f: + json.dump(stats, f, indent=2) + + # Write meta + meta_info = { + "format": "webdataset", + "total_samples": total_samples, + "total_shards": shard_idx + 1, + "samples_per_shard": args.samples_per_shard, + "total_episodes": len(episodes), + "task": task, + "fps": args.fps, + "state_dim": state_dim, + "action_dim": action_dim, + "state_names": state_names, + "action_names": action_names, + "cameras": cam_names, + "primary_camera": primary_cam, + "image_size": args.image_size, + "jpeg_quality": args.jpeg_quality, + } + with open(output_dir / "meta.json", "w") as f: + json.dump(meta_info, f, indent=2) + + logger.info("=" * 50) + logger.info("WebDataset written to: {}", output_dir) + logger.info(" Shards: {}, Samples: {}", shard_idx + 1, total_samples) + logger.info(" State dim: {}, Action dim: {}", state_dim, action_dim) + + +if __name__ == "__main__": + main(tyro.cli(Args)) diff --git a/limb/data/episode_utils.py b/limb/data/episode_utils.py new file mode 100644 index 0000000..4734f37 --- /dev/null +++ b/limb/data/episode_utils.py @@ -0,0 +1,159 @@ +"""Shared episode loading utilities for conversion scripts. + +Provides episode discovery, loading, and state/action vector building +used by all dataset converters (LeRobot, WebDataset, etc.). +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Dict, List + +import numpy as np +from loguru import logger + + +def find_episodes(input_dir: Path, success_only: bool = False) -> List[Path]: + """Find valid episode directories, sorted by name. + + Skips: + - Incomplete episodes (have RECORDING_IN_PROGRESS marker) + - Episodes without metadata.json (corrupted/interrupted) + + When success_only=True, only includes episodes with a SUCCESS marker. + """ + episodes = sorted(p for p in input_dir.iterdir() if p.is_dir() and p.name.startswith("episode_")) + + valid = [] + for ep in episodes: + if (ep / "RECORDING_IN_PROGRESS").exists(): + logger.warning("Skipping incomplete episode: {}", ep.name) + continue + if not (ep / "metadata.json").exists(): + logger.warning("Skipping episode without metadata: {}", ep.name) + continue + valid.append(ep) + + if success_only: + valid = [ep for ep in valid if (ep / "SUCCESS").exists()] + + return valid + + +def load_episode(episode_dir: Path) -> Dict: + """Load all data from a single episode directory.""" + data: Dict = {"dir": episode_dir} + + meta_path = episode_dir / "metadata.json" + if meta_path.exists(): + with open(meta_path) as f: + data["metadata"] = json.load(f) + else: + data["metadata"] = {} + + ts_path = episode_dir / "timestamps.npy" + data["timestamps"] = np.load(str(ts_path)) if ts_path.exists() else None + + data["arms"] = {} + for states_path in episode_dir.glob("*_states.npz"): + arm_name = states_path.stem.replace("_states", "") + arm_data = {"states": dict(np.load(str(states_path)))} + actions_path = episode_dir / f"{arm_name}_actions.npz" + if actions_path.exists(): + arm_data["actions"] = dict(np.load(str(actions_path))) + data["arms"][arm_name] = arm_data + + data["cameras"] = [] + for mp4 in sorted(episode_dir.glob("*.mp4")): + cam_name = mp4.stem + # Skip depth videos + if cam_name.endswith("_depth"): + continue + cam_ts_path = episode_dir / f"{cam_name}_timestamps.npy" + data["cameras"].append( + { + "name": cam_name, + "video_path": mp4, + "timestamps": np.load(str(cam_ts_path)) if cam_ts_path.exists() else None, + } + ) + + return data + + +def build_state_vector(episode: Dict, arm_names: List[str]) -> np.ndarray: + """Concatenate arm states into a single state vector per timestep. + + Order: [left_joint_pos(6), left_gripper(1), right_joint_pos(6), right_gripper(1)] = 14 + """ + parts = [] + for arm_name in arm_names: + states = episode["arms"][arm_name]["states"] + parts.append(states["joint_pos"]) + if "gripper_pos" in states: + parts.append(states["gripper_pos"]) + return np.concatenate(parts, axis=1).astype(np.float32) + + +def build_action_vector(episode: Dict, arm_names: List[str]) -> np.ndarray: + """Concatenate arm actions into a single action vector per timestep. + + Order: [left_pos(7), right_pos(7)] = 14 + """ + parts = [] + for arm_name in arm_names: + arm = episode["arms"][arm_name] + if "actions" in arm and "pos" in arm["actions"]: + parts.append(arm["actions"]["pos"]) + return np.concatenate(parts, axis=1).astype(np.float32) if parts else np.empty((0, 0), dtype=np.float32) + + +def build_state_names(arm_names: List[str], episode: Dict) -> List[str]: + """Build human-readable names for state vector dimensions.""" + names = [] + for arm_name in arm_names: + n_joints = episode["arms"][arm_name]["states"]["joint_pos"].shape[1] + for j in range(n_joints): + names.append(f"{arm_name}_joint_{j}") + if "gripper_pos" in episode["arms"][arm_name]["states"]: + names.append(f"{arm_name}_gripper") + return names + + +def build_action_names(arm_names: List[str], episode: Dict) -> List[str]: + """Build human-readable names for action vector dimensions.""" + names = [] + for arm_name in arm_names: + if "actions" in episode["arms"][arm_name] and "pos" in episode["arms"][arm_name]["actions"]: + n_dims = episode["arms"][arm_name]["actions"]["pos"].shape[1] + for j in range(n_dims): + names.append(f"{arm_name}_action_{j}") + return names + + +def compute_stats(all_states: List[np.ndarray], all_actions: List[np.ndarray]) -> Dict: + """Compute per-feature statistics (min, max, mean, std).""" + stats = {} + + if all_states: + cat_states = np.concatenate(all_states, axis=0) + stats["observation.state"] = { + "min": cat_states.min(axis=0).tolist(), + "max": cat_states.max(axis=0).tolist(), + "mean": cat_states.mean(axis=0).tolist(), + "std": cat_states.std(axis=0).tolist(), + "count": int(cat_states.shape[0]), + } + + if all_actions: + cat_actions = np.concatenate(all_actions, axis=0) + stats["action"] = { + "min": cat_actions.min(axis=0).tolist(), + "max": cat_actions.max(axis=0).tolist(), + "mean": cat_actions.mean(axis=0).tolist(), + "std": cat_actions.std(axis=0).tolist(), + "count": int(cat_actions.shape[0]), + } + + return stats diff --git a/limb/data/upload.py b/limb/data/upload.py new file mode 100644 index 0000000..835bb33 --- /dev/null +++ b/limb/data/upload.py @@ -0,0 +1,181 @@ +"""Upload datasets to cloud storage (S3, GCS, or HuggingFace Hub). + +Auth follows standard SDK credential chains — no limb-specific secrets. + - S3: AWS_PROFILE, AWS_ACCESS_KEY_ID, or IAM role + - GCS: GOOGLE_APPLICATION_CREDENTIALS, gcloud auth, or service account + - HuggingFace: HF_TOKEN env var or `huggingface-cli login` + +Storage target is specified as a URI: + - s3://bucket/prefix/dataset + - gs://bucket/prefix/dataset + - hf://username/dataset-name + +Or via a config file at ~/.config/limb/storage.yaml:: + + default: s3://my-bucket/datasets + # Or per-task overrides: + targets: + pick_cube: gs://lab-bucket/pick_cube + pour_water: s3://other-bucket/pour_water + +Usage: + uv run limb upload --source datasets/task_wds --target s3://bucket/prefix + uv run limb upload --source datasets/task_lerobot --target gs://bucket/prefix + uv run limb upload --source datasets/task --target hf://username/dataset-name + uv run limb upload --source datasets/task # uses default from storage.yaml +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + +import tyro +from loguru import logger + + +def _load_storage_config() -> dict: + """Load storage config from ~/.config/limb/storage.yaml if it exists.""" + config_path = Path.home() / ".config" / "limb" / "storage.yaml" + if not config_path.exists(): + return {} + try: + from omegaconf import OmegaConf + + conf = OmegaConf.load(str(config_path)) + return OmegaConf.to_container(conf, resolve=True) + except Exception as e: + logger.warning("Failed to load storage config: {}", e) + return {} + + +def _resolve_target(target: Optional[str], task_name: Optional[str] = None) -> str: + """Resolve upload target from explicit arg, task override, or default config.""" + if target: + return target + + config = _load_storage_config() + + # Check task-specific override + if task_name and "targets" in config: + if task_name in config["targets"]: + resolved = config["targets"][task_name] + logger.info("Using task-specific target from storage.yaml: {}", resolved) + return resolved + + # Use default + if "default" in config: + resolved = config["default"] + logger.info("Using default target from storage.yaml: {}", resolved) + return resolved + + logger.error( + "No upload target specified. Pass --target or create ~/.config/limb/storage.yaml with a 'default' key." + ) + raise SystemExit(1) + + +def _upload_s3(source: Path, bucket: str, prefix: str) -> None: + """Upload directory to S3 using boto3.""" + try: + import boto3 + except ImportError: + logger.error("boto3 not installed. Run: uv add boto3") + raise SystemExit(1) from None + + s3 = boto3.client("s3") + files = sorted(f for f in source.rglob("*") if f.is_file()) + logger.info("Uploading {} files to s3://{}/{}", len(files), bucket, prefix) + + for i, f in enumerate(files): + key = f"{prefix}/{f.relative_to(source)}" + s3.upload_file(str(f), bucket, key) + if (i + 1) % 50 == 0 or i == len(files) - 1: + logger.info(" {}/{} uploaded", i + 1, len(files)) + + logger.info("Upload complete: s3://{}/{}", bucket, prefix) + + +def _upload_gcs(source: Path, bucket: str, prefix: str) -> None: + """Upload directory to GCS using google-cloud-storage.""" + try: + from google.cloud import storage + except ImportError: + logger.error("google-cloud-storage not installed. Run: uv add google-cloud-storage") + raise SystemExit(1) from None + + client = storage.Client() + bucket_obj = client.bucket(bucket) + files = sorted(f for f in source.rglob("*") if f.is_file()) + logger.info("Uploading {} files to gs://{}/{}", len(files), bucket, prefix) + + for i, f in enumerate(files): + blob_name = f"{prefix}/{f.relative_to(source)}" + blob = bucket_obj.blob(blob_name) + blob.upload_from_filename(str(f)) + if (i + 1) % 50 == 0 or i == len(files) - 1: + logger.info(" {}/{} uploaded", i + 1, len(files)) + + logger.info("Upload complete: gs://{}/{}", bucket, prefix) + + +def _upload_hf(source: Path, repo_id: str) -> None: + """Upload directory to HuggingFace Hub.""" + try: + from huggingface_hub import HfApi + except ImportError: + logger.error("huggingface_hub not installed. Run: uv add huggingface-hub") + raise SystemExit(1) from None + + api = HfApi() + logger.info("Uploading to HuggingFace Hub: {}", repo_id) + api.create_repo(repo_id, repo_type="dataset", exist_ok=True) + api.upload_folder(folder_path=str(source), repo_id=repo_id, repo_type="dataset") + logger.info("Uploaded: https://huggingface.co/datasets/{}", repo_id) + + +def upload(source: str, target: str) -> None: + """Upload a dataset directory to the specified target URI.""" + source_path = Path(source) + if not source_path.exists(): + logger.error("Source directory not found: {}", source) + raise SystemExit(1) + + if target.startswith("s3://"): + parts = target[5:].split("/", 1) + bucket = parts[0] + prefix = parts[1] if len(parts) > 1 else source_path.name + _upload_s3(source_path, bucket, prefix) + + elif target.startswith("gs://"): + parts = target[5:].split("/", 1) + bucket = parts[0] + prefix = parts[1] if len(parts) > 1 else source_path.name + _upload_gcs(source_path, bucket, prefix) + + elif target.startswith("hf://"): + repo_id = target[5:] + _upload_hf(source_path, repo_id) + + else: + logger.error("Unknown target scheme. Use s3://, gs://, or hf:// prefix.") + raise SystemExit(1) + + +@dataclass +class Args: + """Upload a dataset to cloud storage.""" + + source: str # local dataset directory + target: Optional[str] = None # s3://bucket/prefix, gs://bucket/prefix, or hf://user/repo + task: Optional[str] = None # task name for storage.yaml lookup + + +def main(args: Args) -> None: + resolved = _resolve_target(args.target, args.task) + upload(args.source, resolved) + + +if __name__ == "__main__": + main(tyro.cli(Args)) diff --git a/limb/data/visualize_episode.py b/limb/data/visualize_episode.py new file mode 100644 index 0000000..e0addae --- /dev/null +++ b/limb/data/visualize_episode.py @@ -0,0 +1,181 @@ +"""Visualize a recorded episode using Rerun. + +Displays joint trajectories, gripper state, EE pose, and camera video +in a synchronized timeline viewer. + +Usage: + uv run limb visualize --episode-dir recordings/episode_20260304_153045_0001 +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path + +import numpy as np +import tyro +from loguru import logger + + +@dataclass +class Args: + episode_dir: str + no_video: bool = False + timeline_name: str = "step" + + +def main(args: Args) -> None: + try: + import rerun as rr + import rerun.blueprint as rrb + except ImportError: + logger.error("rerun-sdk not installed. Run: uv add rerun-sdk") + raise SystemExit(1) from None + + episode_dir = Path(args.episode_dir) + if not episode_dir.exists(): + logger.error("Episode directory not found: {}", episode_dir) + raise SystemExit(1) + + metadata_path = episode_dir / "metadata.json" + if metadata_path.exists(): + with open(metadata_path) as f: + metadata = json.load(f) + logger.info("Episode: {} steps, {:.1f}s", metadata.get("num_steps", "?"), metadata.get("duration_s", 0)) + else: + metadata = {} + + timestamps_path = episode_dir / "timestamps.npy" + timestamps = np.load(str(timestamps_path)) if timestamps_path.exists() else None + + arm_names = metadata.get("arms", []) + if not arm_names: + arm_names = sorted(p.stem.replace("_states", "") for p in episode_dir.glob("*_states.npz")) + + all_arms = {} + for arm_name in arm_names: + states_path = episode_dir / f"{arm_name}_states.npz" + if not states_path.exists(): + continue + arm = {"states": dict(np.load(str(states_path)))} + actions_path = episode_dir / f"{arm_name}_actions.npz" + if actions_path.exists(): + arm["actions"] = dict(np.load(str(actions_path))) + all_arms[arm_name] = arm + logger.info( + "Logging {}: {} steps, keys={}", + arm_name, + len(next(iter(arm["states"].values()))), + list(arm["states"].keys()), + ) + + views = [] + for arm_name in arm_names: + if arm_name not in all_arms: + continue + states = all_arms[arm_name]["states"] + if "joint_pos" in states: + views.append(rrb.TimeSeriesView(name=f"{arm_name} joint_pos", origin=f"{arm_name}/joint_pos")) + if "joint_vel" in states: + views.append(rrb.TimeSeriesView(name=f"{arm_name} joint_vel", origin=f"{arm_name}/joint_vel")) + if "gripper_pos" in states: + views.append(rrb.TimeSeriesView(name=f"{arm_name} gripper", origin=f"{arm_name}/gripper")) + if "actions" in all_arms[arm_name] and "pos" in all_arms[arm_name]["actions"]: + views.append(rrb.TimeSeriesView(name=f"{arm_name} action", origin=f"{arm_name}/action")) + + cam_names = [] + if not args.no_video: + cam_names = metadata.get("cameras", []) + if not cam_names: + cam_names = [p.stem for p in episode_dir.glob("*.mp4")] + for cam_name in cam_names: + views.append(rrb.Spatial2DView(name=cam_name, origin=f"cameras/{cam_name}")) + + blueprint = rrb.Blueprint(rrb.Grid(*views)) + rr.init(f"limb — {episode_dir.name}", spawn=True, default_blueprint=blueprint) + rr.send_blueprint(blueprint) + + for arm_name, arm in all_arms.items(): + states = arm["states"] + if "joint_pos" in states: + for j in range(states["joint_pos"].shape[1]): + rr.log(f"{arm_name}/joint_pos/j{j}", rr.SeriesLines(names=f"j{j}"), static=True) + if "joint_vel" in states: + for j in range(states["joint_vel"].shape[1]): + rr.log(f"{arm_name}/joint_vel/j{j}", rr.SeriesLines(names=f"j{j}"), static=True) + if "gripper_pos" in states: + rr.log(f"{arm_name}/gripper/pos", rr.SeriesLines(names="gripper"), static=True) + if "actions" in arm and "pos" in arm["actions"]: + for j in range(arm["actions"]["pos"].shape[1]): + rr.log(f"{arm_name}/action/j{j}", rr.SeriesLines(names=f"j{j}"), static=True) + + n_steps = ( + len(timestamps) + if timestamps is not None + else max(len(next(iter(arm["states"].values()))) for arm in all_arms.values()) + ) + for i in range(n_steps): + rr.set_time(args.timeline_name, sequence=i) + if timestamps is not None and i < len(timestamps): + rr.set_time("wall_clock", timestamp=timestamps[i]) + + for arm_name, arm in all_arms.items(): + states = arm["states"] + if "joint_pos" in states and i < len(states["joint_pos"]): + for j, val in enumerate(states["joint_pos"][i]): + rr.log(f"{arm_name}/joint_pos/j{j}", rr.Scalars(val)) + if "joint_vel" in states and i < len(states["joint_vel"]): + for j, val in enumerate(states["joint_vel"][i]): + rr.log(f"{arm_name}/joint_vel/j{j}", rr.Scalars(val)) + if "gripper_pos" in states and i < len(states["gripper_pos"]): + rr.log(f"{arm_name}/gripper/pos", rr.Scalars(states["gripper_pos"][i])) + if "actions" in arm and "pos" in arm["actions"] and i < len(arm["actions"]["pos"]): + for j, val in enumerate(arm["actions"]["pos"][i]): + rr.log(f"{arm_name}/action/j{j}", rr.Scalars(val)) + + if not args.no_video: + for cam_name in cam_names: + video_path = episode_dir / f"{cam_name}.mp4" + if not video_path.exists(): + continue + + cam_timestamps_path = episode_dir / f"{cam_name}_timestamps.npy" + cam_timestamps = np.load(str(cam_timestamps_path)) if cam_timestamps_path.exists() else None + if cam_timestamps is not None and len(cam_timestamps) > 0 and cam_timestamps[0] > 1e12: + cam_timestamps = cam_timestamps / 1e3 + + import cv2 + + cap = cv2.VideoCapture(str(video_path)) + frame_idx = 0 + logger.info("Logging video: {}", cam_name) + + while cap.isOpened(): + ret, frame = cap.read() + if not ret: + break + + rr.set_time(args.timeline_name, sequence=frame_idx) + if cam_timestamps is not None and frame_idx < len(cam_timestamps): + rr.set_time("wall_clock", timestamp=cam_timestamps[frame_idx]) + + rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + rr.log(f"cameras/{cam_name}", rr.Image(rgb)) + frame_idx += 1 + + cap.release() + logger.info(" {} frames logged", frame_idx) + + logger.info("Done logging. Viewer should be open. Press Ctrl+C to exit.") + + try: + import signal + + signal.pause() + except KeyboardInterrupt: + pass + + +if __name__ == "__main__": + main(tyro.cli(Args)) diff --git a/limb/discovery.py b/limb/discovery.py new file mode 100644 index 0000000..2ec020a --- /dev/null +++ b/limb/discovery.py @@ -0,0 +1,234 @@ +"""Hardware device discovery for limb. + +Enumerates connected cameras, robot CAN interfaces, and input devices. +Inspired by Raiden's `rd list_devices` command. + +Usage: + uv run limb devices + uv run limb devices --verbose +""" + +from __future__ import annotations + +from loguru import logger + + +def _discover_realsense() -> list[dict]: + """Find connected Intel RealSense cameras.""" + results = [] + try: + import pyrealsense2 as rs + + ctx = rs.context() + for dev in ctx.query_devices(): + info = { + "type": "realsense", + "name": dev.get_info(rs.camera_info.name), + "serial": dev.get_info(rs.camera_info.serial_number), + "firmware": dev.get_info(rs.camera_info.firmware_version), + "usb_type": dev.get_info(rs.camera_info.usb_type_descriptor) + if dev.supports(rs.camera_info.usb_type_descriptor) + else "unknown", + } + results.append(info) + except ImportError: + logger.debug("pyrealsense2 not available, skipping RealSense discovery") + except Exception as e: + logger.debug("RealSense discovery error: {}", e) + return results + + +def _discover_zed() -> list[dict]: + """Find connected ZED cameras.""" + results = [] + try: + from pyzed import sl + + devs = sl.Camera.get_device_list() + for dev in devs: + info = { + "type": "zed", + "model": str(dev.camera_model), + "serial": str(dev.serial_number), + "state": str(dev.camera_state), + } + results.append(info) + except ImportError: + logger.debug("pyzed not available, skipping ZED discovery") + except Exception as e: + logger.debug("ZED discovery error: {}", e) + return results + + +def _discover_can_interfaces() -> list[dict]: + """Find CAN network interfaces (for YAM motor chains).""" + results = [] + try: + import os + + net_dir = "/sys/class/net" + if os.path.isdir(net_dir): + for iface in sorted(os.listdir(net_dir)): + if iface.startswith("can") or iface.startswith("vcan"): + state_file = os.path.join(net_dir, iface, "operstate") + state = "unknown" + try: + with open(state_file) as f: + state = f.read().strip() + except OSError: + pass + results.append({"type": "can", "interface": iface, "state": state}) + except Exception as e: + logger.debug("CAN discovery error: {}", e) + return results + + +def _discover_dynamixel() -> list[dict]: + """Find Dynamixel USB serial devices (GELLO controllers).""" + results = [] + try: + import glob + + # Common Dynamixel USB-serial paths + patterns = ["/dev/ttyUSB*", "/dev/ttyACM*", "/dev/serial/by-id/*dynamixel*", "/dev/serial/by-id/*FTDI*"] + seen = set() + for pattern in patterns: + for path in sorted(glob.glob(pattern)): + if path not in seen: + seen.add(path) + results.append({"type": "dynamixel_serial", "path": path}) + except Exception as e: + logger.debug("Dynamixel discovery error: {}", e) + return results + + +def _discover_foot_pedals() -> list[dict]: + """Find USB foot pedals via evdev.""" + results = [] + try: + import evdev + + KNOWN_PEDALS = { + (0x3553, 0xB001): "PCsensor FootSwitch", + (0x1A86, 0xE026): "iKKEGOL FootSwitch", + } + for path in evdev.list_devices(): + try: + dev = evdev.InputDevice(path) + key = (dev.info.vendor, dev.info.product) + if key in KNOWN_PEDALS: + results.append( + { + "type": "foot_pedal", + "name": KNOWN_PEDALS[key], + "path": path, + "vendor": f"0x{dev.info.vendor:04x}", + "product": f"0x{dev.info.product:04x}", + } + ) + dev.close() + except (OSError, PermissionError): + continue + except ImportError: + logger.debug("evdev not available, skipping foot pedal discovery") + except Exception as e: + logger.debug("Foot pedal discovery error: {}", e) + return results + + +def _discover_spacemouse() -> list[dict]: + """Find 3Dconnexion SpaceMouse devices via evdev.""" + results = [] + try: + import evdev + + SPACEMOUSE_VENDORS = {0x256F} # 3Dconnexion + for path in evdev.list_devices(): + try: + dev = evdev.InputDevice(path) + if dev.info.vendor in SPACEMOUSE_VENDORS: + results.append( + { + "type": "spacemouse", + "name": dev.name, + "path": path, + "vendor": f"0x{dev.info.vendor:04x}", + "product": f"0x{dev.info.product:04x}", + } + ) + dev.close() + except (OSError, PermissionError): + continue + except ImportError: + logger.debug("evdev not available, skipping SpaceMouse discovery") + except Exception as e: + logger.debug("SpaceMouse discovery error: {}", e) + return results + + +def discover_devices(verbose: bool = False) -> dict[str, list[dict]]: + """Discover all connected hardware and print a summary. + + Returns a dict of device category -> list of device info dicts. + """ + logger.info("Scanning for connected devices...\n") + + all_devices: dict[str, list[dict]] = {} + + # Cameras + realsense = _discover_realsense() + zed = _discover_zed() + cameras = realsense + zed + all_devices["cameras"] = cameras + + if cameras: + logger.info("Cameras ({}):", len(cameras)) + for cam in cameras: + if cam["type"] == "realsense": + usb = f" (USB {cam['usb_type']})" if verbose else "" + logger.info(" [RealSense] {} — serial: {}{}", cam["name"], cam["serial"], usb) + elif cam["type"] == "zed": + logger.info(" [ZED] {} — serial: {}", cam["model"], cam["serial"]) + else: + logger.info("Cameras: none found") + + # CAN interfaces (robot arms) + can_ifaces = _discover_can_interfaces() + all_devices["can_interfaces"] = can_ifaces + + if can_ifaces: + logger.info("\nCAN interfaces ({}):", len(can_ifaces)) + for iface in can_ifaces: + logger.info(" {} — state: {}", iface["interface"], iface["state"]) + else: + logger.info("\nCAN interfaces: none found") + + # Dynamixel serial (GELLO) + dxl = _discover_dynamixel() + all_devices["dynamixel"] = dxl + + if dxl: + logger.info("\nDynamixel serial ports ({}):", len(dxl)) + for d in dxl: + logger.info(" {}", d["path"]) + else: + logger.info("\nDynamixel serial ports: none found") + + # Input devices + pedals = _discover_foot_pedals() + spacemice = _discover_spacemouse() + inputs = pedals + spacemice + all_devices["input_devices"] = inputs + + if inputs: + logger.info("\nInput devices ({}):", len(inputs)) + for dev in inputs: + extra = f" — {dev['path']}" if verbose else "" + logger.info(" [{}] {}{}", dev["type"], dev.get("name", "unknown"), extra) + else: + logger.info("\nInput devices: none found (foot pedals, SpaceMouse)") + + total = sum(len(v) for v in all_devices.values()) + logger.info("\nTotal: {} device(s) found", total) + + return all_devices diff --git a/limb/envs/launch.py b/limb/envs/launch.py index 6af0eaa..5215b33 100644 --- a/limb/envs/launch.py +++ b/limb/envs/launch.py @@ -33,7 +33,12 @@ setup_can_interfaces, setup_logging, ) -from limb.visualization.viser_monitor import ViserMonitor +from limb.visualization.app import ViserApp +from limb.visualization.panels.camera_panel import CameraPanel +from limb.visualization.panels.loop_status_panel import LoopStatusPanel +from limb.visualization.panels.recording_panel import RecordingPanel +from limb.visualization.panels.session_panel import SessionPanel +from limb.visualization.panels.urdf_panel import URDFPanel SAFE_MOVE_DURATION_S = 1.0 IK_WARMUP_TIMEOUT_S = 15.0 @@ -278,23 +283,32 @@ def main(args: Args) -> None: agent = initialize_agent(agent_cfg, server_processes) - # Create a standalone ViserMonitor for agents that don't have their own - # (e.g. GELLO, VR). YamViserAgent already embeds a ViserMonitor. - monitor: Optional[ViserMonitor] = None + # Build viser app with panels for agents that don't have their own + # viser server (GELLO, VR). YamViserAgent has its own viser in-process. + viser_app: Optional[ViserApp] = None agent_target = agent_cfg.get("_target_", "") - if main_config.enable_monitor and "YamViserAgent" not in agent_target: + agent_has_viser = "YamViserAgent" in agent_target + has_collection = main_config.collection is not None + + if main_config.enable_monitor and not agent_has_viser: is_bimanual = len(robots) > 1 right_extrinsic = ( main_config.station_metadata.get("extrinsics", {}).get("right_arm_extrinsic") if main_config.station_metadata else None ) - monitor = ViserMonitor( - enable_urdf=True, - bimanual=is_bimanual, - right_arm_extrinsic=right_extrinsic, - ) - logger.info("ViserMonitor started (standalone) for camera feeds + recording + URDF") + viser_app = ViserApp() + camera_panel = CameraPanel() + viser_app.add_panel("cameras", camera_panel) + viser_app.add_panel("urdf", URDFPanel(bimanual=is_bimanual, right_arm_extrinsic=right_extrinsic)) + logger.info("ViserApp started with camera + URDF panels") + elif main_config.enable_monitor and agent_has_viser and has_collection: + # YamViserAgent + data collection: create a lightweight viser app + # (no URDF — agent has its own). Session panel needs a server. + viser_app = ViserApp() + camera_panel = CameraPanel() + viser_app.add_panel("cameras", camera_panel) + logger.info("ViserApp started (lightweight) for session panel") logger.info("Creating robot environment...") frequency = main_config.hz @@ -342,18 +356,37 @@ def main(args: Args) -> None: recorder = instantiate(main_config.recording) logger.info("EpisodeRecorder configured (base_dir={})", recorder.base_dir) - display = StatusDisplay() - display.start() - if session is not None: - session.display = display + # Wire up display: viser panels when app exists, Rich TUI otherwise. + display: Any = None + if viser_app is not None and session is not None: + from limb.recording.trigger import CompositeTrigger + + session_panel = SessionPanel() + viser_app.add_panel("session", session_panel) + session_panel.start() + session.trigger = CompositeTrigger(sources=[session_panel, session.trigger]) + session.display = session_panel + display = session_panel + elif viser_app is not None: + # No session — add recording button + loop status + viser_app.add_panel("recording", RecordingPanel()) + loop_panel = LoopStatusPanel() + viser_app.add_panel("status", loop_panel) + display = loop_panel + else: + display = StatusDisplay() + display.start() + if session is not None: + session.display = display logger.info("Starting control loop...") try: _run_control_loop( - env, agent, main_config, monitor=monitor, recorder=recorder, session=session, display=display + env, agent, main_config, viser_app=viser_app, recorder=recorder, session=session, display=display ) finally: - display.stop() + if hasattr(display, "stop"): + display.stop() except KeyboardInterrupt: logger.info("KeyboardInterrupt received, initiating safe shutdown...") @@ -385,8 +418,8 @@ def main(args: Args) -> None: session.close() elif "recorder" in locals() and recorder is not None: recorder.close() - if "monitor" in locals() and monitor is not None: - monitor.close() + if "viser_app" in locals() and viser_app is not None: + viser_app.close() if "env" in locals(): env.close() if "agent" in locals(): @@ -464,10 +497,10 @@ def _run_control_loop( env: RobotEnv, agent: Agent, config: LaunchConfig, - monitor: Optional[ViserMonitor] = None, + viser_app: Optional[ViserApp] = None, recorder: Optional[EpisodeRecorder] = None, session: Optional[DataCollectionSession] = None, - display: Optional[StatusDisplay] = None, + display: Any = None, ) -> None: """Run the main control loop. Exits when _shutdown_requested is set by SIGINT.""" steps = 0 @@ -491,8 +524,8 @@ def _run_control_loop( with Timeout(1, "Env step", "warning"): obs = env.step(action) - if monitor is not None: - monitor.update(obs) + if viser_app is not None: + viser_app.update(obs) steps += 1 loop_count += 1 diff --git a/limb/recording/episode_recorder.py b/limb/recording/episode_recorder.py index 03b46a7..f9266b5 100644 --- a/limb/recording/episode_recorder.py +++ b/limb/recording/episode_recorder.py @@ -69,9 +69,15 @@ def __post_init__(self) -> None: self._writers: Dict[str, AsyncVideoWriter] = {} self._cam_timestamps: Dict[str, List[float]] = {} + # Depth recording (when cameras provide depth data) + self._depth_writers: Dict[str, AsyncVideoWriter] = {} + # Metadata self._metadata: Dict[str, Any] = {} + # Clean up any incomplete episodes from previous runs + self._cleanup_incomplete_episodes() + if self.auto_start: self.start_episode() @@ -83,6 +89,48 @@ def _find_next_episode_count(self) -> int: existing = sorted(base.glob("episode_*")) return len(existing) + def _cleanup_incomplete_episodes(self) -> None: + """Remove incomplete episodes from previous runs. + + An episode is considered incomplete if it has a RECORDING_IN_PROGRESS + marker but no metadata.json (meaning it was interrupted before saving). + + Safety: the marker file contains the PID that created it. We only + delete an episode if the owning process is no longer running, to + avoid removing another process's active recording. + """ + import shutil + + base = Path(self.base_dir) + if not base.exists(): + return + + for ep_dir in sorted(base.glob("episode_*")): + marker = ep_dir / "RECORDING_IN_PROGRESS" + metadata = ep_dir / "metadata.json" + if marker.exists() and not metadata.exists(): + # Check if the owning process is still alive + if self._is_marker_owner_alive(marker): + logger.debug("Skipping active recording (owner alive): {}", ep_dir.name) + continue + logger.warning("Removing incomplete episode: {}", ep_dir.name) + shutil.rmtree(ep_dir, ignore_errors=True) + elif marker.exists() and metadata.exists(): + # Recording finished but marker wasn't cleaned up (crash during save) + marker.unlink(missing_ok=True) + + @staticmethod + def _is_marker_owner_alive(marker: Path) -> bool: + """Check if the process that wrote the marker is still running.""" + import os + + try: + pid = int(marker.read_text().strip()) + os.kill(pid, 0) # signal 0 = existence check, no actual signal sent + return True + except (ValueError, OSError, ProcessLookupError): + return False + @property def is_recording(self) -> bool: return self._recording @@ -103,12 +151,18 @@ def start_episode(self, metadata: Optional[Dict[str, Any]] = None) -> Path: self._actions = {} self._writers = {} self._cam_timestamps = {} + self._depth_writers = {} self._metadata = metadata or {} self._metadata["start_time"] = time.time() self._metadata["start_time_str"] = ts if self.ee_frame_names is not None: self._metadata["ee_frame_names"] = self.ee_frame_names + # Mark episode as in-progress with our PID (removed on successful save) + import os + + (self._episode_dir / "RECORDING_IN_PROGRESS").write_text(str(os.getpid())) + self._recording = True self._episode_count += 1 logger.info("Episode recording started -> {}", self._episode_dir) @@ -166,6 +220,19 @@ def record(self, obs: Observation, action: Dict[str, Any]) -> None: self._writers[cam_name].write(cam_obs.rgb) self._cam_timestamps[cam_name].append(cam_obs.timestamp) + # Record depth as 16-bit grayscale video when available + depth = getattr(cam_obs, "depth", None) + if depth is not None and cam_name not in self._depth_writers: + h, w = depth.shape[:2] + depth_path = str(self._episode_dir / f"{cam_name}_depth.mp4") + depth_writer = AsyncVideoWriter( + path=depth_path, width=w, height=h, fps=self.recording_fps, pix_fmt="gray16le" + ) + depth_writer.start() + self._depth_writers[cam_name] = depth_writer + if depth is not None and cam_name in self._depth_writers: + self._depth_writers[cam_name].write(depth) + self._step_idx += 1 def stop_episode(self) -> Optional[Path]: @@ -184,6 +251,8 @@ def _stop_episode_unlocked(self) -> Optional[Path]: # Flush async video writers (waits for ffmpeg to finish) for w in self._writers.values(): w.stop() + for w in self._depth_writers.values(): + w.stop() # Save timestamps np.save(str(episode_dir / "timestamps.npy"), np.array(self._timestamps, dtype=np.float64)) @@ -217,9 +286,13 @@ def _stop_episode_unlocked(self) -> Optional[Path]: self._metadata["recording_fps"] = self.recording_fps self._metadata["cameras"] = list(self._cam_timestamps.keys()) self._metadata["arms"] = list(self._arm_states.keys()) + self._metadata["has_depth"] = bool(self._depth_writers) with open(str(episode_dir / "metadata.json"), "w") as f: json.dump(self._metadata, f, indent=2, default=str) + # Remove in-progress marker — episode is now complete + (episode_dir / "RECORDING_IN_PROGRESS").unlink(missing_ok=True) + logger.info( "Episode saved: {} ({} steps, {:.1f}s)", episode_dir, diff --git a/limb/replay.py b/limb/replay.py new file mode 100644 index 0000000..d26f31f --- /dev/null +++ b/limb/replay.py @@ -0,0 +1,242 @@ +"""Motion replay — replay recorded episodes on hardware for verification. + +Inspired by Raiden's `rd replay` command. Streams joint commands from a +recorded episode to the physical robot at configurable speed. Useful for +checking recording quality before conversion. + +Usage: + uv run limb replay --episode-dir recordings/session/episode_20260304_153045_0001 + uv run limb replay --episode-dir recordings/session/episode_... --speed 0.5 +""" + +from __future__ import annotations + +import json +import signal +import threading +from pathlib import Path +from typing import Any, Dict, List + +import numpy as np +from loguru import logger + +_shutdown_requested = False + + +def _sigint_handler(signum: int, frame: Any) -> None: + global _shutdown_requested + if _shutdown_requested: + raise KeyboardInterrupt + _shutdown_requested = True + + +def _load_episode_data(episode_dir: Path) -> Dict[str, Any]: + """Load arm states and timestamps from a recorded episode.""" + data: Dict[str, Any] = {} + + # Load metadata + meta_path = episode_dir / "metadata.json" + if meta_path.exists(): + with open(meta_path) as f: + data["metadata"] = json.load(f) + else: + raise FileNotFoundError(f"No metadata.json in {episode_dir}") + + # Load timestamps + ts_path = episode_dir / "timestamps.npy" + if ts_path.exists(): + data["timestamps"] = np.load(str(ts_path)) + else: + raise FileNotFoundError(f"No timestamps.npy in {episode_dir}") + + # Load arm states + data["arms"] = {} + for states_path in sorted(episode_dir.glob("*_states.npz")): + arm_name = states_path.stem.replace("_states", "") + arm_data = dict(np.load(str(states_path))) + data["arms"][arm_name] = arm_data + + # Load actions (preferred for replay since they include gripper) + data["actions"] = {} + for actions_path in sorted(episode_dir.glob("*_actions.npz")): + arm_name = actions_path.stem.replace("_actions", "") + data["actions"][arm_name] = dict(np.load(str(actions_path))) + + return data + + +def replay_episode( + episode_dir: str, + config_path: List[str], + speed: float = 1.0, + log_level: str = "INFO", +) -> None: + """Replay a recorded episode on hardware. + + Parameters + ---------- + episode_dir : str + Path to the episode directory containing states/actions. + config_path : list[str] + Robot config YAML paths (needed to initialize hardware). + speed : float + Playback speed multiplier (0.5 = half speed, 2.0 = double). + log_level : str + Logging level. + """ + global _shutdown_requested + _shutdown_requested = False + + from limb.envs.configs.instantiate import instantiate + from limb.envs.configs.loader import DictLoader + from limb.robots.robot import Robot + from limb.robots.utils import Rate + from limb.utils.launch_utils import ( + cleanup_processes, + initialize_robots, + setup_can_interfaces, + setup_logging, + ) + + setup_logging(level=log_level) + + ep_path = Path(episode_dir) + if not ep_path.exists(): + logger.error("Episode directory not found: {}", episode_dir) + return + + logger.info("Loading episode data from: {}", ep_path) + episode_data = _load_episode_data(ep_path) + + timestamps = episode_data["timestamps"] + n_steps = len(timestamps) + if n_steps == 0: + logger.error("Episode has no timesteps (empty timestamps.npy)") + return + arm_names = sorted(episode_data["arms"].keys()) + if not arm_names: + logger.error("Episode has no arm data") + return + duration = timestamps[-1] - timestamps[0] + + logger.info( + "Episode: {} steps, {:.1f}s duration, arms: {}, speed: {:.1f}x", + n_steps, + duration, + arm_names, + speed, + ) + + # Use actions if available (includes gripper), fall back to states + use_actions = bool(episode_data["actions"]) + if use_actions: + logger.info("Replaying from recorded actions (joint_pos + gripper)") + else: + logger.info("Replaying from recorded states (joint_pos only)") + + # Load config and initialize hardware + configs_dict = DictLoader.load(config_path) + configs_dict.pop("agent", None) + configs_dict.pop("sensors", None) + configs_dict.pop("api_servers", None) + configs_dict.pop("collection", None) + configs_dict.pop("recording", None) + main_config = instantiate(configs_dict) + + original_sigint = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, _sigint_handler) + + server_processes: list = [] + robots: Dict[str, Robot] = {} + + try: + setup_can_interfaces() + robots = initialize_robots(main_config.robots, server_processes) + + # Move to first pose over 3 seconds + first_targets = {} + for arm_name in arm_names: + if arm_name not in robots: + logger.warning("Arm '{}' in recording but not in robot config, skipping", arm_name) + continue + if use_actions and arm_name in episode_data["actions"] and "pos" in episode_data["actions"][arm_name]: + first_targets[arm_name] = episode_data["actions"][arm_name]["pos"][0] + elif "joint_pos" in episode_data["arms"][arm_name]: + jp = episode_data["arms"][arm_name]["joint_pos"][0] + gp = episode_data["arms"].get(arm_name, {}).get("gripper_pos", None) + if gp is not None and len(gp) > 0: + first_targets[arm_name] = np.concatenate([jp, gp[0]]) + else: + first_targets[arm_name] = jp + + if first_targets: + logger.info("Moving to first pose over 3.0s...") + + def _move_one(name: str, robot: Robot, target: np.ndarray) -> None: + try: + robot.move_joints(target, 3.0) + except Exception as e: + logger.warning("Could not move '{}' to start pose: {}", name, e) + + threads = [] + for name, robot in robots.items(): + if name in first_targets: + t = threading.Thread(target=_move_one, args=(name, robot, first_targets[name]), daemon=True) + t.start() + threads.append(t) + for t in threads: + t.join(timeout=5.0) + + # Stream recorded commands + logger.info("Starting replay...") + effective_hz = (n_steps / duration) * speed if duration > 0 else 30.0 + rate = Rate(effective_hz, rate_name="replay") + + for step_idx in range(n_steps): + if _shutdown_requested: + logger.info("Shutdown requested, stopping replay.") + break + + for arm_name in arm_names: + if arm_name not in robots: + continue + + if use_actions and arm_name in episode_data["actions"] and "pos" in episode_data["actions"][arm_name]: + target = episode_data["actions"][arm_name]["pos"][step_idx] + else: + jp = episode_data["arms"][arm_name]["joint_pos"][step_idx] + gp = episode_data["arms"][arm_name].get("gripper_pos", None) + if gp is not None and step_idx < len(gp): + target = np.concatenate([jp, gp[step_idx]]) + else: + target = jp + + robots[arm_name].command_joint_pos(target) + + rate.sleep() + + # Progress update every second + if step_idx % max(1, int(effective_hz)) == 0: + progress = (step_idx + 1) / n_steps * 100 + logger.info("Replay progress: {:.0f}% ({}/{})", progress, step_idx + 1, n_steps) + + logger.info("Replay complete.") + + except KeyboardInterrupt: + logger.info("Replay interrupted.") + except Exception as e: + logger.error("Replay error: {}", e) + raise + finally: + # Return to safe position + for _name, robot in robots.items(): + try: + robot.soft_release(2.0) + except Exception: + try: + robot.zero_torque_mode() + except Exception: + pass + + cleanup_processes(None, server_processes) + signal.signal(signal.SIGINT, original_sigint) diff --git a/limb/visualization/app.py b/limb/visualization/app.py new file mode 100644 index 0000000..2d8fb5f --- /dev/null +++ b/limb/visualization/app.py @@ -0,0 +1,139 @@ +"""ViserApp — central coordinator for viser-based visualization panels. + +Owns a single ViserServer and provides a registration API for panels. +Each panel is a self-contained component responsible for its own GUI +folder and/or scene elements. Panels are added declaratively:: + + app = ViserApp() + app.add_panel("cameras", CameraPanel(image_size=224)) + app.add_panel("urdf", URDFPanel(bimanual=True)) + app.add_panel("session", SessionPanel()) + + # In the control loop: + app.update(obs) + + # On shutdown: + app.close() + +Threading model +--------------- +- **Main thread** calls ``update(obs)`` synchronously from the control loop + (~100 Hz). All panel ``update()`` methods run on the main thread. +- **Viser server thread** (internal to viser) handles WebSocket I/O and fires + ``on_click`` / ``on_update`` GUI callbacks on its own thread. Panels that + receive callbacks (SessionPanel, RecordingPanel) must use thread-safe + structures (``deque``, ``threading.Lock``) to bridge to the main thread. +- **Portal subprocess** (YamViserAgent) has its own ViserServer + panels in a + separate process. The main-process ViserApp is independent. + +Data flow:: + + Main thread (100 Hz control loop) + │ + ├─ agent.act(obs) Portal RPC + ├─ session.step(obs, act) trigger poll (deque.popleft) + recording + ├─ env.step(action) Portal RPC + └─ viser_app.update(obs) sequential fan-out to panels: + ├─ CameraPanel.update(obs) extract RGB, update thumbnails + ├─ URDFPanel.update(obs) update joint visualization + └─ RecordingPanel.update(obs) write frames if recording + + Viser server thread (async WebSocket) + │ + ├─ SessionPanel on_click → deque.append(signal) [thread-safe] + └─ RecordingPanel on_click → Lock + toggle recording [thread-safe] +""" + +from __future__ import annotations + +from typing import Any, Dict, Optional, Protocol, runtime_checkable + +import viser +from loguru import logger + + +@runtime_checkable +class ViserPanel(Protocol): + """A self-contained viser GUI/scene component. + + Panels are attached to a ViserServer and manage their own GUI elements + (sidebar folders) and/or scene elements (3D viewport objects). + """ + + def attach(self, server: viser.ViserServer) -> None: + """Create GUI/scene elements on the given server.""" + ... + + def detach(self) -> None: + """Clean up resources (release writers, close handles, etc.).""" + ... + + +@runtime_checkable +class ObservablePanel(ViserPanel, Protocol): + """A panel that receives observations from the control loop. + + ``update()`` is always called from the main thread, synchronously, + in panel registration order. + """ + + def update(self, obs: Any) -> None: + """Process a new observation (update images, URDF, etc.).""" + ... + + +class ViserApp: + """Central viser server owner with panel registration. + + Parameters + ---------- + port : int or None + Port for the viser web server. None = viser default (8080). + viser_server : ViserServer or None + Use an existing server instead of creating one (e.g. from an agent). + """ + + def __init__( + self, + port: Optional[int] = None, + viser_server: Optional[viser.ViserServer] = None, + ) -> None: + if viser_server is not None: + self.server = viser_server + elif port is not None: + self.server = viser.ViserServer(port=port) + else: + self.server = viser.ViserServer() + # Ordered dict (Python 3.7+) — panels update in registration order. + # Register CameraPanel before RecordingPanel so RGB is extracted + # before frames are written. + self._panels: Dict[str, ViserPanel] = {} + + def add_panel(self, name: str, panel: ViserPanel) -> None: + """Register and attach a panel to the viser server.""" + if name in self._panels: + logger.warning("Replacing existing panel: {}", name) + self._panels[name].detach() + self._panels[name] = panel + panel.attach(self.server) + + def get_panel(self, name: str) -> Optional[ViserPanel]: + """Get a registered panel by name.""" + return self._panels.get(name) + + def update(self, obs: Any) -> None: + """Fan out an observation to all ObservablePanel instances. + + Called from the main thread only. Panels update in registration + order, which matters for data dependencies (e.g. CameraPanel + extracts RGB before RecordingPanel writes frames). + """ + for panel in self._panels.values(): + if isinstance(panel, ObservablePanel): + panel.update(obs) + + def close(self) -> None: + """Detach all panels and clean up.""" + for panel in self._panels.values(): + panel.detach() + self._panels.clear() diff --git a/limb/visualization/panels/__init__.py b/limb/visualization/panels/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/limb/visualization/panels/camera_panel.py b/limb/visualization/panels/camera_panel.py new file mode 100644 index 0000000..dcfb77c --- /dev/null +++ b/limb/visualization/panels/camera_panel.py @@ -0,0 +1,53 @@ +"""Camera feed panel — displays camera thumbnails in the viser sidebar.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + +import numpy as np +import viser + +from limb.sensors.cameras.camera_utils import obs_get_rgb, resize_with_pad + + +@dataclass +class CameraPanel: + """Shows camera RGB thumbnails in the viser GUI sidebar. + + Implements ObservablePanel: attach(server), update(obs), detach(). + """ + + image_size: int = 224 + + _server: Optional[viser.ViserServer] = field(default=None, init=False, repr=False) + _cam_img_handles: Dict[str, Any] = field(default_factory=dict, init=False, repr=False) + _latest_rgb: Dict[str, np.ndarray] = field(default_factory=dict, init=False, repr=False) + + def attach(self, server: viser.ViserServer) -> None: + self._server = server + + def update(self, obs: Any) -> None: + """Extract RGB images from observation and update thumbnails.""" + if self._server is None: + return + rgb_images = obs_get_rgb(obs) + if not rgb_images: + return + + self._latest_rgb = rgb_images + + for cam_name, img in rgb_images.items(): + thumb = resize_with_pad(img, self.image_size, self.image_size) + if cam_name not in self._cam_img_handles: + self._cam_img_handles[cam_name] = self._server.gui.add_image(thumb, label=cam_name) + else: + self._cam_img_handles[cam_name].image = thumb + + def get_latest_rgb(self) -> Dict[str, np.ndarray]: + """Get the most recent RGB images (used by RecordingPanel).""" + return self._latest_rgb + + def detach(self) -> None: + self._cam_img_handles.clear() + self._latest_rgb.clear() diff --git a/limb/visualization/panels/loop_status_panel.py b/limb/visualization/panels/loop_status_panel.py new file mode 100644 index 0000000..df7947e --- /dev/null +++ b/limb/visualization/panels/loop_status_panel.py @@ -0,0 +1,38 @@ +"""Loop status panel — minimal Hz and step count display in the viser sidebar.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Optional + +import viser + + +@dataclass +class LoopStatusPanel: + """Shows control loop Hz and step count in the viser GUI. + + Implements ViserPanel: attach(server), detach(). + Also provides update_loop(hz, step) for StatusDisplay compatibility. + """ + + _server: Optional[viser.ViserServer] = field(default=None, init=False, repr=False) + _hz_handle: object = field(default=None, init=False, repr=False) + + def attach(self, server: viser.ViserServer) -> None: + self._server = server + self._hz_handle = server.gui.add_text("Loop", initial_value="-- Hz | step 0", disabled=True) + + def update_loop(self, hz: float, step: int) -> None: + if self._hz_handle is not None: + self._hz_handle.value = f"{hz:.1f} Hz | step {step}" + + # StatusDisplay compat stubs + def start(self) -> None: + pass + + def stop(self) -> None: + pass + + def detach(self) -> None: + pass diff --git a/limb/visualization/panels/recording_panel.py b/limb/visualization/panels/recording_panel.py new file mode 100644 index 0000000..c5d80f4 --- /dev/null +++ b/limb/visualization/panels/recording_panel.py @@ -0,0 +1,115 @@ +"""Recording panel — simple Start/Stop recording button in the viser sidebar. + +Used in non-session mode (standalone teleop). For data collection sessions, +use SessionPanel instead. + +Threading model: +- ``update(obs)`` is called from the **main thread** (100 Hz control loop). + It writes video frames while holding ``_lock``. +- ``_toggle()`` is called from the **viser server thread** (on_click callback). + It also holds ``_lock`` to safely start/stop recording. +- ``get_latest_rgb`` callback reads from CameraPanel which is updated on the + main thread before this panel (registration order guarantees this). +""" + +from __future__ import annotations + +import threading +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Optional + +import cv2 +import viser +from loguru import logger + +from limb.sensors.cameras.camera_utils import obs_get_rgb + + +@dataclass +class RecordingPanel: + """One-button video recording toggle in the viser GUI. + + Implements ObservablePanel: attach(server), update(obs), detach(). + + The ``update(obs)`` method extracts RGB from the observation and writes + frames to video when recording is active. This is called from the main + thread, after CameraPanel has already processed the same observation. + """ + + recording_fps: int = 30 + + _server: Optional[viser.ViserServer] = field(default=None, init=False, repr=False) + _button: Optional[Any] = field(default=None, init=False, repr=False) + _recording: bool = field(default=False, init=False, repr=False) + _writers: Dict[str, cv2.VideoWriter] = field(default_factory=dict, init=False, repr=False) + _record_dir: Optional[Path] = field(default=None, init=False, repr=False) + _lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False) + + def attach(self, server: viser.ViserServer) -> None: + self._server = server + self._button = server.gui.add_button("Start Recording", color="green") + + @self._button.on_click + def _(_event: viser.GuiEvent) -> None: + self._toggle() + + def update(self, obs: Any) -> None: + """Extract RGB from obs and write frames if recording. + + Called from main thread at control loop rate. + """ + with self._lock: + if not self._recording: + return + + # Extract outside lock — obs_get_rgb is read-only + rgb_images = obs_get_rgb(obs) + if not rgb_images: + return + + with self._lock: + if not self._recording: + return + for cam_name, img in rgb_images.items(): + writer = self._writers.get(cam_name) + if writer is None: + h, w = img.shape[:2] + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + path = str(self._record_dir / f"{cam_name}.mp4") + writer = cv2.VideoWriter(path, fourcc, self.recording_fps, (w, h)) + self._writers[cam_name] = writer + writer.write(cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) + + def _toggle(self) -> None: + """Toggle recording on/off. Called from viser server thread.""" + with self._lock: + if not self._recording: + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + self._record_dir = Path("recordings") / f"recording_{ts}" + self._record_dir.mkdir(parents=True, exist_ok=True) + + self._recording = True + self._button.name = "Stop Recording" + self._button.color = "red" + logger.info("Recording started -> {}", self._record_dir) + else: + for w in self._writers.values(): + w.release() + logger.info("Recording saved to {}", self._record_dir) + self._writers.clear() + self._record_dir = None + self._recording = False + self._button.name = "Start Recording" + self._button.color = "green" + + def detach(self) -> None: + """Stop recording and release all writers.""" + with self._lock: + if self._recording: + for w in self._writers.values(): + w.release() + logger.info("Recording saved to {}", self._record_dir) + self._writers.clear() + self._recording = False diff --git a/limb/visualization/panels/session_panel.py b/limb/visualization/panels/session_panel.py new file mode 100644 index 0000000..8bdeb65 --- /dev/null +++ b/limb/visualization/panels/session_panel.py @@ -0,0 +1,162 @@ +"""Session panel — data collection controls and status in the viser sidebar. + +Implements both the display interface (update_loop, update_session) and +TriggerSource protocol (get_signal, close) so GUI buttons feed signals +into the session alongside keyboard/pedal triggers. +""" + +from __future__ import annotations + +import collections +from dataclasses import dataclass, field +from typing import Optional + +import viser +from loguru import logger + +from limb.recording.trigger import TriggerSignal +from limb.tui import SessionState + + +@dataclass +class SessionPanel: + """Viser GUI panel for data collection session management. + + Display interface (same as StatusDisplay): + - update_loop(hz, step) + - update_session(state) + + TriggerSource protocol: + - get_signal() -> TriggerSignal | None + - close() + + ViserPanel protocol: + - attach(server) + - detach() + """ + + _server: Optional[viser.ViserServer] = field(default=None, init=False, repr=False) + _signal_queue: collections.deque = field(default_factory=lambda: collections.deque(maxlen=16), init=False) + _recording: bool = field(default=False, init=False) + _attached: bool = field(default=False, init=False) + + # GUI handles + _hz_handle: object = field(default=None, init=False, repr=False) + _status_handle: object = field(default=None, init=False, repr=False) + _episode_handle: object = field(default=None, init=False, repr=False) + _task_handle: object = field(default=None, init=False, repr=False) + _controls_handle: object = field(default=None, init=False, repr=False) + _btn_start_stop: object = field(default=None, init=False, repr=False) + + def attach(self, server: viser.ViserServer) -> None: + """Create GUI elements in the viser sidebar.""" + if self._attached: + return + self._server = server + self._attached = True + gui = server.gui + + with gui.add_folder("Data Collection", expand_by_default=True): + self._hz_handle = gui.add_text("Loop", initial_value="-- Hz | step 0", disabled=True) + self._status_handle = gui.add_text("Status", initial_value="IDLE", disabled=True) + self._episode_handle = gui.add_text("Episode", initial_value="0 / 0", disabled=True) + self._task_handle = gui.add_text("Task", initial_value="(none)", disabled=True) + + self._btn_start_stop = gui.add_button("Start", color="green") + btn_success = gui.add_button("Mark Success", color="blue") + btn_discard = gui.add_button("Discard", color="orange") + btn_quit = gui.add_button("Quit Session", color="red") + + self._controls_handle = gui.add_text("Controls", initial_value="", disabled=True) + + @self._btn_start_stop.on_click + def _(_event: viser.GuiEvent) -> None: + self._signal_queue.append(TriggerSignal.START_STOP) + + @btn_success.on_click + def _(_event: viser.GuiEvent) -> None: + self._signal_queue.append(TriggerSignal.SUCCESS) + + @btn_discard.on_click + def _(_event: viser.GuiEvent) -> None: + self._signal_queue.append(TriggerSignal.DISCARD) + + @btn_quit.on_click + def _(_event: viser.GuiEvent) -> None: + self._signal_queue.append(TriggerSignal.QUIT) + + logger.info("SessionPanel attached to viser GUI") + + # Alias for StatusDisplay compatibility + def start(self) -> None: + """No-op — attach() does the work. Kept for StatusDisplay compat.""" + + def stop(self) -> None: + """No-op — detach() does cleanup.""" + + # ── Display interface ── + + def update_loop(self, hz: float, step: int) -> None: + if self._hz_handle is not None: + self._hz_handle.value = f"{hz:.1f} Hz | step {step}" + + def update_session(self, state: SessionState) -> None: + if not self._attached: + return + + # Status + if state.countdown_remaining > 0: + status = f"COUNTDOWN {state.countdown_remaining:.0f}s" + elif state.recording: + status = f"RECORDING {state.episode_duration_s:.0f}s" + else: + status = "IDLE" + if self._status_handle is not None: + self._status_handle.value = status + + # Episode progress + target = str(state.episode_total) if state.episode_total > 0 else "\u221e" + ep_text = f"{state.episode_current} / {target}" + if state.episodes_successful > 0: + ep_text += f" ({state.episodes_successful} success)" + if state.episodes_discarded > 0: + ep_text += f" ({state.episodes_discarded} discarded)" + if self._episode_handle is not None: + self._episode_handle.value = ep_text + + # Task + if self._task_handle is not None: + task_display = state.task_instruction or "(none)" + if len(task_display) > 60: + task_display = task_display[:57] + "..." + self._task_handle.value = task_display + + # Controls hint + if self._controls_handle is not None and state.controls_hint: + self._controls_handle.value = state.controls_hint + + # Toggle start/stop button + if state.recording and not self._recording: + if self._btn_start_stop is not None: + self._btn_start_stop.name = "Stop" + self._btn_start_stop.color = "red" + self._recording = True + elif not state.recording and self._recording: + if self._btn_start_stop is not None: + self._btn_start_stop.name = "Start" + self._btn_start_stop.color = "green" + self._recording = False + + # ── TriggerSource protocol ── + + def get_signal(self) -> Optional[TriggerSignal]: + try: + return self._signal_queue.popleft() + except IndexError: + return None + + def close(self) -> None: + pass + + def detach(self) -> None: + pass diff --git a/limb/visualization/panels/urdf_panel.py b/limb/visualization/panels/urdf_panel.py new file mode 100644 index 0000000..438d0dd --- /dev/null +++ b/limb/visualization/panels/urdf_panel.py @@ -0,0 +1,82 @@ +"""URDF panel — 3D robot visualization in the viser scene.""" + +from __future__ import annotations + +import os +from copy import deepcopy +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + +import numpy as np +import viser +import viser.extras + +from limb.core.observation import Observation + + +@dataclass +class URDFPanel: + """Shows the YAM URDF in the viser 3D viewport, updated from observations. + + Implements ObservablePanel: attach(server), update(obs), detach(). + """ + + bimanual: bool = False + right_arm_extrinsic: Optional[Dict[str, Any]] = None + + _server: Optional[viser.ViserServer] = field(default=None, init=False, repr=False) + _urdf_vis_left: Optional[viser.extras.ViserUrdf] = field(default=None, init=False, repr=False) + _urdf_vis_right: Optional[viser.extras.ViserUrdf] = field(default=None, init=False, repr=False) + + def attach(self, server: viser.ViserServer) -> None: + self._server = server + self._setup_urdf() + + def _setup_urdf(self) -> None: + import yourdfpy + + from limb import ROOT_PATH + + yam_models = os.path.join(ROOT_PATH, "dependencies", "i2rt", "i2rt", "robot_models", "yam") + urdf_path = os.path.join(yam_models, "yam.urdf") + mesh_dir = os.path.join(yam_models, "assets") + + urdf = yourdfpy.URDF.load(urdf_path, mesh_dir=mesh_dir) + + self._server.scene.add_frame("/base", show_axes=False) + self._urdf_vis_left = viser.extras.ViserUrdf(self._server, urdf, root_node_name="/base") + self._server.scene.add_grid("ground", width=2, height=2, cell_size=0.1) + + if self.bimanual and self.right_arm_extrinsic is not None: + right_frame = self._server.scene.add_frame("/base/base_right", show_axes=False) + right_frame.position = tuple(self.right_arm_extrinsic.get("position", [0, -0.61, 0])) + if "rotation" in self.right_arm_extrinsic: + right_frame.wxyz = tuple(self.right_arm_extrinsic["rotation"]) + self._urdf_vis_right = viser.extras.ViserUrdf( + self._server, deepcopy(urdf), root_node_name="/base/base_right" + ) + + def update(self, obs: Any) -> None: + """Update URDF joint visualization from observation.""" + if self._urdf_vis_left is None: + return + + if isinstance(obs, Observation): + left = obs.arms.get("left") + left_jp = left.joint_pos if left is not None else None + right_data = obs.arms.get("right") + right_jp = right_data.joint_pos if right_data is not None else None + else: + left = obs.get("left") + left_jp = left["joint_pos"] if isinstance(left, dict) and "joint_pos" in left else None + right = obs.get("right") + right_jp = right["joint_pos"] if isinstance(right, dict) and "joint_pos" in right else None + + if left_jp is not None: + self._urdf_vis_left.update_cfg(np.flip(left_jp[:6])) + if self.bimanual and self._urdf_vis_right is not None and right_jp is not None: + self._urdf_vis_right.update_cfg(np.flip(right_jp[:6])) + + def detach(self) -> None: + self._urdf_vis_left = None + self._urdf_vis_right = None diff --git a/limb/visualization/viser_monitor.py b/limb/visualization/viser_monitor.py index b0b5316..8972aa3 100644 --- a/limb/visualization/viser_monitor.py +++ b/limb/visualization/viser_monitor.py @@ -1,46 +1,28 @@ -""" -Reusable camera-feed, URDF visualization, and recording monitor backed by a Viser GUI. +"""ViserMonitor — backward-compatible wrapper around ViserApp + panels. -Can be given an existing ViserServer (e.g. from YamViserAgent) or will -create its own (standalone mode for GELLO / VR agents). +Preserved for backward compatibility with existing configs and code that +creates ViserMonitor directly. New code should use ViserApp + individual +panels instead. """ -import os -import threading -from copy import deepcopy -from datetime import datetime -from pathlib import Path +from __future__ import annotations + from typing import Any, Dict, Optional -import cv2 -import numpy as np import viser -import viser.extras -from loguru import logger -from limb.core.observation import Observation -from limb.sensors.cameras.camera_utils import obs_get_rgb, resize_with_pad +from limb.visualization.app import ViserApp +from limb.visualization.panels.camera_panel import CameraPanel +from limb.visualization.panels.recording_panel import RecordingPanel +from limb.visualization.panels.urdf_panel import URDFPanel class ViserMonitor: - """Viser-based monitoring UI for camera feeds, URDF visualization, and recording. + """Backward-compatible wrapper that composes CameraPanel + URDFPanel + RecordingPanel. - Parameters - ---------- - viser_server : optional - Existing ViserServer to attach GUI elements to. If *None*, a new - server is created automatically. - image_size : int - Width/height of the square thumbnails shown in the GUI. - recording_fps : int - Frame rate written into recorded video files. - enable_urdf : bool - If True, load the YAM URDF and show the real robot state in 3-D. - bimanual : bool - Whether to visualize two arms (only used when *enable_urdf* is True). - right_arm_extrinsic : dict | None - ``{"position": [x,y,z], "rotation": [w,x,y,z]}`` offset for the - right arm base frame (only used when *bimanual* is True). + New code should use ViserApp + panels directly. This class exists so that + external code (e.g. scripts, custom agents) that creates ViserMonitor + continues to work. """ def __init__( @@ -52,156 +34,19 @@ def __init__( bimanual: bool = False, right_arm_extrinsic: Optional[Dict[str, Any]] = None, ) -> None: - self.viser_server = viser_server if viser_server is not None else viser.ViserServer() - self._image_size = image_size - self._recording_fps = recording_fps - self._bimanual = bimanual - self._right_arm_extrinsic = right_arm_extrinsic - - self._recording = False - self._writers: Dict[str, cv2.VideoWriter] = {} - self._record_dir: Optional[Path] = None - self._lock = threading.Lock() + self._app = ViserApp(viser_server=viser_server) if viser_server is not None else ViserApp() + self.viser_server = self._app.server - self._cam_img_handles: Dict[str, Any] = {} - self._latest_rgb: Dict[str, np.ndarray] = {} - - # URDF visualization handles (populated by _setup_urdf) - self._urdf_vis_left: Optional[viser.extras.ViserUrdf] = None - self._urdf_vis_right: Optional[viser.extras.ViserUrdf] = None + self._app.add_panel("cameras", CameraPanel(image_size=image_size)) if enable_urdf: - self._setup_urdf() - - self._record_button = self.viser_server.gui.add_button("Start Recording", color="green") - - @self._record_button.on_click - def _(_event: viser.GuiEvent) -> None: - self._toggle_recording() - - # ------------------------------------------------------------------ # - # URDF setup - # ------------------------------------------------------------------ # - - def _setup_urdf(self) -> None: - """Load the YAM URDF and add it to the Viser scene.""" - import yourdfpy - - from limb import ROOT_PATH - - yam_models = os.path.join(ROOT_PATH, "dependencies", "i2rt", "i2rt", "robot_models", "yam") - urdf_path = os.path.join(yam_models, "yam.urdf") - mesh_dir = os.path.join(yam_models, "assets") - - urdf = yourdfpy.URDF.load(urdf_path, mesh_dir=mesh_dir) - - self.viser_server.scene.add_frame("/base", show_axes=False) - self._urdf_vis_left = viser.extras.ViserUrdf(self.viser_server, urdf, root_node_name="/base") - self.viser_server.scene.add_grid("ground", width=2, height=2, cell_size=0.1) + self._app.add_panel("urdf", URDFPanel(bimanual=bimanual, right_arm_extrinsic=right_arm_extrinsic)) - if self._bimanual and self._right_arm_extrinsic is not None: - right_frame = self.viser_server.scene.add_frame("/base/base_right", show_axes=False) - right_frame.position = tuple(self._right_arm_extrinsic.get("position", [0, -0.61, 0])) - if "rotation" in self._right_arm_extrinsic: - right_frame.wxyz = tuple(self._right_arm_extrinsic["rotation"]) - self._urdf_vis_right = viser.extras.ViserUrdf( - self.viser_server, deepcopy(urdf), root_node_name="/base/base_right" - ) - - # ------------------------------------------------------------------ # - # Public API - # ------------------------------------------------------------------ # + self._app.add_panel("recording", RecordingPanel(recording_fps=recording_fps)) def update(self, obs: Any) -> None: - """Feed a new observation into the monitor (thread-safe). - - Accepts either a typed ``Observation`` (from the main process) or a - plain dict (when embedded inside an agent subprocess via portal RPC). - - Extracts RGB images, updates the Viser GUI thumbnails, updates URDF - joint visualization, and writes video frames when recording. - """ - # --- URDF visualization --- - if self._urdf_vis_left is not None: - if isinstance(obs, Observation): - left = obs.arms.get("left") - left_jp = left.joint_pos if left is not None else None - right_data = obs.arms.get("right") - right_jp = right_data.joint_pos if right_data is not None else None - else: - left = obs.get("left") - left_jp = left["joint_pos"] if isinstance(left, dict) and "joint_pos" in left else None - right = obs.get("right") - right_jp = right["joint_pos"] if isinstance(right, dict) and "joint_pos" in right else None - if left_jp is not None: - self._urdf_vis_left.update_cfg(np.flip(left_jp[:6])) - if self._bimanual and self._urdf_vis_right is not None and right_jp is not None: - self._urdf_vis_right.update_cfg(np.flip(right_jp[:6])) - - # --- Camera feeds --- - rgb_images = obs_get_rgb(obs) - if not rgb_images: - return - - self._latest_rgb = rgb_images - - for cam_name, img in rgb_images.items(): - thumb = resize_with_pad(img, self._image_size, self._image_size) - if cam_name not in self._cam_img_handles: - self._cam_img_handles[cam_name] = self.viser_server.gui.add_image(thumb, label=cam_name) - else: - self._cam_img_handles[cam_name].image = thumb - - # --- Recording --- - with self._lock: - if self._recording: - for cam_name, img in rgb_images.items(): - writer = self._writers.get(cam_name) - if writer is None: - h, w = img.shape[:2] - fourcc = cv2.VideoWriter_fourcc(*"mp4v") - path = str(self._record_dir / f"{cam_name}.mp4") - writer = cv2.VideoWriter(path, fourcc, self._recording_fps, (w, h)) - self._writers[cam_name] = writer - writer.write(cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) + """Feed a new observation — fans out to all panels.""" + self._app.update(obs) def close(self) -> None: - """Release all video writers and stop recording.""" - with self._lock: - if self._recording: - for w in self._writers.values(): - w.release() - logger.info(f"Recording saved to {self._record_dir}") - self._writers.clear() - self._recording = False - - # ------------------------------------------------------------------ # - # Internal - # ------------------------------------------------------------------ # - - def _toggle_recording(self) -> None: - with self._lock: - if not self._recording: - ts = datetime.now().strftime("%Y%m%d_%H%M%S") - self._record_dir = Path("recordings") / f"recording_{ts}" - self._record_dir.mkdir(parents=True, exist_ok=True) - - fourcc = cv2.VideoWriter_fourcc(*"mp4v") - for cam_name, img in self._latest_rgb.items(): - h, w = img.shape[:2] - path = str(self._record_dir / f"{cam_name}.mp4") - self._writers[cam_name] = cv2.VideoWriter(path, fourcc, self._recording_fps, (w, h)) - - self._recording = True - self._record_button.name = "Stop Recording" - self._record_button.color = "red" - logger.info(f"Recording started -> {self._record_dir}") - else: - for w in self._writers.values(): - w.release() - logger.info(f"Recording saved to {self._record_dir}") - self._writers.clear() - self._record_dir = None - self._recording = False - self._record_button.name = "Start Recording" - self._record_button.color = "green" + self._app.close() diff --git a/pyproject.toml b/pyproject.toml index af2cfcb..6e1cc07 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,9 @@ where = ["."] include = ["limb*"] exclude = ["configs*"] +[project.scripts] +limb = "limb.cli:cli_main" + [project.urls] Repository = "https://github.com/TToTMooN/limb" diff --git a/scripts/data/convert_to_lerobot.py b/scripts/data/convert_to_lerobot.py index dd6bef0..c46262b 100644 --- a/scripts/data/convert_to_lerobot.py +++ b/scripts/data/convert_to_lerobot.py @@ -1,389 +1,15 @@ """Convert limb raw recordings to LeRobot v2.1 dataset format. -No lerobot dependency required — only uses pyarrow and standard lib. +Thin wrapper — core logic lives in limb.data.convert_lerobot. Usage: - # Convert a single session (directory containing episode_* subdirs) - uv run scripts/data/convert_to_lerobot.py --input_dir recordings/red_cube_task --output_dir datasets/red_cube - - # Only include successful episodes - uv run scripts/data/convert_to_lerobot.py --input_dir recordings/red_cube_task --output_dir datasets/red_cube --success_only - -LeRobot v2.1 output structure:: - - datasets/red_cube/ - meta/ - info.json - stats.json - episodes.jsonl - tasks.jsonl - data/ - chunk-000/ - episode_000000.parquet - episode_000001.parquet - videos/ - observation.images.left_wrist_camera/ - episode_000000.mp4 - observation.images.right_wrist_camera/ - episode_000000.mp4 + uv run scripts/data/convert_to_lerobot.py --input-dir recordings/task --output-dir datasets/task + uv run limb convert-lerobot --input-dir recordings/task --output-dir datasets/task """ -from __future__ import annotations - -import json -import shutil -from dataclasses import dataclass -from pathlib import Path -from typing import Dict, List, Optional - -import numpy as np import tyro -from loguru import logger - - -@dataclass -class Args: - input_dir: str - output_dir: str - task: Optional[str] = None # override task instruction from metadata - robot_type: str = "yam" - fps: int = 30 - success_only: bool = False - push_to_hub: Optional[str] = None # HuggingFace repo id, e.g. "username/dataset-name" - - -def _find_episodes(input_dir: Path, success_only: bool) -> List[Path]: - """Find episode directories, sorted by name.""" - episodes = sorted(p for p in input_dir.iterdir() if p.is_dir() and p.name.startswith("episode_")) - if success_only: - episodes = [ep for ep in episodes if (ep / "SUCCESS").exists()] - return episodes - - -def _load_episode(episode_dir: Path) -> Dict: - """Load all data from a single episode directory.""" - data = {"dir": episode_dir} - - # Metadata - meta_path = episode_dir / "metadata.json" - if meta_path.exists(): - with open(meta_path) as f: - data["metadata"] = json.load(f) - else: - data["metadata"] = {} - - # Timestamps - ts_path = episode_dir / "timestamps.npy" - data["timestamps"] = np.load(str(ts_path)) if ts_path.exists() else None - - # Arm states and actions - data["arms"] = {} - for states_path in episode_dir.glob("*_states.npz"): - arm_name = states_path.stem.replace("_states", "") - arm_data = {"states": dict(np.load(str(states_path)))} - actions_path = episode_dir / f"{arm_name}_actions.npz" - if actions_path.exists(): - arm_data["actions"] = dict(np.load(str(actions_path))) - data["arms"][arm_name] = arm_data - - # Camera info - data["cameras"] = [] - for mp4 in sorted(episode_dir.glob("*.mp4")): - cam_name = mp4.stem - cam_ts_path = episode_dir / f"{cam_name}_timestamps.npy" - data["cameras"].append( - { - "name": cam_name, - "video_path": mp4, - "timestamps": np.load(str(cam_ts_path)) if cam_ts_path.exists() else None, - } - ) - - return data - - -def _build_state_vector(episode: Dict, arm_names: List[str]) -> np.ndarray: - """Concatenate arm states into a single state vector per timestep. - - Order: [left_joint_pos(6), left_gripper(1), right_joint_pos(6), right_gripper(1)] = 14 - """ - parts = [] - for arm_name in arm_names: - arm = episode["arms"][arm_name] - states = arm["states"] - parts.append(states["joint_pos"]) # (N, 6) - if "gripper_pos" in states: - parts.append(states["gripper_pos"]) # (N, 1) - return np.concatenate(parts, axis=1).astype(np.float32) - - -def _build_action_vector(episode: Dict, arm_names: List[str]) -> np.ndarray: - """Concatenate arm actions into a single action vector per timestep. - - Order: [left_pos(7), right_pos(7)] = 14 - """ - parts = [] - for arm_name in arm_names: - arm = episode["arms"][arm_name] - if "actions" in arm and "pos" in arm["actions"]: - parts.append(arm["actions"]["pos"]) # (N, 7) - return np.concatenate(parts, axis=1).astype(np.float32) if parts else np.empty((0, 0), dtype=np.float32) - - -def _build_state_names(arm_names: List[str], episode: Dict) -> List[str]: - """Build human-readable names for state vector dimensions.""" - names = [] - for arm_name in arm_names: - n_joints = episode["arms"][arm_name]["states"]["joint_pos"].shape[1] - for j in range(n_joints): - names.append(f"{arm_name}_joint_{j}") - if "gripper_pos" in episode["arms"][arm_name]["states"]: - names.append(f"{arm_name}_gripper") - return names - - -def _build_action_names(arm_names: List[str], episode: Dict) -> List[str]: - """Build human-readable names for action vector dimensions.""" - names = [] - for arm_name in arm_names: - if "actions" in episode["arms"][arm_name] and "pos" in episode["arms"][arm_name]["actions"]: - n_dims = episode["arms"][arm_name]["actions"]["pos"].shape[1] - for j in range(n_dims): - names.append(f"{arm_name}_action_{j}") - return names - - -def _compute_stats(all_states: List[np.ndarray], all_actions: List[np.ndarray]) -> Dict: - """Compute per-feature statistics (min, max, mean, std).""" - stats = {} - - if all_states: - cat_states = np.concatenate(all_states, axis=0) - stats["observation.state"] = { - "min": cat_states.min(axis=0).tolist(), - "max": cat_states.max(axis=0).tolist(), - "mean": cat_states.mean(axis=0).tolist(), - "std": cat_states.std(axis=0).tolist(), - "count": int(cat_states.shape[0]), - } - - if all_actions: - cat_actions = np.concatenate(all_actions, axis=0) - stats["action"] = { - "min": cat_actions.min(axis=0).tolist(), - "max": cat_actions.max(axis=0).tolist(), - "mean": cat_actions.mean(axis=0).tolist(), - "std": cat_actions.std(axis=0).tolist(), - "count": int(cat_actions.shape[0]), - } - - return stats - - -def main(args: Args) -> None: - try: - import pyarrow as pa - import pyarrow.parquet as pq - except ImportError: - logger.error("pyarrow not installed. Run: uv add pyarrow") - raise SystemExit(1) from None - - input_dir = Path(args.input_dir) - output_dir = Path(args.output_dir) - - episodes = _find_episodes(input_dir, args.success_only) - if not episodes: - logger.error("No episodes found in {}", input_dir) - raise SystemExit(1) - - logger.info("Found {} episodes in {}", len(episodes), input_dir) - - # Load first episode to determine structure - first_ep = _load_episode(episodes[0]) - arm_names = sorted(first_ep["arms"].keys()) - cam_names = [c["name"] for c in first_ep["cameras"]] - - # Determine task instruction - task = args.task or first_ep["metadata"].get("task_instruction", "") - - # Build feature names - state_names = _build_state_names(arm_names, first_ep) - action_names = _build_action_names(arm_names, first_ep) - state_dim = len(state_names) - action_dim = len(action_names) - - logger.info("Arms: {}, Cameras: {}", arm_names, cam_names) - logger.info("State dim: {} ({})", state_dim, state_names) - logger.info("Action dim: {} ({})", action_dim, action_names) - - # Create output directories - meta_dir = output_dir / "meta" - data_dir = output_dir / "data" / "chunk-000" - meta_dir.mkdir(parents=True, exist_ok=True) - data_dir.mkdir(parents=True, exist_ok=True) - - for cam_name in cam_names: - video_dir = output_dir / "videos" / f"observation.images.{cam_name}" / "chunk-000" - video_dir.mkdir(parents=True, exist_ok=True) - - # Process episodes - all_states: List[np.ndarray] = [] - all_actions: List[np.ndarray] = [] - episodes_meta: List[Dict] = [] - total_frames = 0 - - for ep_idx, ep_path in enumerate(episodes): - episode = _load_episode(ep_path) - n_steps = len(episode["timestamps"]) if episode["timestamps"] is not None else 0 - - if n_steps == 0: - logger.warning("Skipping empty episode: {}", ep_path.name) - continue - - states = _build_state_vector(episode, arm_names) - actions = _build_action_vector(episode, arm_names) - n_steps = min(len(states), len(actions)) if len(actions) > 0 else len(states) - states = states[:n_steps] - actions = actions[:n_steps] if len(actions) > 0 else np.zeros((n_steps, action_dim), dtype=np.float32) - - timestamps = ( - episode["timestamps"][:n_steps] - if episode["timestamps"] is not None - else np.arange(n_steps, dtype=np.float64) / args.fps - ) - - # Relative timestamps (from episode start) - rel_timestamps = (timestamps - timestamps[0]).astype(np.float32) - - all_states.append(states) - all_actions.append(actions) - - # Build parquet table for this episode - table_data = { - "index": pa.array(np.arange(total_frames, total_frames + n_steps, dtype=np.int64)), - "episode_index": pa.array(np.full(n_steps, ep_idx, dtype=np.int64)), - "frame_index": pa.array(np.arange(n_steps, dtype=np.int64)), - "timestamp": pa.array(rel_timestamps), - "task_index": pa.array(np.zeros(n_steps, dtype=np.int64)), - } - - # Add state columns (one column per dimension for LeRobot compat) - for i in range(state_dim): - table_data[f"observation.state.{i}"] = pa.array(states[:, i]) - - # Add action columns - for i in range(action_dim): - table_data[f"action.{i}"] = pa.array(actions[:, i]) - - table = pa.table(table_data) - parquet_path = data_dir / f"episode_{ep_idx:06d}.parquet" - pq.write_table(table, str(parquet_path)) - - # Copy videos - for cam in episode["cameras"]: - src = cam["video_path"] - dst = ( - output_dir / "videos" / f"observation.images.{cam['name']}" / "chunk-000" / f"episode_{ep_idx:06d}.mp4" - ) - shutil.copy2(str(src), str(dst)) - - episodes_meta.append( - { - "episode_index": ep_idx, - "tasks": [task], - "length": n_steps, - } - ) - total_frames += n_steps - logger.info(" Episode {}: {} steps from {}", ep_idx, n_steps, ep_path.name) - - # Write meta/tasks.jsonl - with open(meta_dir / "tasks.jsonl", "w") as f: - f.write(json.dumps({"task_index": 0, "task": task}) + "\n") - - # Write meta/episodes.jsonl - with open(meta_dir / "episodes.jsonl", "w") as f: - for ep in episodes_meta: - f.write(json.dumps(ep) + "\n") - - # Write meta/stats.json - stats = _compute_stats(all_states, all_actions) - with open(meta_dir / "stats.json", "w") as f: - json.dump(stats, f, indent=2) - - # Write meta/info.json - features = { - "observation.state": { - "dtype": "float32", - "shape": [state_dim], - "names": state_names, - }, - "action": { - "dtype": "float32", - "shape": [action_dim], - "names": action_names, - }, - "timestamp": {"dtype": "float32", "shape": [1], "names": None}, - "frame_index": {"dtype": "int64", "shape": [1], "names": None}, - "episode_index": {"dtype": "int64", "shape": [1], "names": None}, - "index": {"dtype": "int64", "shape": [1], "names": None}, - "task_index": {"dtype": "int64", "shape": [1], "names": None}, - } - for cam_name in cam_names: - features[f"observation.images.{cam_name}"] = { - "dtype": "video", - "shape": [480, 640, 3], # placeholder, actual dims in video - "names": ["height", "width", "channels"], - "info": {"video.fps": args.fps, "video.codec": "mp4v"}, - } - - info = { - "codebase_version": "v2.1", - "robot_type": args.robot_type, - "total_episodes": len(episodes_meta), - "total_frames": total_frames, - "total_tasks": 1, - "fps": args.fps, - "splits": {"train": f"0:{len(episodes_meta)}"}, - "data_path": "data/chunk-{chunk_index:03d}/episode_{episode_index:06d}.parquet", - "video_path": "videos/{video_key}/chunk-{chunk_index:03d}/episode_{episode_index:06d}.mp4", - "chunks_size": 1000, - "features": features, - } - with open(meta_dir / "info.json", "w") as f: - json.dump(info, f, indent=2) - - logger.info("=" * 50) - logger.info("LeRobot dataset written to: {}", output_dir) - logger.info(" Episodes: {}", len(episodes_meta)) - logger.info(" Total frames: {}", total_frames) - logger.info(" State dim: {}", state_dim) - logger.info(" Action dim: {}", action_dim) - logger.info(" Cameras: {}", cam_names) - - # Upload to HuggingFace Hub - if args.push_to_hub: - _push_to_hub(output_dir, args.push_to_hub) - - -def _push_to_hub(dataset_dir: Path, repo_id: str) -> None: - """Upload dataset to HuggingFace Hub.""" - try: - from huggingface_hub import HfApi - except ImportError: - logger.error("huggingface_hub not installed. Run: uv pip install huggingface-hub") - raise SystemExit(1) from None - - api = HfApi() - logger.info("Uploading to HuggingFace Hub: {}", repo_id) - api.create_repo(repo_id, repo_type="dataset", exist_ok=True) - api.upload_folder( - folder_path=str(dataset_dir), - repo_id=repo_id, - repo_type="dataset", - ) - logger.info("Uploaded: https://huggingface.co/datasets/{}", repo_id) +from limb.data.convert_lerobot import Args, main if __name__ == "__main__": main(tyro.cli(Args)) diff --git a/scripts/data/visualize_episode.py b/scripts/data/visualize_episode.py index bcdaa7d..1273797 100644 --- a/scripts/data/visualize_episode.py +++ b/scripts/data/visualize_episode.py @@ -1,194 +1,15 @@ """Visualize a recorded episode using Rerun. -Displays joint trajectories, gripper state, EE pose, and camera video -in a synchronized timeline viewer. +Thin wrapper — core logic lives in limb.data.visualize_episode. Usage: - uv run scripts/data/visualize_episode.py --episode_dir recordings/episode_20260304_153045_0001 - uv run scripts/data/visualize_episode.py --episode_dir recordings/episode_20260304_153045_0001 --no-video + uv run scripts/data/visualize_episode.py --episode-dir recordings/episode_... + uv run limb visualize --episode-dir recordings/episode_... """ -from __future__ import annotations - -import json -from dataclasses import dataclass -from pathlib import Path - -import numpy as np import tyro -from loguru import logger - - -@dataclass -class Args: - episode_dir: str - no_video: bool = False - timeline_name: str = "step" - - -def main(args: Args) -> None: - try: - import rerun as rr - import rerun.blueprint as rrb - except ImportError: - logger.error("rerun-sdk not installed. Run: uv add rerun-sdk") - raise SystemExit(1) from None - - episode_dir = Path(args.episode_dir) - if not episode_dir.exists(): - logger.error("Episode directory not found: {}", episode_dir) - raise SystemExit(1) - - # Load metadata - metadata_path = episode_dir / "metadata.json" - if metadata_path.exists(): - with open(metadata_path) as f: - metadata = json.load(f) - logger.info("Episode: {} steps, {:.1f}s", metadata.get("num_steps", "?"), metadata.get("duration_s", 0)) - else: - metadata = {} - - # Load timestamps - timestamps_path = episode_dir / "timestamps.npy" - timestamps = np.load(str(timestamps_path)) if timestamps_path.exists() else None - - # Load all arm data - arm_names = metadata.get("arms", []) - if not arm_names: - arm_names = sorted(p.stem.replace("_states", "") for p in episode_dir.glob("*_states.npz")) - - all_arms = {} - for arm_name in arm_names: - states_path = episode_dir / f"{arm_name}_states.npz" - if not states_path.exists(): - continue - arm = {"states": dict(np.load(str(states_path)))} - actions_path = episode_dir / f"{arm_name}_actions.npz" - if actions_path.exists(): - arm["actions"] = dict(np.load(str(actions_path))) - all_arms[arm_name] = arm - logger.info( - "Logging {}: {} steps, keys={}", - arm_name, - len(next(iter(arm["states"].values()))), - list(arm["states"].keys()), - ) - - # Build blueprint — explicit layout with one TimeSeriesView per (arm, signal) - views = [] - for arm_name in arm_names: - if arm_name not in all_arms: - continue - states = all_arms[arm_name]["states"] - if "joint_pos" in states: - views.append(rrb.TimeSeriesView(name=f"{arm_name} joint_pos", origin=f"{arm_name}/joint_pos")) - if "joint_vel" in states: - views.append(rrb.TimeSeriesView(name=f"{arm_name} joint_vel", origin=f"{arm_name}/joint_vel")) - if "gripper_pos" in states: - views.append(rrb.TimeSeriesView(name=f"{arm_name} gripper", origin=f"{arm_name}/gripper")) - if "actions" in all_arms[arm_name] and "pos" in all_arms[arm_name]["actions"]: - views.append(rrb.TimeSeriesView(name=f"{arm_name} action", origin=f"{arm_name}/action")) - - # Add camera views - cam_names = [] - if not args.no_video: - cam_names = metadata.get("cameras", []) - if not cam_names: - cam_names = [p.stem for p in episode_dir.glob("*.mp4")] - for cam_name in cam_names: - views.append(rrb.Spatial2DView(name=cam_name, origin=f"cameras/{cam_name}")) - - blueprint = rrb.Blueprint(rrb.Grid(*views)) - - # Spawn viewer and send blueprint - rr.init(f"limb — {episode_dir.name}", spawn=True, default_blueprint=blueprint) - rr.send_blueprint(blueprint) - - # Set up series styling - for arm_name, arm in all_arms.items(): - states = arm["states"] - if "joint_pos" in states: - for j in range(states["joint_pos"].shape[1]): - rr.log(f"{arm_name}/joint_pos/j{j}", rr.SeriesLines(names=f"j{j}"), static=True) - if "joint_vel" in states: - for j in range(states["joint_vel"].shape[1]): - rr.log(f"{arm_name}/joint_vel/j{j}", rr.SeriesLines(names=f"j{j}"), static=True) - if "gripper_pos" in states: - rr.log(f"{arm_name}/gripper/pos", rr.SeriesLines(names="gripper"), static=True) - if "actions" in arm and "pos" in arm["actions"]: - for j in range(arm["actions"]["pos"].shape[1]): - rr.log(f"{arm_name}/action/j{j}", rr.SeriesLines(names=f"j{j}"), static=True) - - # Log time-series data - n_steps = ( - len(timestamps) - if timestamps is not None - else max(len(next(iter(arm["states"].values()))) for arm in all_arms.values()) - ) - for i in range(n_steps): - rr.set_time(args.timeline_name, sequence=i) - if timestamps is not None and i < len(timestamps): - rr.set_time("wall_clock", timestamp=timestamps[i]) - - for arm_name, arm in all_arms.items(): - states = arm["states"] - if "joint_pos" in states and i < len(states["joint_pos"]): - for j, val in enumerate(states["joint_pos"][i]): - rr.log(f"{arm_name}/joint_pos/j{j}", rr.Scalars(val)) - if "joint_vel" in states and i < len(states["joint_vel"]): - for j, val in enumerate(states["joint_vel"][i]): - rr.log(f"{arm_name}/joint_vel/j{j}", rr.Scalars(val)) - if "gripper_pos" in states and i < len(states["gripper_pos"]): - rr.log(f"{arm_name}/gripper/pos", rr.Scalars(states["gripper_pos"][i])) - if "actions" in arm and "pos" in arm["actions"] and i < len(arm["actions"]["pos"]): - for j, val in enumerate(arm["actions"]["pos"][i]): - rr.log(f"{arm_name}/action/j{j}", rr.Scalars(val)) - - # Log camera video frames - if not args.no_video: - for cam_name in cam_names: - video_path = episode_dir / f"{cam_name}.mp4" - if not video_path.exists(): - continue - - cam_timestamps_path = episode_dir / f"{cam_name}_timestamps.npy" - cam_timestamps = np.load(str(cam_timestamps_path)) if cam_timestamps_path.exists() else None - # Convert ms → s if timestamps look like milliseconds (>1e12) - if cam_timestamps is not None and len(cam_timestamps) > 0 and cam_timestamps[0] > 1e12: - cam_timestamps = cam_timestamps / 1e3 - - import cv2 - - cap = cv2.VideoCapture(str(video_path)) - frame_idx = 0 - logger.info("Logging video: {}", cam_name) - - while cap.isOpened(): - ret, frame = cap.read() - if not ret: - break - - rr.set_time(args.timeline_name, sequence=frame_idx) - if cam_timestamps is not None and frame_idx < len(cam_timestamps): - rr.set_time("wall_clock", timestamp=cam_timestamps[frame_idx]) - - rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - rr.log(f"cameras/{cam_name}", rr.Image(rgb)) - frame_idx += 1 - - cap.release() - logger.info(" {} frames logged", frame_idx) - - logger.info("Done logging. Viewer should be open. Press Ctrl+C to exit.") - - # Keep process alive so data stays in the viewer - try: - import signal - - signal.pause() - except KeyboardInterrupt: - pass +from limb.data.visualize_episode import Args, main if __name__ == "__main__": main(tyro.cli(Args))