Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 155 additions & 14 deletions benchmark/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,39 @@ def _decode_frames_to_png_and_video(
VideoEncoder(frames=tensors, frame_rate=fps).to_file(mp4_path)


def _resize_with_pad(chw, size: int):
    """Aspect-preserving letterbox of a (C, H, W) uint8 tensor to size x size.

    Scales the longer side to ``size`` (bilinear) and pads the shorter side
    with black (0), splitting the padding evenly and putting any odd leftover
    pixel on the bottom/right edge.
    Mirrors the server's ``Pi05ViTEncoderSubmodule._prepare_one`` geometry, so
    sending the pre-resized frame produces the same model input as decoding at
    native resolution and resizing on the worker.

    Args:
        chw: image tensor laid out as (C, H, W).
        size: side length, in pixels, of the square output.

    Returns:
        A (C, size, size) uint8 tensor. When the input is already
        ``size`` x ``size`` the input tensor itself is returned (no copy).
    """
    import torch
    import torch.nn.functional as F

    _, h, w = chw.shape
    if (h, w) == (size, size):
        return chw
    ratio = max(w / size, h / size)
    # int() truncates, so a very thin frame (e.g. 1 x 1000 at size 224) would
    # get a 0-pixel side and F.interpolate would raise. Keep at least 1 pixel.
    rh = max(1, int(h / ratio))
    rw = max(1, int(w / ratio))
    x = F.interpolate(chw[None].float(), size=(rh, rw), mode="bilinear", align_corners=False)
    ph0, remh = divmod(size - rh, 2)
    pw0, remw = divmod(size - rw, 2)
    # F.pad order for the last two dims is (left, right, top, bottom).
    x = F.pad(x, (pw0, pw0 + remw, ph0, ph0 + remh), value=0.0)
    return x[0].round().clamp(0, 255).to(torch.uint8)


def _decode_frame_to_npy(video_path: str, frame_index: int, npy_path: str, size: int) -> None:
    """Write a single letterboxed video frame to ``npy_path``.

    The frame at ``frame_index`` is decoded from ``video_path``, letterboxed to
    ``size`` x ``size`` via ``_resize_with_pad`` and stored with ``np.save``
    as a (C, H, W) uint8 array (the "numpy" upload the server np.loads in
    memory).
    """
    import numpy as np
    from torchcodec.decoders import VideoDecoder

    decoder = VideoDecoder(video_path)
    batch = decoder.get_frames_at(indices=[frame_index])
    raw_frame = batch.data[0]  # (C,H,W) uint8
    letterboxed = _resize_with_pad(raw_frame, size)
    np.save(npy_path, letterboxed.numpy())


class DROIDDataset(BaseDataset):
"""DROID robotics dataset for evaluating pi0.5 and V-JEPA 2-AC.

Expand All @@ -631,6 +664,14 @@ class DROIDDataset(BaseDataset):
"""

HF_REPO = "lerobot/droid_100"
# pi05 camera frames are letterboxed to this size client-side (matches the
# server's vit_image_size) so both mstar and openpi get identical input.
IMAGE_SIZE = 224

PI05_KEYS = [
"observation.images.exterior_image_1_left",
"observation.images.wrist_image_left"
]

def __init__(
self,
Expand Down Expand Up @@ -663,6 +704,17 @@ def __init__(
# producing 8 token-frames; only the first is used as rollout context.
self.num_video_frames = num_video_frames

# Fast path: reuse a manifest (.npy paths + robot_state + prompt) built
# on a previous run so repeat benchmarks skip the full-parquet load and
# the per-frame video decode entirely. Only pi05 caches; vjepa2_ac
# streams the episode mp4 directly and is left uncached.
if task == "pi05":
cached = self._load_manifest()
if cached is not None:
print(f" [cache] reusing {len(cached)} pi05 items from manifest")
self.items = self._resize_data(cached)
return

def _dl(filename):
return hf_hub_download(
self.HF_REPO, filename, repo_type="dataset", cache_dir=cache_dir
Expand All @@ -683,6 +735,12 @@ def _dl(filename):
f"No video keys found in {self.HF_REPO}/meta/info.json. "
f"Top-level keys: {list(info.keys())}"
)

if task == "pi05":
assert all((key in camera_keys for key in self.PI05_KEYS)), \
f"Expected camera keys {self.PI05_KEYS} not all found in {camera_keys}"
camera_keys = self.PI05_KEYS

chunks_size: int = info.get("chunks_size", info.get("chunk_size", 1000))
print(f" camera keys : {camera_keys}")
print(f" chunks_size : {chunks_size}")
Expand Down Expand Up @@ -724,7 +782,13 @@ def _dl(filename):
for frames in episodes.values():
frames.sort(key=lambda r: int(r[frame_col]))

ep_ids = sorted(episodes.keys())[:num_requests]
# pi05 caches a complete manifest, so build every episode once
# (_resize_data truncates to num_requests below) and the cache is reused
# for any num_requests. vjepa2_ac streams mp4s uncached, so keep the
# original [:num_requests] cap to bound its first-run decode cost.
ep_ids = sorted(episodes.keys())
if task != "pi05":
ep_ids = ep_ids[:num_requests]
print(f" using {len(ep_ids)} of {len(episodes)} episodes")

self.items: list[RequestInput] = []
Expand All @@ -749,6 +813,9 @@ def _dl(filename):
if item is not None:
self.items.append(item)

if task == "pi05":
self._save_manifest(self.items)

self.items = self._resize_data(self.items)

# ------------------------------------------------------------------
Expand All @@ -772,28 +839,30 @@ def _make_pi05(self, idx, ep_id, frames, camera_keys, state_col,
local_indices = self._local_frame_indices(frames)
first_local = local_indices[0]

image_paths: list[str] = []
# Decode + letterbox-resize each camera frame to 224x224 uint8 and save
# as a ".npy" (the "numpy" modality). Sending pre-resized arrays lets the
# server skip both image decode and the resize, and lets us hand mstar
# and openpi identical input.
npy_paths: list[str] = []
for cam_key in camera_keys[:3]:
chunk_video = download_fn(self._chunk_video_path(ep_id, cam_key, chunks_size))
png_path = os.path.join(self.local_file_dir, f"ep{ep_id}_cam{len(image_paths)}.png")
_decode_frames_to_png_and_video(
chunk_video, [first_local], png_path=png_path, mp4_path=None
)
image_paths.append(png_path)
npy_path = os.path.join(self.local_file_dir, f"ep{ep_id}_cam{len(npy_paths)}.npy")
_decode_frame_to_npy(chunk_video, first_local, npy_path, self.IMAGE_SIZE)
npy_paths.append(npy_path)

if not image_paths:
if not npy_paths:
raise ValueError("no camera videos found")
while len(image_paths) < 3:
image_paths.append(image_paths[0])
while len(npy_paths) < 3:
npy_paths.append(npy_paths[0])

state = _to_float_list(
frames[0].get(state_col) if state_col else None, self.action_dim
)
return RequestInput(
req_type=RequestType.VLA,
prompt=language or "manipulate the object",
image_path=image_paths[0],
extra_image_paths=image_paths[1:],
# openpi droid policy only uses the first extra image, so send 2 cameras.
_numpy_paths=npy_paths[:2],
Comment on lines +864 to +865

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benchmark is not apples-to-apples: M* silently runs on one camera, openpi on two. This may invalidate the comparison your PR is presumably built to demonstrate.

  • The benchmark uploads 2 cameras as 2 separate .npy blobs (benchmark/dataset.py line 854, request.py lines 756-762), and data_worker.py lines 224 to 231 loads each as a separate tensor --> image_inputs = [cam0, cam1].
  • But the encoder consumes only image_inputs[0]: Pi05ViTEncoderSubmodule.prepare_inputs --> self._prepare_one(inputs["image_inputs"][0]) (submodules.py line 182-184). Camera 1 is silently dropped. openpi's baseline uses both (request.py _build_obs).
  • The encoder's actual contract is that image_inputs[0] is a stacked (num_cameras,3,H,W) tensor (_prepare_one handles the 4-D case; forward_batched flattens cameras into the token sequence). The benchmark/raw-numpy path violates this by sending a list of separate tensors.
  • Compounding it: the new num_cameras: 2 in configs/pi05_droid.yaml sizes the vit CUDA-graph static buffer to (1,2,3,H,W), while runtime feeds (1,1,3,H,W). static_buf[:1].copy_(real_val) (cuda_graph_runner.py on line 2169) broadcasts the size-1 camera dim, so the model processes two duplicate copies of camera 0, not a crash, just silently wrong.
  • Also found that the camera labeled "wrist" sent to openpi is actually exterior_image_2_left, and gripper_position is always a padding 0.0 for lerobot/droid_100.

model_kwargs={"robot_state": state},
)

Expand Down Expand Up @@ -824,7 +893,9 @@ def _make_vjepa2_ac(self, idx, ep_id, frames, camera_keys, action_col,
)
mp4_path = os.path.join(self.local_file_dir, f"ep{ep_id}.mp4")
_decode_frames_to_png_and_video(
chunk_video, video_local_indices, png_path=None, mp4_path=mp4_path
video_path=chunk_video,
frame_indices=video_local_indices,
png_path=None, mp4_path=mp4_path
)

actions = [_to_float_list(f.get(action_col), self.action_dim)
Expand All @@ -842,6 +913,76 @@ def _make_vjepa2_ac(self, idx, ep_id, frames, camera_keys, action_col,
},
)

# ------------------------------------------------------------------
# pi05 manifest cache
# ------------------------------------------------------------------

def _manifest_path(self) -> str:
    """Return the manifest path inside ``local_file_dir``.

    The filename embeds ``IMAGE_SIZE``, ``num_video_frames`` and
    ``action_dim``, so a change to any of them selects a different manifest.
    """
    filename = (
        f"manifest_pi05_npy{self.IMAGE_SIZE}"
        f"_nvf{self.num_video_frames}"
        f"_ad{self.action_dim}.json"
    )
    return os.path.join(self.local_file_dir, filename)

def _load_manifest(self) -> list[RequestInput] | None:
    """Return cached pi05 RequestInputs, or None to force a rebuild.

    Returns None if the manifest is absent, unreadable, carries a different
    ``version``/``task`` header than the one ``_save_manifest`` writes, holds
    no items, or references a .npy that no longer exists on disk.
    """
    import json as _json

    path = self._manifest_path()
    if not os.path.exists(path):
        return None
    try:
        with open(path, encoding="utf-8") as f:
            data = _json.load(f)
        # The header is written by _save_manifest; a manifest produced under a
        # different schema must not be reused even if the filename matches.
        if data.get("version") != 2 or data.get("task") != "pi05":
            print(" [cache] manifest version/task mismatch; rebuilding")
            return None
        items: list[RequestInput] = []
        for entry in data["items"]:
            # Paths are stored as basenames relative to local_file_dir.
            npy_paths = [os.path.join(self.local_file_dir, p)
                         for p in entry.get("numpy_paths", [])]
            for p in npy_paths:
                if not os.path.exists(p):
                    print(f" [cache] missing {p}; rebuilding")
                    return None
            items.append(RequestInput(
                req_type=RequestType.VLA,
                prompt=entry["prompt"],
                _numpy_paths=npy_paths,
                model_kwargs=entry.get("model_kwargs", {}),
            ))
        # An empty manifest is treated like a missing one.
        return items or None
    except Exception as e:
        # Deliberate best effort: any problem with the cache falls back to a
        # full rebuild instead of aborting the benchmark.
        print(f" [cache] manifest unreadable ({e}); rebuilding")
        return None

def _save_manifest(self, items: list[RequestInput]) -> None:
    """Write the built pi05 items to the manifest file for reuse on later runs.

    Each item contributes its prompt, the basenames of its .npy paths (so they
    are relative to local_file_dir) and its model_kwargs. The JSON is written
    to a ``.tmp`` sibling first and then moved into place with ``os.replace``,
    so an interrupted run never leaves a half-written manifest behind.
    """
    import json as _json

    serialised = []
    for item in items:
        serialised.append({
            "prompt": item.prompt,
            "numpy_paths": [os.path.basename(npy) for npy in item._numpy_paths],
            "model_kwargs": item.model_kwargs,
        })

    manifest = {
        "version": 2,
        "task": "pi05",
        "num_video_frames": self.num_video_frames,
        "action_dim": self.action_dim,
        "items": serialised,
    }

    target = self._manifest_path()
    scratch = target + ".tmp"
    with open(scratch, "w") as fh:
        fh.write(_json.dumps(manifest))
    os.replace(scratch, target)

@property
def num_requests(self) -> int:
    """Read-only view of the value stored in ``_num_requests``."""
    requested = self._num_requests
    return requested
Expand All @@ -852,7 +993,7 @@ def __len__(self) -> int:
def __getitem__(self, idx: int) -> RequestInput:
    """Return the entry of ``self.items`` at position ``idx``."""
    selected = self.items[idx]
    return selected

# ------------------------------------------------------------------

class VideoMMEDataset(BaseDataset):
"""
Dataset loader for Video-MME (https://video-mme.github.io/).
Expand Down
38 changes: 38 additions & 0 deletions benchmark/download_pi05_ckpt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/env python3
"""Download the pi0.5 checkpoint and print the local path.
conda activate openpi
python benchmark/download_pi05_ckpt.py
"""

from __future__ import annotations

import argparse
import sys

DEFAULT_CONFIG = "pi05_droid"
DEFAULT_CHECKPOINT = "gs://openpi-assets/checkpoints/pi05_droid"


def main():
    """Download the pi0.5 checkpoint and print where it landed.

    Reads ``--config`` and ``--checkpoint`` from the command line, exits with
    an explanatory message when ``openpi`` cannot be imported, then looks up
    the config (result unused; presumably this fails early on an unknown
    name, TODO confirm) and prints the value returned by
    ``download.maybe_download``.
    """
    parser = argparse.ArgumentParser(description="Download pi0.5 checkpoint")
    for flag, fallback in (("--config", DEFAULT_CONFIG), ("--checkpoint", DEFAULT_CHECKPOINT)):
        parser.add_argument(flag, default=fallback)
    opts = parser.parse_args()

    # openpi is imported lazily so a missing environment yields a readable hint.
    try:
        from openpi.shared import download
        from openpi.training import config as _config
    except ImportError as e:
        hint = (
            f"\n[ERROR] openpi is not importable ({e}).\n"
            "Run inside the openpi conda env:\n"
            " conda activate openpi\n"
        )
        sys.exit(hint)

    _config.get_config(opts.config)
    local_dir = download.maybe_download(opts.checkpoint)
    print(local_dir)


if __name__ == "__main__":
main()
19 changes: 19 additions & 0 deletions benchmark/openpi_instructions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
1. Create a Python 3.12 environment
2. Clone the `openpi` repo
3. From the root of the `openpi` clone, run the following in that environment:
```
git submodule update --init --recursive
GIT_LFS_SKIP_SMUDGE=1 uv sync
GIT_LFS_SKIP_SMUDGE=1 uv pip install -e .
```
4. From the mstar repo, run:
```
pip install gsutil
python benchmark/download_pi05_ckpt.py
mkdir <DESIRED_OPENPI_CACHE_DIR>
mv /home/$USER/.cache/openpi/* <DESIRED_OPENPI_CACHE_DIR>
```
5. Start the server with:
```
uv run scripts/serve_policy.py policy:checkpoint --policy.config=pi05_droid --policy.dir=<DESIRED_OPENPI_CACHE_DIR>/openpi-assets/checkpoints/pi05_droid
```
Loading
Loading