From 98b564797ac2c7913803182b5baa172860c0b82e Mon Sep 17 00:00:00 2001 From: Xiaofang Wu <3642115339@qq.com> Date: Sat, 9 May 2026 14:13:33 +0800 Subject: [PATCH 1/9] feat: add unified embodied workflow planning --- roboclaw/embodied/service/__init__.py | 68 +++++ roboclaw/embodied/workflow.py | 358 ++++++++++++++++++++++++++ roboclaw/http/routes/__init__.py | 2 + roboclaw/http/routes/workflows.py | 30 +++ tests/test_workflow_plan.py | 89 +++++++ tests/test_workflow_routes.py | 106 ++++++++ tests/test_workflow_service.py | 82 ++++++ 7 files changed, 735 insertions(+) create mode 100644 roboclaw/embodied/workflow.py create mode 100644 roboclaw/http/routes/workflows.py create mode 100644 tests/test_workflow_plan.py create mode 100644 tests/test_workflow_routes.py create mode 100644 tests/test_workflow_service.py diff --git a/roboclaw/embodied/service/__init__.py b/roboclaw/embodied/service/__init__.py index 910214c2d..3f3c53d2c 100644 --- a/roboclaw/embodied/service/__init__.py +++ b/roboclaw/embodied/service/__init__.py @@ -37,6 +37,7 @@ VerificationRequest, Verifier, ) +from roboclaw.embodied.workflow import WorkflowPlan, WorkflowPlanner, WorkflowSpec class EmbodiedService: @@ -226,6 +227,73 @@ def _verify_inference_preflight( if not result.ok: raise ActionError(result.format_violations()) + def plan_workflow(self, spec: WorkflowSpec | dict[str, Any]) -> WorkflowPlan: + """Compile a workflow spec into concrete stage plans and validations.""" + return WorkflowPlanner(self.manifest, self.datasets).plan(spec) + + async def start_workflow_phase( + self, + spec: WorkflowSpec | dict[str, Any], + phase: str, + ) -> dict[str, Any]: + """Start a workflow phase using the unified workflow spec interface.""" + workflow = spec if isinstance(spec, WorkflowSpec) else WorkflowSpec.model_validate(spec) + plan = self.plan_workflow(workflow) + stage = next((item for item in plan.stages if item.stage == phase), None) + if stage is None: + raise RuntimeError(f"Unknown workflow phase '{phase}'.") + if not stage.enabled: + raise RuntimeError(f"Workflow phase '{phase}' is disabled.") + if stage.issues: + raise RuntimeError(" · ".join(issue.message for issue in stage.issues)) + + if phase == "record": + dataset_name = await self.start_recording( + task=workflow.record.task, + num_episodes=workflow.record.num_episodes, + fps=workflow.record.fps, + episode_time_s=workflow.record.episode_time_s, + reset_time_s=workflow.record.reset_time_s, + dataset_name=workflow.record.dataset_name, + use_cameras=workflow.hardware.use_cameras, + arms=workflow.hardware.arms, + ) + return {"status": "recording", "dataset_name": dataset_name} + + if phase == "train": + train_dataset_name = workflow.train.dataset_name.strip() or plan.stage("record").dataset_name + result = await self.train.train( + manifest=self.manifest, + kwargs={ + "dataset_name": train_dataset_name, + "policy_type": workflow.train.policy_type, + "steps": workflow.train.steps, + "device": workflow.train.device, + }, + tty_handoff=None, + ) + job_id = result.rsplit("Job ID:", 1)[-1].strip() if "Job ID:" in result else "" + return {"message": result, "job_id": job_id} + + if phase == "infer": + await self.start_inference( + checkpoint_path=workflow.infer.checkpoint_path, + source_dataset=workflow.infer.source_dataset or plan.stage("train").dataset_name or plan.stage("record").dataset_name, + dataset_name=workflow.infer.dataset_name, + task=workflow.infer.task, + num_episodes=workflow.infer.num_episodes, + episode_time_s=workflow.infer.episode_time_s, + arms=workflow.hardware.arms, + use_cameras=workflow.hardware.use_cameras, + ) + return { + "status": "inferring", + "dataset_name": stage.dataset_name, + "checkpoint_path": stage.checkpoint_path, + } + + raise RuntimeError(f"Workflow phase '{phase}' is not supported.") + # -- Operations (Web entry points) -- async def start_teleop(self, *, fps: int = 30, arms: str = "") -> None: diff --git a/roboclaw/embodied/workflow.py b/roboclaw/embodied/workflow.py new file mode 100644 index 000000000..1726325cb --- /dev/null +++ b/roboclaw/embodied/workflow.py @@ -0,0 +1,358 @@ +"""Unified embodied workflow specification and planning helpers.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Literal + +from pydantic import BaseModel, ConfigDict, Field +from pydantic.alias_generators import to_camel + +from roboclaw.data.datasets import DatasetCatalog, DatasetRuntimeRef +from roboclaw.embodied.command import ActionError, CommandBuilder + +WorkflowStageName = Literal["record", "train", "infer"] + + +class WorkflowModel(BaseModel): + """Base workflow model with camelCase compatibility for JSON APIs.""" + + model_config = ConfigDict( + alias_generator=to_camel, + populate_by_name=True, + extra="ignore", + ) + + +class WorkflowHardwareSpec(WorkflowModel): + """Hardware-facing options shared across workflow stages.""" + + arms: str = "" + use_cameras: bool = True + + +class RecordWorkflowSpec(WorkflowModel): + enabled: bool = False + task: str = "" + dataset_name: str = "" + num_episodes: int = 10 + fps: int = 30 + episode_time_s: int = 300 + reset_time_s: int = 10 + + +class TrainWorkflowSpec(WorkflowModel): + enabled: bool = False + dataset_name: str = "" + policy_type: str = "act" + steps: int = 100_000 + device: str = "cuda" + + +class InferWorkflowSpec(WorkflowModel): + enabled: bool = False + checkpoint_path: str = "" + source_dataset: str = "" + dataset_name: str = "" + task: str = "eval" + num_episodes: int = 1 + episode_time_s: int = 60 + + +class WorkflowSpec(WorkflowModel): + """Single spec that describes an embodied data/train/infer workflow.""" + + name: str = "" + hardware: WorkflowHardwareSpec = Field(default_factory=WorkflowHardwareSpec) + record: RecordWorkflowSpec = Field(default_factory=RecordWorkflowSpec) + train: TrainWorkflowSpec = Field(default_factory=TrainWorkflowSpec) + infer: InferWorkflowSpec = Field(default_factory=InferWorkflowSpec) + + +class WorkflowIssue(WorkflowModel): + """Validation issue found while compiling or checking a workflow.""" + + stage: WorkflowStageName + code: str + message: str + field: str = "" + + +class WorkflowStagePlan(WorkflowModel): + """Planner output for one workflow stage.""" + + stage: WorkflowStageName + enabled: bool = False + ready: bool = False + owner: str = "" + capability: str = "" + dataset_name: str = "" + source_dataset: str = "" + checkpoint_path: str = "" + output_path: str = "" + command: list[str] = Field(default_factory=list) + notes: list[str] = Field(default_factory=list) + issues: list[WorkflowIssue] = Field(default_factory=list) + + +class WorkflowPlan(WorkflowModel): + """Complete compiled view of a workflow spec.""" + + name: str = "" + ok: bool = False + stages: list[WorkflowStagePlan] = Field(default_factory=list) + issues: list[WorkflowIssue] = Field(default_factory=list) + + def stage(self, stage_name: WorkflowStageName) -> WorkflowStagePlan: + for stage in self.stages: + if stage.stage == stage_name: + return stage + raise KeyError(stage_name) + + +class WorkflowPlanner: + """Compile a workflow spec into concrete stage plans and validations.""" + + def __init__(self, manifest: Any, datasets: DatasetCatalog) -> None: + self._manifest = manifest + self._datasets = datasets + + def plan(self, spec: WorkflowSpec | dict[str, Any]) -> WorkflowPlan: + workflow = spec if isinstance(spec, WorkflowSpec) else WorkflowSpec.model_validate(spec) + stages: list[WorkflowStagePlan] = [] + + record_plan = self._plan_record(workflow) + stages.append(record_plan) + + train_plan = self._plan_train(workflow, record_plan) + stages.append(train_plan) + + infer_plan = self._plan_infer(workflow, record_plan, train_plan) + stages.append(infer_plan) + + issues = [issue for stage in stages for issue in stage.issues] + if not any(stage.enabled for stage in stages): + issues.append(WorkflowIssue( + stage="record", + code="empty_workflow", + message="Enable at least one workflow stage.", + )) + + return WorkflowPlan( + name=workflow.name, + ok=not issues, + stages=stages, + issues=issues, + ) + + def _plan_record(self, spec: WorkflowSpec) -> WorkflowStagePlan: + stage = WorkflowStagePlan( + stage="record", + enabled=spec.record.enabled, + owner="recording", + capability="record" if spec.hardware.use_cameras else "record_without_cameras", + ) + if not spec.record.enabled: + return stage + + dataset = self._datasets.prepare_recording_dataset(spec.record.dataset_name, prefix="rec") + stage.dataset_name = dataset.runtime.name + stage.output_path = str(dataset.runtime.local_path) + if not spec.record.dataset_name.strip(): + stage.notes.append("dataset_name is omitted and will be auto-generated at runtime.") + if not spec.record.task.strip(): + stage.issues.append(WorkflowIssue( + stage="record", + code="missing_task", + field="record.task", + message="record.task is required when the record stage is enabled.", + )) + + if stage.issues: + return stage + + try: + stage.command = CommandBuilder.record( + self._manifest, + dataset=dataset.runtime, + task=spec.record.task, + num_episodes=spec.record.num_episodes, + fps=spec.record.fps, + episode_time_s=spec.record.episode_time_s, + reset_time_s=spec.record.reset_time_s, + arms=spec.hardware.arms, + use_cameras=spec.hardware.use_cameras, + ) + except (ActionError, ValueError) as exc: + stage.issues.append(WorkflowIssue( + stage="record", + code="compile_error", + message=str(exc), + )) + return stage + + stage.ready = True + return stage + + def _plan_train(self, spec: WorkflowSpec, record_plan: WorkflowStagePlan) -> WorkflowStagePlan: + stage = WorkflowStagePlan(stage="train", enabled=spec.train.enabled, owner="training") + if not spec.train.enabled: + return stage + + dataset_name = spec.train.dataset_name.strip() or record_plan.dataset_name + stage.dataset_name = dataset_name + if not dataset_name: + stage.issues.append(WorkflowIssue( + stage="train", + code="missing_dataset", + field="train.dataset_name", + message="train.dataset_name is required unless the record stage feeds training.", + )) + return stage + + dataset_runtime = self._resolve_runtime_dataset( + dataset_name, + allow_planned=record_plan.enabled and dataset_name == record_plan.dataset_name, + ) + if dataset_runtime is None: + stage.issues.append(WorkflowIssue( + stage="train", + code="dataset_not_found", + field="train.dataset_name", + message=f"Runtime dataset '{dataset_name}' was not found.", + )) + return stage + + try: + stage.command = CommandBuilder.train( + self._manifest, + dataset=dataset_runtime, + policy_type=spec.train.policy_type, + steps=spec.train.steps, + device=spec.train.device, + ) + except (ActionError, ValueError) as exc: + stage.issues.append(WorkflowIssue( + stage="train", + code="compile_error", + message=str(exc), + )) + return stage + + output_dir = _argv_value(stage.command, "--output_dir=") + if output_dir: + stage.output_path = output_dir + stage.checkpoint_path = str(Path(output_dir) / "checkpoints" / "last" / "pretrained_model") + + if not spec.train.dataset_name.strip() and record_plan.enabled: + stage.notes.append("train.dataset_name is inherited from the record stage output.") + + stage.ready = True + return stage + + def _plan_infer( + self, + spec: WorkflowSpec, + record_plan: WorkflowStagePlan, + train_plan: WorkflowStagePlan, + ) -> WorkflowStagePlan: + stage = WorkflowStagePlan( + stage="infer", + enabled=spec.infer.enabled, + owner="inferring", + capability="infer" if spec.hardware.use_cameras else "infer_without_cameras", + ) + if not spec.infer.enabled: + return stage + + output_dataset = self._datasets.prepare_recording_dataset(spec.infer.dataset_name, prefix="eval") + stage.dataset_name = output_dataset.runtime.name + stage.output_path = str(output_dataset.runtime.local_path) + if not spec.infer.dataset_name.strip(): + stage.notes.append("infer.dataset_name is omitted and will be auto-generated at runtime.") + + source_dataset_name = ( + spec.infer.source_dataset.strip() + or train_plan.dataset_name + or record_plan.dataset_name + ) + stage.source_dataset = source_dataset_name + + checkpoint_path = spec.infer.checkpoint_path.strip() + stage.checkpoint_path = checkpoint_path + if not checkpoint_path and not source_dataset_name: + stage.issues.append(WorkflowIssue( + stage="infer", + code="missing_checkpoint_source", + field="infer.checkpoint_path", + message="Provide infer.checkpoint_path or a source dataset/train stage for checkpoint resolution.", + )) + return stage + + source_dataset = None + if source_dataset_name: + source_dataset = self._resolve_runtime_dataset( + source_dataset_name, + allow_planned=( + (record_plan.enabled and source_dataset_name == record_plan.dataset_name) + or (train_plan.enabled and source_dataset_name == train_plan.dataset_name) + ), + ) + if source_dataset is None and not checkpoint_path: + stage.issues.append(WorkflowIssue( + stage="infer", + code="source_dataset_not_found", + field="infer.source_dataset", + message=f"Runtime dataset '{source_dataset_name}' was not found for checkpoint resolution.", + )) + return stage + + try: + stage.command = CommandBuilder.infer( + self._manifest, + dataset=output_dataset.runtime, + checkpoint_path=checkpoint_path, + source_dataset=source_dataset, + task=spec.infer.task, + num_episodes=spec.infer.num_episodes, + episode_time_s=spec.infer.episode_time_s, + arms=spec.hardware.arms, + use_cameras=spec.hardware.use_cameras, + ) + except (ActionError, ValueError) as exc: + stage.issues.append(WorkflowIssue( + stage="infer", + code="compile_error", + message=str(exc), + )) + return stage + + resolved_checkpoint = _argv_value(stage.command, "--policy.path=") + if resolved_checkpoint: + stage.checkpoint_path = resolved_checkpoint + if not checkpoint_path and train_plan.enabled and train_plan.checkpoint_path: + stage.notes.append("infer.checkpoint_path is derived from the planned training output.") + elif not checkpoint_path and source_dataset_name: + stage.notes.append("infer.checkpoint_path is derived from the source dataset policy directory.") + + stage.ready = True + return stage + + def _resolve_runtime_dataset(self, name: str, *, allow_planned: bool) -> DatasetRuntimeRef | None: + try: + return self._datasets.resolve_runtime_dataset(name).runtime + except ValueError: + if not allow_planned: + return None + return DatasetRuntimeRef( + name=name, + repo_id=f"local/{name}", + local_path=self._datasets.root / "local" / name, + ) + + +def _argv_value(argv: list[str], prefix: str) -> str: + for item in argv: + if item.startswith(prefix): + return item.split("=", 1)[1] + return "" diff --git a/roboclaw/http/routes/__init__.py b/roboclaw/http/routes/__init__.py index c30d91a01..37a947943 100644 --- a/roboclaw/http/routes/__init__.py +++ b/roboclaw/http/routes/__init__.py @@ -29,6 +29,7 @@ def register_all_routes( from roboclaw.http.routes.train import register_train_routes from roboclaw.http.routes.infer import register_infer_routes from roboclaw.http.routes.hub import register_hub_routes + from roboclaw.http.routes.workflows import register_workflow_routes from roboclaw.http.routes.chat_uploads import register_chat_upload_routes register_chat_upload_routes(app) @@ -45,6 +46,7 @@ def register_all_routes( register_train_routes(app, service) register_infer_routes(app, service) register_hub_routes(app, service) + register_workflow_routes(app, service) from roboclaw.http.routes.curation import register_curation_routes from roboclaw.http.routes.explorer import register_explorer_routes diff --git a/roboclaw/http/routes/workflows.py b/roboclaw/http/routes/workflows.py new file mode 100644 index 000000000..f4b8bab35 --- /dev/null +++ b/roboclaw/http/routes/workflows.py @@ -0,0 +1,30 @@ +"""Workflow routes — unified embodied workflow planning and execution.""" + +from __future__ import annotations + +from typing import Any, Literal + +from fastapi import FastAPI, HTTPException + +from roboclaw.embodied.service import EmbodiedService +from roboclaw.embodied.workflow import WorkflowSpec + +WorkflowPhase = Literal["record", "train", "infer"] + + +def register_workflow_routes(app: FastAPI, service: EmbodiedService) -> None: + + @app.post("/api/workflows/validate") + async def workflow_validate(body: WorkflowSpec) -> dict[str, Any]: + return service.plan_workflow(body).model_dump(by_alias=True) + + @app.post("/api/workflows/plan") + async def workflow_plan(body: WorkflowSpec) -> dict[str, Any]: + return service.plan_workflow(body).model_dump(by_alias=True) + + @app.post("/api/workflows/run/{phase}") + async def workflow_run(phase: WorkflowPhase, body: WorkflowSpec) -> dict[str, Any]: + try: + return await service.start_workflow_phase(body, phase) + except (RuntimeError, ValueError) as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc diff --git a/tests/test_workflow_plan.py b/tests/test_workflow_plan.py new file mode 100644 index 000000000..47c0d33ae --- /dev/null +++ b/tests/test_workflow_plan.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +import json +from pathlib import Path + +from roboclaw.embodied.embodiment.interface.serial import SerialInterface +from roboclaw.embodied.embodiment.interface.video import VideoInterface +from roboclaw.embodied.embodiment.manifest import Manifest +from roboclaw.embodied.service import EmbodiedService +from roboclaw.embodied.workflow import WorkflowPlanner, WorkflowSpec + + +def _make_service(tmp_path: Path) -> EmbodiedService: + manifest_data = { + "version": 2, + "arms": [], + "hands": [], + "cameras": [], + "datasets": {"root": str(tmp_path / "datasets")}, + "policies": {"root": str(tmp_path / "policies")}, + } + manifest_path = tmp_path / "manifest.json" + manifest_path.write_text(json.dumps(manifest_data), encoding="utf-8") + manifest = Manifest(path=manifest_path) + service = EmbodiedService(manifest=manifest) + service.bind_arm("follower", "so101_follower", SerialInterface(dev="/tmp/follower")) + service.bind_arm("leader", "so101_leader", SerialInterface(dev="/tmp/leader")) + service.bind_camera("wrist", VideoInterface(dev="/tmp/wrist")) + return service + + +def test_workflow_planner_compiles_record_train_infer_chain(tmp_path: Path) -> None: + service = _make_service(tmp_path) + plan = WorkflowPlanner(service.manifest, service.datasets).plan(WorkflowSpec.model_validate({ + "name": "pick-cube-pipeline", + "hardware": {"use_cameras": True}, + "record": { + "enabled": True, + "task": "pick cube", + "dataset_name": "pick_cube_v1", + "num_episodes": 5, + }, + "train": { + "enabled": True, + "policy_type": "act", + "steps": 2000, + }, + "infer": { + "enabled": True, + "dataset_name": "eval_pick_cube_v1", + "num_episodes": 2, + }, + })) + + assert plan.ok is True + + record_stage = plan.stage("record") + assert record_stage.ready is True + assert record_stage.dataset_name == "pick_cube_v1" + assert "--dataset.repo_id=local/pick_cube_v1" in record_stage.command + + train_stage = plan.stage("train") + assert train_stage.ready is True + assert train_stage.dataset_name == "pick_cube_v1" + assert "--policy.type=act" in train_stage.command + assert train_stage.checkpoint_path.endswith( + "pick_cube_v1/checkpoints/last/pretrained_model" + ) + + infer_stage = plan.stage("infer") + assert infer_stage.ready is True + assert infer_stage.dataset_name == "eval_pick_cube_v1" + assert infer_stage.source_dataset == "pick_cube_v1" + assert infer_stage.checkpoint_path.endswith( + "pick_cube_v1/checkpoints/last/pretrained_model" + ) + assert any(arg.startswith("--policy.path=") for arg in infer_stage.command) + + +def test_workflow_planner_reports_missing_train_dataset_without_record_stage(tmp_path: Path) -> None: + service = _make_service(tmp_path) + plan = WorkflowPlanner(service.manifest, service.datasets).plan({ + "train": {"enabled": True}, + }) + + assert plan.ok is False + train_stage = plan.stage("train") + assert train_stage.ready is False + assert any(issue.code == "missing_dataset" for issue in train_stage.issues) diff --git a/tests/test_workflow_routes.py b/tests/test_workflow_routes.py new file mode 100644 index 000000000..a1fd3075b --- /dev/null +++ b/tests/test_workflow_routes.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +pytest.importorskip("fastapi") +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from roboclaw.embodied.board import Board +from roboclaw.embodied.embodiment.hardware.monitor import HardwareMonitor +from roboclaw.embodied.embodiment.interface.serial import SerialInterface +from roboclaw.embodied.embodiment.interface.video import VideoInterface +from roboclaw.embodied.embodiment.manifest import Manifest +from roboclaw.embodied.service import EmbodiedService +from roboclaw.http.routes.workflows import register_workflow_routes + + +@pytest.fixture(autouse=True) +def isolated_roboclaw_home(tmp_path): + with patch( + "roboclaw.embodied.embodiment.lock.get_roboclaw_home", + return_value=tmp_path, + ), patch( + "roboclaw.embodied.embodiment.manifest.helpers.get_roboclaw_home", + return_value=tmp_path, + ): + yield + + +@pytest.fixture() +def app(tmp_path): + app = FastAPI() + manifest_path = tmp_path / "manifest.json" + manifest_path.write_text(json.dumps({ + "version": 2, + "arms": [], + "hands": [], + "cameras": [], + "datasets": {"root": str(tmp_path / "datasets")}, + "policies": {"root": str(tmp_path / "policies")}, + }), encoding="utf-8") + board = Board() + manifest = Manifest(path=manifest_path, board=board) + hw_monitor = HardwareMonitor(board=board, manifest=manifest) + service = EmbodiedService(hardware_monitor=hw_monitor, board=board, manifest=manifest) + service.bind_arm("follower", "so101_follower", SerialInterface(dev="/tmp/follower")) + service.bind_arm("leader", "so101_leader", SerialInterface(dev="/tmp/leader")) + service.bind_camera("wrist", VideoInterface(dev="/tmp/wrist")) + register_workflow_routes(app, service) + app.state.embodied_service = service + return app + + +@pytest.fixture() +def client(app): + return TestClient(app, raise_server_exceptions=False) + + +def test_workflow_plan_route_returns_compiled_stages(client): + resp = client.post("/api/workflows/plan", json={ + "name": "pick-cube-pipeline", + "hardware": {"useCameras": True}, + "record": { + "enabled": True, + "task": "pick cube", + "datasetName": "pick_cube_v1", + }, + "train": { + "enabled": True, + "policyType": "act", + }, + "infer": { + "enabled": True, + "datasetName": "eval_pick_cube_v1", + }, + }) + + assert resp.status_code == 200 + payload = resp.json() + assert payload["ok"] is True + assert [stage["stage"] for stage in payload["stages"]] == ["record", "train", "infer"] + assert payload["stages"][1]["datasetName"] == "pick_cube_v1" + assert payload["stages"][2]["checkpointPath"].endswith( + "pick_cube_v1/checkpoints/last/pretrained_model" + ) + + +def test_workflow_run_route_delegates_to_service(client, app): + service = app.state.embodied_service + service.start_workflow_phase = AsyncMock(return_value={"status": "recording", "dataset_name": "demo"}) + + resp = client.post("/api/workflows/run/record", json={ + "record": { + "enabled": True, + "task": "pick cube", + "datasetName": "demo", + }, + }) + + assert resp.status_code == 200 + assert resp.json() == {"status": "recording", "dataset_name": "demo"} + service.start_workflow_phase.assert_awaited_once() diff --git a/tests/test_workflow_service.py b/tests/test_workflow_service.py new file mode 100644 index 000000000..bb076cc94 --- /dev/null +++ b/tests/test_workflow_service.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +import asyncio +import json +from pathlib import Path +from unittest.mock import AsyncMock + +from roboclaw.embodied.embodiment.interface.serial import SerialInterface +from roboclaw.embodied.embodiment.interface.video import VideoInterface +from roboclaw.embodied.embodiment.manifest import Manifest +from roboclaw.embodied.service import EmbodiedService + + +def _make_service(tmp_path: Path) -> EmbodiedService: + manifest_data = { + "version": 2, + "arms": [], + "hands": [], + "cameras": [], + "datasets": {"root": str(tmp_path / "datasets")}, + "policies": {"root": str(tmp_path / "policies")}, + } + manifest_path = tmp_path / "manifest.json" + manifest_path.write_text(json.dumps(manifest_data), encoding="utf-8") + manifest = Manifest(path=manifest_path) + service = EmbodiedService(manifest=manifest) + service.bind_arm("follower", "so101_follower", SerialInterface(dev="/tmp/follower")) + service.bind_arm("leader", "so101_leader", SerialInterface(dev="/tmp/leader")) + service.bind_camera("wrist", VideoInterface(dev="/tmp/wrist")) + return service + + +def test_start_workflow_phase_train_inherits_record_dataset_name(tmp_path: Path) -> None: + service = _make_service(tmp_path) + service.train.train = AsyncMock(return_value="Training started. Job ID: job-1") + + result = asyncio.run(service.start_workflow_phase({ + "record": { + "enabled": True, + "task": "pick cube", + "dataset_name": "pick_cube_v1", + }, + "train": { + "enabled": True, + "policy_type": "act", + "steps": 1000, + }, + }, "train")) + + assert result == {"message": "Training started. Job ID: job-1", "job_id": "job-1"} + service.train.train.assert_awaited_once() + kwargs = service.train.train.await_args.kwargs["kwargs"] + assert kwargs["dataset_name"] == "pick_cube_v1" + + +def test_start_workflow_phase_infer_inherits_train_source_dataset(tmp_path: Path) -> None: + service = _make_service(tmp_path) + service.start_inference = AsyncMock(return_value=None) + + result = asyncio.run(service.start_workflow_phase({ + "record": { + "enabled": True, + "task": "pick cube", + "dataset_name": "pick_cube_v1", + }, + "train": { + "enabled": True, + "policy_type": "act", + "steps": 1000, + }, + "infer": { + "enabled": True, + "dataset_name": "eval_pick_cube_v1", + }, + }, "infer")) + + assert result["status"] == "inferring" + assert result["dataset_name"] == "eval_pick_cube_v1" + assert result["checkpoint_path"].endswith("pick_cube_v1/checkpoints/last/pretrained_model") + service.start_inference.assert_awaited_once() + kwargs = service.start_inference.await_args.kwargs + assert kwargs["source_dataset"] == "pick_cube_v1" From 23362d2bd007a6b1155d17fbcddbc6e2a87aa7f4 Mon Sep 17 00:00:00 2001 From: Xiaofang Wu <3642115339@qq.com> Date: Sat, 9 May 2026 15:09:28 +0800 Subject: [PATCH 2/9] fix: align workflow planning review and execution --- README.md | 10 +++++ docs/WORKFLOWS.md | 65 +++++++++++++++++++++++++++ roboclaw/embodied/service/__init__.py | 11 +++-- roboclaw/http/routes/workflows.py | 27 +++++++++-- tests/test_workflow_service.py | 20 +++++++++ 5 files changed, 124 insertions(+), 9 deletions(-) create mode 100644 docs/WORKFLOWS.md diff --git a/README.md b/README.md index df609fab0..458c83246 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,16 @@ Help me install RoboClaw from https://github.com/MINT-SJTU/RoboClaw - [Non-Docker Installation](./docs/INSTALLATION.md) - [Docker Installation](./docs/DOCKERINSTALLATION.md) +## 🧭 Workflow Planning + +RoboClaw includes a unified embodied workflow spec for `record`, `train`, and `infer`, so the same payload can be used to preview a pipeline before launching any hardware job. + +- `POST /api/workflows/validate`: compile the workflow and surface issues +- `POST /api/workflows/plan`: preview derived datasets, checkpoint paths, and commands +- `POST /api/workflows/run/{phase}`: execute one validated phase + +See [Workflow Planning API](./docs/WORKFLOWS.md) for the request format and a review-first flow using `/docs`. + ## 📢 Community Co-Creation RoboClaw is being built in the open. We want major direction-setting choices, such as embodiment support, simulator priorities, and roadmap focus, to be discussed with the community. diff --git a/docs/WORKFLOWS.md b/docs/WORKFLOWS.md new file mode 100644 index 000000000..ef27bda15 --- /dev/null +++ b/docs/WORKFLOWS.md @@ -0,0 +1,65 @@ +# Workflow Planning API + +RoboClaw now exposes a unified workflow spec for the common embodied pipeline: + +- `record`: collect data from the robot +- `train`: fit a policy from a dataset +- `infer`: run a policy back on hardware and record the evaluation dataset + +The goal is not to add another dashboard. The goal is to make one workflow description reusable across planning, validation, and execution. + +## What The Planner Resolves + +Given a single `WorkflowSpec`, RoboClaw can: + +- validate whether each stage is runnable +- infer missing dataset names between stages +- infer the checkpoint path for inference from the planned training output +- compile the concrete command that each stage would run + +This makes `plan` the main review surface before anyone starts a real robot job. + +## API Surface + +- `POST /api/workflows/validate` + Returns the compiled workflow plus issues, without starting any stage. +- `POST /api/workflows/plan` + Returns the compiled stage plan, including derived dataset names, checkpoint paths, and commands. +- `POST /api/workflows/run/{phase}` + Starts a single validated phase, where `phase` is `record`, `train`, or `infer`. + +The run endpoints consume the same derived dataset and checkpoint values that appear in `plan`, so review and execution stay aligned. + +## Example Spec + +```json +{ + "name": "pick-cube-pipeline", + "hardware": { + "useCameras": true + }, + "record": { + "enabled": true, + "task": "pick cube", + "datasetName": "pick_cube_v1", + "numEpisodes": 5 + }, + "train": { + "enabled": true, + "policyType": "act", + "steps": 2000 + }, + "infer": { + "enabled": true, + "datasetName": "eval_pick_cube_v1", + "numEpisodes": 2 + } +} +``` + +## Review Flow + +1. Open the FastAPI docs page at `/docs`. +2. Use `POST /api/workflows/plan` with a workflow spec. +3. Check whether the returned stages are `ready`, whether datasets flow between stages as expected, and whether the checkpoint path matches the intended policy output. +4. Only then call `POST /api/workflows/run/{phase}` for the stage you want to execute. diff --git a/roboclaw/embodied/service/__init__.py b/roboclaw/embodied/service/__init__.py index 3f3c53d2c..143e27fd3 100644 --- a/roboclaw/embodied/service/__init__.py +++ b/roboclaw/embodied/service/__init__.py @@ -254,18 +254,17 @@ async def start_workflow_phase( fps=workflow.record.fps, episode_time_s=workflow.record.episode_time_s, reset_time_s=workflow.record.reset_time_s, - dataset_name=workflow.record.dataset_name, + dataset_name=stage.dataset_name, use_cameras=workflow.hardware.use_cameras, arms=workflow.hardware.arms, ) return {"status": "recording", "dataset_name": dataset_name} if phase == "train": - train_dataset_name = workflow.train.dataset_name.strip() or plan.stage("record").dataset_name result = await self.train.train( manifest=self.manifest, kwargs={ - "dataset_name": train_dataset_name, + "dataset_name": stage.dataset_name, "policy_type": workflow.train.policy_type, "steps": workflow.train.steps, "device": workflow.train.device, @@ -277,9 +276,9 @@ async def start_workflow_phase( if phase == "infer": await self.start_inference( - checkpoint_path=workflow.infer.checkpoint_path, - source_dataset=workflow.infer.source_dataset or plan.stage("train").dataset_name or plan.stage("record").dataset_name, - dataset_name=workflow.infer.dataset_name, + checkpoint_path=stage.checkpoint_path, + source_dataset=stage.source_dataset, + dataset_name=stage.dataset_name, task=workflow.infer.task, num_episodes=workflow.infer.num_episodes, episode_time_s=workflow.infer.episode_time_s, diff --git a/roboclaw/http/routes/workflows.py b/roboclaw/http/routes/workflows.py index f4b8bab35..c6aa42f58 100644 --- a/roboclaw/http/routes/workflows.py +++ b/roboclaw/http/routes/workflows.py @@ -14,15 +14,36 @@ def register_workflow_routes(app: FastAPI, service: EmbodiedService) -> None: - @app.post("/api/workflows/validate") + @app.post( + "/api/workflows/validate", + summary="Validate a unified embodied workflow", + description=( + "Compile a record/train/infer workflow spec and return any issues, " + "derived datasets, and resolved commands without starting a job." + ), + ) async def workflow_validate(body: WorkflowSpec) -> dict[str, Any]: return service.plan_workflow(body).model_dump(by_alias=True) - @app.post("/api/workflows/plan") + @app.post( + "/api/workflows/plan", + summary="Preview the compiled workflow plan", + description=( + "Resolve cross-stage dataset flow, checkpoint paths, and concrete commands " + "for a unified embodied workflow spec." + ), + ) async def workflow_plan(body: WorkflowSpec) -> dict[str, Any]: return service.plan_workflow(body).model_dump(by_alias=True) - @app.post("/api/workflows/run/{phase}") + @app.post( + "/api/workflows/run/{phase}", + summary="Run one phase from a validated workflow", + description=( + "Execute a single workflow phase using the same derived dataset and " + "checkpoint values shown in the workflow plan." + ), + ) async def workflow_run(phase: WorkflowPhase, body: WorkflowSpec) -> dict[str, Any]: try: return await service.start_workflow_phase(body, phase) diff --git a/tests/test_workflow_service.py b/tests/test_workflow_service.py index bb076cc94..4939fb6a3 100644 --- a/tests/test_workflow_service.py +++ b/tests/test_workflow_service.py @@ -53,6 +53,24 @@ def test_start_workflow_phase_train_inherits_record_dataset_name(tmp_path: Path) assert kwargs["dataset_name"] == "pick_cube_v1" +def test_start_workflow_phase_record_uses_planned_dataset_name(tmp_path: Path) -> None: + service = _make_service(tmp_path) + service.start_recording = AsyncMock(side_effect=lambda **kwargs: kwargs["dataset_name"]) + + result = asyncio.run(service.start_workflow_phase({ + "record": { + "enabled": True, + "task": "pick cube", + }, + }, "record")) + + assert result["status"] == "recording" + assert result["dataset_name"].startswith("rec_") + service.start_recording.assert_awaited_once() + kwargs = service.start_recording.await_args.kwargs + assert kwargs["dataset_name"] == result["dataset_name"] + + def test_start_workflow_phase_infer_inherits_train_source_dataset(tmp_path: Path) -> None: service = _make_service(tmp_path) service.start_inference = AsyncMock(return_value=None) @@ -79,4 +97,6 @@ def test_start_workflow_phase_infer_inherits_train_source_dataset(tmp_path: Path assert result["checkpoint_path"].endswith("pick_cube_v1/checkpoints/last/pretrained_model") service.start_inference.assert_awaited_once() kwargs = service.start_inference.await_args.kwargs + assert kwargs["checkpoint_path"].endswith("pick_cube_v1/checkpoints/last/pretrained_model") assert kwargs["source_dataset"] == "pick_cube_v1" + assert kwargs["dataset_name"] == "eval_pick_cube_v1" From cedc72723afb858612c3e4f42ad28e7f05acbb90 Mon Sep 17 00:00:00 2001 From: Xiaofang Wu <3642115339@qq.com> Date: Sat, 9 May 2026 15:12:11 +0800 Subject: [PATCH 3/9] chore: keep workflow planning PR code-focused --- README.md | 10 -------- docs/WORKFLOWS.md | 65 ----------------------------------------------- 2 files changed, 75 deletions(-) delete mode 100644 docs/WORKFLOWS.md diff --git a/README.md b/README.md index 458c83246..df609fab0 100644 --- a/README.md +++ b/README.md @@ -58,16 +58,6 @@ Help me install RoboClaw from https://github.com/MINT-SJTU/RoboClaw - [Non-Docker Installation](./docs/INSTALLATION.md) - [Docker Installation](./docs/DOCKERINSTALLATION.md) -## 🧭 Workflow Planning - -RoboClaw includes a unified embodied workflow spec for `record`, `train`, and `infer`, so the same payload can be used to preview a pipeline before launching any hardware job. - -- `POST /api/workflows/validate`: compile the workflow and surface issues -- `POST /api/workflows/plan`: preview derived datasets, checkpoint paths, and commands -- `POST /api/workflows/run/{phase}`: execute one validated phase - -See [Workflow Planning API](./docs/WORKFLOWS.md) for the request format and a review-first flow using `/docs`. - ## 📢 Community Co-Creation RoboClaw is being built in the open. We want major direction-setting choices, such as embodiment support, simulator priorities, and roadmap focus, to be discussed with the community. diff --git a/docs/WORKFLOWS.md b/docs/WORKFLOWS.md deleted file mode 100644 index ef27bda15..000000000 --- a/docs/WORKFLOWS.md +++ /dev/null @@ -1,65 +0,0 @@ -# Workflow Planning API - -RoboClaw now exposes a unified workflow spec for the common embodied pipeline: - -- `record`: collect data from the robot -- `train`: fit a policy from a dataset -- `infer`: run a policy back on hardware and record the evaluation dataset - -The goal is not to add another dashboard. The goal is to make one workflow description reusable across planning, validation, and execution. - -## What The Planner Resolves - -Given a single `WorkflowSpec`, RoboClaw can: - -- validate whether each stage is runnable -- infer missing dataset names between stages -- infer the checkpoint path for inference from the planned training output -- compile the concrete command that each stage would run - -This makes `plan` the main review surface before anyone starts a real robot job. - -## API Surface - -- `POST /api/workflows/validate` - Returns the compiled workflow plus issues, without starting any stage. -- `POST /api/workflows/plan` - Returns the compiled stage plan, including derived dataset names, checkpoint paths, and commands. -- `POST /api/workflows/run/{phase}` - Starts a single validated phase, where `phase` is `record`, `train`, or `infer`. - -The run endpoints consume the same derived dataset and checkpoint values that appear in `plan`, so review and execution stay aligned. - -## Example Spec - -```json -{ - "name": "pick-cube-pipeline", - "hardware": { - "useCameras": true - }, - "record": { - "enabled": true, - "task": "pick cube", - "datasetName": "pick_cube_v1", - "numEpisodes": 5 - }, - "train": { - "enabled": true, - "policyType": "act", - "steps": 2000 - }, - "infer": { - "enabled": true, - "datasetName": "eval_pick_cube_v1", - "numEpisodes": 2 - } -} -``` - -## Review Flow - -1. Open the FastAPI docs page at `/docs`. -2. Use `POST /api/workflows/plan` with a workflow spec. -3. Check whether the returned stages are `ready`, whether datasets flow between stages as expected, and whether the checkpoint path matches the intended policy output. -4. Only then call `POST /api/workflows/run/{phase}` for the stage you want to execute. From 8835c43c9e5000b37e2b475c69ad883679230c97 Mon Sep 17 00:00:00 2001 From: Xiaofang Wu <3642115339@qq.com> Date: Sat, 9 May 2026 17:00:46 +0800 Subject: [PATCH 4/9] fix: make workflow planning deterministic and dependency-aware --- roboclaw/embodied/service/__init__.py | 9 +- roboclaw/embodied/workflow.py | 225 ++++++++++++++++++++------ tests/test_workflow_plan.py | 54 ++++++- tests/test_workflow_routes.py | 5 + tests/test_workflow_service.py | 98 +++++++++-- 5 files changed, 324 insertions(+), 67 deletions(-) diff --git a/roboclaw/embodied/service/__init__.py b/roboclaw/embodied/service/__init__.py index 143e27fd3..374e4b5f0 100644 --- a/roboclaw/embodied/service/__init__.py +++ b/roboclaw/embodied/service/__init__.py @@ -246,6 +246,11 @@ async def start_workflow_phase( raise RuntimeError(f"Workflow phase '{phase}' is disabled.") if stage.issues: raise RuntimeError(" · ".join(issue.message for issue in stage.issues)) + if not stage.ready: + if stage.blocked_by: + blocked = ", ".join(stage.blocked_by) + raise RuntimeError(f"Workflow phase '{phase}' is waiting on: {blocked}.") + raise RuntimeError(f"Workflow phase '{phase}' is not ready.") if phase == "record": dataset_name = await self.start_recording( @@ -360,7 +365,7 @@ async def start_inference( ) -> None: self._require_capability("infer" if use_cameras else "infer_without_cameras") output_dataset = self.datasets.prepare_recording_dataset(dataset_name, prefix="eval") - source = self.datasets.resolve_runtime_dataset(source_dataset) if source_dataset else None + source = self.datasets.resolve_runtime_dataset(source_dataset) if source_dataset and not checkpoint_path else None argv = CommandBuilder.infer( self.manifest, dataset=output_dataset.runtime, @@ -414,7 +419,7 @@ async def run_inference( ) -> str: self._require_capability("infer" if use_cameras else "infer_without_cameras") output_dataset = self.datasets.prepare_recording_dataset(dataset_name, prefix="eval") - source = self.datasets.resolve_runtime_dataset(source_dataset) if source_dataset else None + source = self.datasets.resolve_runtime_dataset(source_dataset) if source_dataset and not checkpoint_path else None argv = CommandBuilder.infer( self.manifest, dataset=output_dataset.runtime, diff --git a/roboclaw/embodied/workflow.py b/roboclaw/embodied/workflow.py index 1726325cb..e999ef6f8 100644 --- a/roboclaw/embodied/workflow.py +++ b/roboclaw/embodied/workflow.py @@ -2,16 +2,32 @@ from __future__ import annotations +import hashlib +import json +import re from pathlib import Path from typing import Any, Literal from pydantic import BaseModel, ConfigDict, Field from pydantic.alias_generators import to_camel -from roboclaw.data.datasets import DatasetCatalog, DatasetRuntimeRef +from roboclaw.data.datasets import DatasetCatalog, DatasetRuntimeRef, validate_dataset_slug from roboclaw.embodied.command import ActionError, CommandBuilder WorkflowStageName = Literal["record", "train", "infer"] +_CHECKPOINT_CONFIG_FILES = ( + "config.json", + "train_config.json", + "policy_config.json", + "preprocessor_config.json", +) +_CHECKPOINT_WEIGHT_PATTERNS = ( + "model.safetensors", + "*.safetensors", + "*.pt", + "*.pth", + "*.bin", +) class WorkflowModel(BaseModel): @@ -92,6 +108,7 @@ class WorkflowStagePlan(WorkflowModel): output_path: str = "" command: list[str] = Field(default_factory=list) notes: list[str] = Field(default_factory=list) + blocked_by: list[WorkflowStageName] = Field(default_factory=list) issues: list[WorkflowIssue] = Field(default_factory=list) @@ -155,11 +172,17 @@ def _plan_record(self, spec: WorkflowSpec) -> WorkflowStagePlan: if not spec.record.enabled: return stage - dataset = self._datasets.prepare_recording_dataset(spec.record.dataset_name, prefix="rec") + dataset = self._prepare_output_dataset( + spec, + dataset_name=spec.record.dataset_name, + prefix="rec", + ) stage.dataset_name = dataset.runtime.name stage.output_path = str(dataset.runtime.local_path) if not spec.record.dataset_name.strip(): - stage.notes.append("dataset_name is omitted and will be auto-generated at runtime.") + stage.notes.append( + f"record.dataset_name is omitted and resolves deterministically to '{stage.dataset_name}'." + ) if not spec.record.task.strip(): stage.issues.append(WorkflowIssue( stage="record", @@ -195,7 +218,12 @@ def _plan_record(self, spec: WorkflowSpec) -> WorkflowStagePlan: return stage def _plan_train(self, spec: WorkflowSpec, record_plan: WorkflowStagePlan) -> WorkflowStagePlan: - stage = WorkflowStagePlan(stage="train", enabled=spec.train.enabled, owner="training") + stage = WorkflowStagePlan( + stage="train", + enabled=spec.train.enabled, + owner="training", + capability="train", + ) if not spec.train.enabled: return stage @@ -210,18 +238,28 @@ def _plan_train(self, spec: WorkflowSpec, record_plan: WorkflowStagePlan) -> Wor )) return stage - dataset_runtime = self._resolve_runtime_dataset( - dataset_name, - allow_planned=record_plan.enabled and dataset_name == record_plan.dataset_name, + dataset_runtime = self._resolve_runtime_dataset(dataset_name) + inherited_from_record = ( + not spec.train.dataset_name.strip() + and record_plan.enabled + and dataset_name == record_plan.dataset_name ) if dataset_runtime is None: - stage.issues.append(WorkflowIssue( - stage="train", - code="dataset_not_found", - field="train.dataset_name", - message=f"Runtime dataset '{dataset_name}' was not found.", - )) - return stage + if inherited_from_record: + dataset_runtime = self._planned_runtime_dataset(dataset_name) + stage.blocked_by.append("record") + stage.notes.append("train.dataset_name is inherited from the record stage output.") + stage.notes.append("train cannot run until the record stage materializes its dataset.") + else: + stage.issues.append(WorkflowIssue( + stage="train", + code="dataset_not_found", + field="train.dataset_name", + message=f"Runtime dataset '{dataset_name}' was not found.", + )) + return stage + elif inherited_from_record: + stage.notes.append("train.dataset_name is inherited from the record stage output.") try: stage.command = CommandBuilder.train( @@ -244,10 +282,7 @@ def _plan_train(self, spec: WorkflowSpec, record_plan: WorkflowStagePlan) -> Wor stage.output_path = output_dir stage.checkpoint_path = str(Path(output_dir) / "checkpoints" / "last" / "pretrained_model") - if not spec.train.dataset_name.strip() and record_plan.enabled: - stage.notes.append("train.dataset_name is inherited from the record stage output.") - - stage.ready = True + stage.ready = not stage.blocked_by return stage def _plan_infer( @@ -265,21 +300,39 @@ def _plan_infer( if not spec.infer.enabled: return stage - output_dataset = self._datasets.prepare_recording_dataset(spec.infer.dataset_name, prefix="eval") + output_dataset = self._prepare_output_dataset( + spec, + dataset_name=spec.infer.dataset_name, + prefix="eval", + ) stage.dataset_name = output_dataset.runtime.name stage.output_path = str(output_dataset.runtime.local_path) + checkpoint_path = spec.infer.checkpoint_path.strip() + stage.checkpoint_path = checkpoint_path if not spec.infer.dataset_name.strip(): - stage.notes.append("infer.dataset_name is omitted and will be auto-generated at runtime.") + stage.notes.append( + f"infer.dataset_name is omitted and resolves deterministically to '{stage.dataset_name}'." + ) + if checkpoint_path: + checkpoint_issue = _checkpoint_validation_message(checkpoint_path) + if checkpoint_issue: + stage.issues.append(WorkflowIssue( + stage="infer", + code="invalid_checkpoint", + field="infer.checkpoint_path", + message=checkpoint_issue, + )) + return stage - source_dataset_name = ( - spec.infer.source_dataset.strip() - or train_plan.dataset_name - or record_plan.dataset_name - ) + source_dataset_name = spec.infer.source_dataset.strip() + derived_from_train = False + if not checkpoint_path and not source_dataset_name and train_plan.enabled: + source_dataset_name = train_plan.dataset_name + derived_from_train = True + elif not checkpoint_path and not source_dataset_name: + source_dataset_name = record_plan.dataset_name stage.source_dataset = source_dataset_name - checkpoint_path = spec.infer.checkpoint_path.strip() - stage.checkpoint_path = checkpoint_path if not checkpoint_path and not source_dataset_name: stage.issues.append(WorkflowIssue( stage="infer", @@ -290,15 +343,12 @@ def _plan_infer( return stage source_dataset = None - if source_dataset_name: - source_dataset = self._resolve_runtime_dataset( - source_dataset_name, - allow_planned=( - (record_plan.enabled and source_dataset_name == record_plan.dataset_name) - or (train_plan.enabled and source_dataset_name == train_plan.dataset_name) - ), - ) - if source_dataset is None and not checkpoint_path: + if not checkpoint_path and source_dataset_name: + source_dataset = self._resolve_runtime_dataset(source_dataset_name) + if source_dataset is None and derived_from_train: + source_dataset = self._planned_runtime_dataset(source_dataset_name) + stage.blocked_by.append("train") + elif source_dataset is None: stage.issues.append(WorkflowIssue( stage="infer", code="source_dataset_not_found", @@ -330,25 +380,59 @@ def _plan_infer( resolved_checkpoint = _argv_value(stage.command, "--policy.path=") if resolved_checkpoint: stage.checkpoint_path = resolved_checkpoint - if not checkpoint_path and train_plan.enabled and train_plan.checkpoint_path: + if not checkpoint_path and derived_from_train: stage.notes.append("infer.checkpoint_path is derived from the planned training output.") + if stage.checkpoint_path and not Path(stage.checkpoint_path).expanduser().exists(): + if "train" not in stage.blocked_by: + stage.blocked_by.append("train") + stage.notes.append("infer cannot run until the train stage produces the checkpoint.") + else: + checkpoint_issue = _checkpoint_validation_message(stage.checkpoint_path) + if checkpoint_issue: + stage.issues.append(WorkflowIssue( + stage="infer", + code="invalid_checkpoint", + field="infer.checkpoint_path", + message=checkpoint_issue, + )) + return stage elif not checkpoint_path and source_dataset_name: stage.notes.append("infer.checkpoint_path is derived from the source dataset policy directory.") + checkpoint_issue = _checkpoint_validation_message(stage.checkpoint_path) + if checkpoint_issue: + stage.issues.append(WorkflowIssue( + stage="infer", + code="invalid_checkpoint", + field="infer.checkpoint_path", + message=checkpoint_issue, + )) + return stage - stage.ready = True + stage.ready = not stage.blocked_by return stage - def _resolve_runtime_dataset(self, name: str, *, allow_planned: bool) -> DatasetRuntimeRef | None: + def _resolve_runtime_dataset(self, name: str) -> DatasetRuntimeRef | None: try: return self._datasets.resolve_runtime_dataset(name).runtime except ValueError: - if not allow_planned: - return None - return DatasetRuntimeRef( - name=name, - repo_id=f"local/{name}", - local_path=self._datasets.root / "local" / name, - ) + return None + + def _planned_runtime_dataset(self, name: str) -> DatasetRuntimeRef: + return DatasetRuntimeRef( + name=name, + repo_id=f"local/{name}", + local_path=self._datasets.root / "local" / name, + ) + + def _prepare_output_dataset( + self, + spec: WorkflowSpec, + *, + dataset_name: str, + prefix: str, + ) -> Any: + resolved_name = dataset_name.strip() or _default_dataset_name(spec, prefix=prefix) + return self._datasets.prepare_recording_dataset(resolved_name, prefix=prefix) def _argv_value(argv: list[str], prefix: str) -> str: @@ -356,3 +440,52 @@ def _argv_value(argv: list[str], prefix: str) -> str: if item.startswith(prefix): return item.split("=", 1)[1] return "" + + +def _default_dataset_name(spec: WorkflowSpec, *, prefix: str) -> str: + slug = _slugify_name(spec.name) + if slug: + candidate = f"{prefix}_{slug}" + else: + payload = json.dumps( + spec.model_dump(mode="json", by_alias=False), + sort_keys=True, + separators=(",", ":"), + ensure_ascii=True, + ) + candidate = f"{prefix}_{hashlib.sha1(payload.encode('utf-8')).hexdigest()[:10]}" + validate_dataset_slug(candidate) + return candidate + + +def _slugify_name(value: str) -> str: + slug = re.sub(r"[^A-Za-z0-9_-]+", "_", value.strip().lower()).strip("_-") + return slug[:48] + + +def _checkpoint_validation_message(raw_path: str) -> str: + if not raw_path or _looks_like_remote_policy_id(raw_path): + return "" + + path = Path(raw_path).expanduser() + if not path.exists(): + return f"Resolved checkpoint '{path}' was not found." + if not path.is_dir(): + return f"Resolved checkpoint '{path}' must be a directory." + if not any((path / name).is_file() for name in _CHECKPOINT_CONFIG_FILES): + joined = ", ".join(_CHECKPOINT_CONFIG_FILES) + return f"Resolved checkpoint '{path}' is missing a recognized config file ({joined})." + if not any(any(path.glob(pattern)) for pattern in _CHECKPOINT_WEIGHT_PATTERNS): + joined = ", ".join(_CHECKPOINT_WEIGHT_PATTERNS) + return f"Resolved checkpoint '{path}' is missing model weights ({joined})." + return "" + + +def _looks_like_remote_policy_id(raw_path: str) -> bool: + path = Path(raw_path).expanduser() + if path.exists() or path.is_absolute(): + return False + if raw_path.startswith(("~", ".", "/")): + return False + parts = raw_path.split("/") + return len(parts) == 2 and all(parts) and not any(part in {".", ".."} for part in parts) diff --git a/tests/test_workflow_plan.py b/tests/test_workflow_plan.py index 47c0d33ae..12c8d3499 100644 --- a/tests/test_workflow_plan.py +++ b/tests/test_workflow_plan.py @@ -29,6 +29,14 @@ def _make_service(tmp_path: Path) -> EmbodiedService: return service +def _materialize_checkpoint(tmp_path: Path, name: str = "checkpoint") -> Path: + checkpoint_dir = tmp_path / name + checkpoint_dir.mkdir(parents=True, exist_ok=True) + (checkpoint_dir / "config.json").write_text("{}", encoding="utf-8") + (checkpoint_dir / "model.safetensors").write_text("", encoding="utf-8") + return checkpoint_dir + + def test_workflow_planner_compiles_record_train_infer_chain(tmp_path: Path) -> None: service = _make_service(tmp_path) plan = WorkflowPlanner(service.manifest, service.datasets).plan(WorkflowSpec.model_validate({ @@ -60,7 +68,8 @@ def test_workflow_planner_compiles_record_train_infer_chain(tmp_path: Path) -> N assert "--dataset.repo_id=local/pick_cube_v1" in record_stage.command train_stage = plan.stage("train") - assert train_stage.ready is True + assert train_stage.ready is False + assert train_stage.blocked_by == ["record"] assert train_stage.dataset_name == "pick_cube_v1" assert "--policy.type=act" in train_stage.command assert train_stage.checkpoint_path.endswith( @@ -68,7 +77,8 @@ def test_workflow_planner_compiles_record_train_infer_chain(tmp_path: Path) -> N ) infer_stage = plan.stage("infer") - assert infer_stage.ready is True + assert infer_stage.ready is False + assert infer_stage.blocked_by == ["train"] assert infer_stage.dataset_name == "eval_pick_cube_v1" assert infer_stage.source_dataset == "pick_cube_v1" assert infer_stage.checkpoint_path.endswith( @@ -77,6 +87,46 @@ def test_workflow_planner_compiles_record_train_infer_chain(tmp_path: Path) -> N assert any(arg.startswith("--policy.path=") for arg in infer_stage.command) +def test_workflow_planner_generates_stable_default_dataset_names(tmp_path: Path) -> None: + service = _make_service(tmp_path) + planner = WorkflowPlanner(service.manifest, service.datasets) + checkpoint_dir = _materialize_checkpoint(tmp_path) + spec = { + "name": "Pick Cube Pipeline", + "record": { + "enabled": True, + "task": "pick cube", + }, + "infer": { + "enabled": True, + "checkpoint_path": str(checkpoint_dir), + }, + } + + first = planner.plan(spec) + second = planner.plan(spec) + + assert first.stage("record").dataset_name == "rec_pick_cube_pipeline" + assert first.stage("infer").dataset_name == "eval_pick_cube_pipeline" + assert first.stage("record").dataset_name == second.stage("record").dataset_name + assert first.stage("infer").dataset_name == second.stage("infer").dataset_name + + +def test_workflow_planner_reports_missing_explicit_checkpoint(tmp_path: Path) -> None: + service = _make_service(tmp_path) + plan = WorkflowPlanner(service.manifest, service.datasets).plan({ + "infer": { + "enabled": True, + "checkpoint_path": str(tmp_path / "missing_checkpoint"), + }, + }) + + assert plan.ok is False + infer_stage = plan.stage("infer") + assert infer_stage.ready is False + assert any(issue.code == "invalid_checkpoint" for issue in infer_stage.issues) + + def test_workflow_planner_reports_missing_train_dataset_without_record_stage(tmp_path: Path) -> None: service = _make_service(tmp_path) plan = WorkflowPlanner(service.manifest, service.datasets).plan({ diff --git a/tests/test_workflow_routes.py b/tests/test_workflow_routes.py index a1fd3075b..400f1f358 100644 --- a/tests/test_workflow_routes.py +++ b/tests/test_workflow_routes.py @@ -83,7 +83,12 @@ def test_workflow_plan_route_returns_compiled_stages(client): payload = resp.json() assert payload["ok"] is True assert [stage["stage"] for stage in payload["stages"]] == ["record", "train", "infer"] + assert payload["stages"][0]["ready"] is True + assert payload["stages"][1]["ready"] is False + assert payload["stages"][1]["blockedBy"] == ["record"] assert payload["stages"][1]["datasetName"] == "pick_cube_v1" + assert payload["stages"][2]["ready"] is False + assert payload["stages"][2]["blockedBy"] == ["train"] assert payload["stages"][2]["checkpointPath"].endswith( "pick_cube_v1/checkpoints/last/pretrained_model" ) diff --git a/tests/test_workflow_service.py b/tests/test_workflow_service.py index 4939fb6a3..4c13dbd5c 100644 --- a/tests/test_workflow_service.py +++ b/tests/test_workflow_service.py @@ -9,6 +9,7 @@ from roboclaw.embodied.embodiment.interface.video import VideoInterface from roboclaw.embodied.embodiment.manifest import Manifest from roboclaw.embodied.service import EmbodiedService +from roboclaw.embodied.workflow import WorkflowPlanner def _make_service(tmp_path: Path) -> EmbodiedService: @@ -30,8 +31,23 @@ def _make_service(tmp_path: Path) -> EmbodiedService: return service -def test_start_workflow_phase_train_inherits_record_dataset_name(tmp_path: Path) -> None: +def _materialize_runtime_dataset(tmp_path: Path, name: str) -> None: + dataset_dir = tmp_path / "datasets" / "local" / name / "meta" + dataset_dir.mkdir(parents=True, exist_ok=True) + (dataset_dir / "info.json").write_text("{}", encoding="utf-8") + + +def _materialize_checkpoint(tmp_path: Path, name: str = "checkpoint") -> Path: + checkpoint_dir = tmp_path / name + checkpoint_dir.mkdir(parents=True, exist_ok=True) + (checkpoint_dir / "config.json").write_text("{}", encoding="utf-8") + (checkpoint_dir / "model.safetensors").write_text("", encoding="utf-8") + return checkpoint_dir + + +def test_start_workflow_phase_train_uses_materialized_record_dataset_name(tmp_path: Path) -> None: service = _make_service(tmp_path) + _materialize_runtime_dataset(tmp_path, "pick_cube_v1") service.train.train = AsyncMock(return_value="Training started. Job ID: job-1") result = asyncio.run(service.start_workflow_phase({ @@ -53,50 +69,98 @@ def test_start_workflow_phase_train_inherits_record_dataset_name(tmp_path: Path) assert kwargs["dataset_name"] == "pick_cube_v1" +def test_start_workflow_phase_train_rejects_blocked_stage(tmp_path: Path) -> None: + service = _make_service(tmp_path) + + try: + asyncio.run(service.start_workflow_phase({ + "record": { + "enabled": True, + "task": "pick cube", + "dataset_name": "pick_cube_v1", + }, + "train": { + "enabled": True, + "policy_type": "act", + "steps": 1000, + }, + }, "train")) + except RuntimeError as exc: + assert str(exc) == "Workflow phase 'train' is waiting on: record." + else: + raise AssertionError("expected blocked train stage to raise") + + def test_start_workflow_phase_record_uses_planned_dataset_name(tmp_path: Path) -> None: service = _make_service(tmp_path) service.start_recording = AsyncMock(side_effect=lambda **kwargs: kwargs["dataset_name"]) - - result = asyncio.run(service.start_workflow_phase({ + spec = { "record": { "enabled": True, "task": "pick cube", }, - }, "record")) + } + expected = WorkflowPlanner(service.manifest, service.datasets).plan(spec).stage("record").dataset_name + + result = asyncio.run(service.start_workflow_phase(spec, "record")) assert result["status"] == "recording" - assert result["dataset_name"].startswith("rec_") + assert result["dataset_name"] == expected service.start_recording.assert_awaited_once() kwargs = service.start_recording.await_args.kwargs + assert kwargs["dataset_name"] == expected assert kwargs["dataset_name"] == result["dataset_name"] -def test_start_workflow_phase_infer_inherits_train_source_dataset(tmp_path: Path) -> None: +def test_start_workflow_phase_infer_rejects_blocked_train_checkpoint(tmp_path: Path) -> None: + service = _make_service(tmp_path) + + try: + asyncio.run(service.start_workflow_phase({ + "record": { + "enabled": True, + "task": "pick cube", + "dataset_name": "pick_cube_v1", + }, + "train": { + "enabled": True, + "policy_type": "act", + "steps": 1000, + }, + "infer": { + "enabled": True, + "dataset_name": "eval_pick_cube_v1", + }, + }, "infer")) + except RuntimeError as exc: + assert str(exc) == "Workflow phase 'infer' is waiting on: train." + else: + raise AssertionError("expected blocked infer stage to raise") + + +def test_start_workflow_phase_infer_uses_explicit_checkpoint_without_source_dataset(tmp_path: Path) -> None: service = _make_service(tmp_path) service.start_inference = AsyncMock(return_value=None) + checkpoint_dir = _materialize_checkpoint(tmp_path, "model") result = asyncio.run(service.start_workflow_phase({ + "name": "pick-cube-pipeline", "record": { "enabled": True, "task": "pick cube", "dataset_name": "pick_cube_v1", }, - "train": { - "enabled": True, - "policy_type": "act", - "steps": 1000, - }, "infer": { "enabled": True, - "dataset_name": "eval_pick_cube_v1", + "checkpoint_path": str(checkpoint_dir), }, }, "infer")) assert result["status"] == "inferring" - assert result["dataset_name"] == "eval_pick_cube_v1" - assert result["checkpoint_path"].endswith("pick_cube_v1/checkpoints/last/pretrained_model") + assert result["dataset_name"] == "eval_pick-cube-pipeline" + assert result["checkpoint_path"] == str(checkpoint_dir) service.start_inference.assert_awaited_once() kwargs = service.start_inference.await_args.kwargs - assert kwargs["checkpoint_path"].endswith("pick_cube_v1/checkpoints/last/pretrained_model") - assert kwargs["source_dataset"] == "pick_cube_v1" - assert kwargs["dataset_name"] == "eval_pick_cube_v1" + assert kwargs["checkpoint_path"] == str(checkpoint_dir) + assert kwargs["source_dataset"] == "" + assert kwargs["dataset_name"] == "eval_pick-cube-pipeline" From 228cd6b00118a2fad0f8b9d3c141669952c02aed Mon Sep 17 00:00:00 2001 From: Xiaofang Wu <3642115339@qq.com> Date: Sat, 9 May 2026 18:09:20 +0800 Subject: [PATCH 5/9] fix: stabilize workflow defaults and route coverage --- roboclaw/embodied/workflow.py | 21 ++++++++++++++++----- tests/test_dashboard_routes.py | 20 ++++++++++++++++++++ tests/test_workflow_plan.py | 24 ++++++++++++++++++++++++ tests/test_workflow_routes.py | 19 +++++++++++++++++++ 4 files changed, 79 insertions(+), 5 deletions(-) diff --git a/roboclaw/embodied/workflow.py b/roboclaw/embodied/workflow.py index e999ef6f8..0a176d5ac 100644 --- a/roboclaw/embodied/workflow.py +++ b/roboclaw/embodied/workflow.py @@ -176,6 +176,11 @@ def _plan_record(self, spec: WorkflowSpec) -> WorkflowStagePlan: spec, dataset_name=spec.record.dataset_name, prefix="rec", + payload={ + "name": spec.name, + "hardware": spec.hardware.model_dump(mode="json", by_alias=False), + "record": spec.record.model_dump(mode="json", by_alias=False), + }, ) stage.dataset_name = dataset.runtime.name stage.output_path = str(dataset.runtime.local_path) @@ -304,6 +309,11 @@ def _plan_infer( spec, dataset_name=spec.infer.dataset_name, prefix="eval", + payload={ + "name": spec.name, + "hardware": spec.hardware.model_dump(mode="json", by_alias=False), + "infer": spec.infer.model_dump(mode="json", by_alias=False), + }, ) stage.dataset_name = output_dataset.runtime.name stage.output_path = str(output_dataset.runtime.local_path) @@ -430,8 +440,9 @@ def _prepare_output_dataset( *, dataset_name: str, prefix: str, + payload: dict[str, Any], ) -> Any: - resolved_name = dataset_name.strip() or _default_dataset_name(spec, prefix=prefix) + resolved_name = dataset_name.strip() or _default_dataset_name(spec, prefix=prefix, payload=payload) return self._datasets.prepare_recording_dataset(resolved_name, prefix=prefix) @@ -442,18 +453,18 @@ def _argv_value(argv: list[str], prefix: str) -> str: return "" -def _default_dataset_name(spec: WorkflowSpec, *, prefix: str) -> str: +def _default_dataset_name(spec: WorkflowSpec, *, prefix: str, payload: dict[str, Any]) -> str: slug = _slugify_name(spec.name) if slug: candidate = f"{prefix}_{slug}" else: - payload = json.dumps( - spec.model_dump(mode="json", by_alias=False), + encoded = json.dumps( + payload, sort_keys=True, separators=(",", ":"), ensure_ascii=True, ) - candidate = f"{prefix}_{hashlib.sha1(payload.encode('utf-8')).hexdigest()[:10]}" + candidate = f"{prefix}_{hashlib.sha1(encoded.encode('utf-8')).hexdigest()[:10]}" validate_dataset_slug(candidate) return candidate diff --git a/tests/test_dashboard_routes.py b/tests/test_dashboard_routes.py index 352b82870..5a24032f3 100644 --- a/tests/test_dashboard_routes.py +++ b/tests/test_dashboard_routes.py @@ -101,6 +101,26 @@ def test_status_fields(self, client): assert field in data +class TestWorkflowRoutes: + def test_workflow_plan_route_is_registered(self, client): + resp = client.post("/api/workflows/plan", json={ + "record": { + "enabled": True, + "task": "pick cube", + "datasetName": "pick_cube_v1", + }, + "train": { + "enabled": True, + }, + }) + + assert resp.status_code == 200 + payload = resp.json() + assert payload["stages"][0]["stage"] == "record" + assert payload["stages"][1]["stage"] == "train" + assert payload["stages"][1]["blockedBy"] == ["record"] + + # --------------------------------------------------------------------------- # Session lifecycle # --------------------------------------------------------------------------- diff --git a/tests/test_workflow_plan.py b/tests/test_workflow_plan.py index 12c8d3499..4f862c0ed 100644 --- a/tests/test_workflow_plan.py +++ b/tests/test_workflow_plan.py @@ -112,6 +112,30 @@ def test_workflow_planner_generates_stable_default_dataset_names(tmp_path: Path) assert first.stage("infer").dataset_name == second.stage("infer").dataset_name +def test_workflow_planner_default_record_name_ignores_unrelated_downstream_changes(tmp_path: Path) -> None: + service = _make_service(tmp_path) + planner = WorkflowPlanner(service.manifest, service.datasets) + + base = planner.plan({ + "record": { + "enabled": True, + "task": "pick cube", + }, + }) + with_train = planner.plan({ + "record": { + "enabled": True, + "task": "pick cube", + }, + "train": { + "enabled": True, + "steps": 2000, + }, + }) + + assert base.stage("record").dataset_name == with_train.stage("record").dataset_name + + def test_workflow_planner_reports_missing_explicit_checkpoint(tmp_path: Path) -> None: service = _make_service(tmp_path) plan = WorkflowPlanner(service.manifest, service.datasets).plan({ diff --git a/tests/test_workflow_routes.py b/tests/test_workflow_routes.py index 400f1f358..33820c942 100644 --- a/tests/test_workflow_routes.py +++ b/tests/test_workflow_routes.py @@ -109,3 +109,22 @@ def test_workflow_run_route_delegates_to_service(client, app): assert resp.status_code == 200 assert resp.json() == {"status": "recording", "dataset_name": "demo"} service.start_workflow_phase.assert_awaited_once() + + +def test_workflow_run_route_returns_400_for_blocked_stage(client, app): + service = app.state.embodied_service + service.start_workflow_phase = AsyncMock(side_effect=RuntimeError("Workflow phase 'train' is waiting on: record.")) + + resp = client.post("/api/workflows/run/train", json={ + "record": { + "enabled": True, + "task": "pick cube", + "datasetName": "pick_cube_v1", + }, + "train": { + "enabled": True, + }, + }) + + assert resp.status_code == 400 + assert resp.json() == {"detail": "Workflow phase 'train' is waiting on: record."} From 2a0109dc550cf23be54e0439cdc05886f4a806b7 Mon Sep 17 00:00:00 2001 From: Xiaofang Wu <3642115339@qq.com> Date: Sun, 10 May 2026 13:08:36 +0800 Subject: [PATCH 6/9] test: cover workflow command compilation --- tests/test_workflow_service.py | 108 ++++++++++++++++++++++++++++++++- 1 file changed, 107 insertions(+), 1 deletion(-) diff --git a/tests/test_workflow_service.py b/tests/test_workflow_service.py index 4c13dbd5c..0505070a4 100644 --- a/tests/test_workflow_service.py +++ b/tests/test_workflow_service.py @@ -3,7 +3,7 @@ import asyncio import json from pathlib import Path -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, patch from roboclaw.embodied.embodiment.interface.serial import SerialInterface from roboclaw.embodied.embodiment.interface.video import VideoInterface @@ -164,3 +164,109 @@ def test_start_workflow_phase_infer_uses_explicit_checkpoint_without_source_data assert kwargs["checkpoint_path"] == str(checkpoint_dir) assert kwargs["source_dataset"] == "" assert kwargs["dataset_name"] == "eval_pick-cube-pipeline" + + +def test_start_workflow_phase_record_builds_record_command_from_planned_defaults(tmp_path: Path) -> None: + service = _make_service(tmp_path) + service._require_capability = lambda *_args, **_kwargs: None + starts: list[tuple[str, list[str]]] = [] + + async def fake_start_session(_session, *, owner: str, argv: list[str]) -> None: + starts.append((owner, argv)) + + service._start_managed_session = fake_start_session + spec = { + "name": "pick-cube-pipeline", + "record": { + "enabled": True, + "task": "pick cube", + }, + } + expected = WorkflowPlanner(service.manifest, service.datasets).plan(spec).stage("record").dataset_name + + result = asyncio.run(service.start_workflow_phase(spec, "record")) + + assert result == {"status": "recording", "dataset_name": expected} + assert starts[0][0] == "recording" + argv = starts[0][1] + assert argv[:4] == [ + "/Users/pearl/anaconda3/bin/python", + "-m", + "roboclaw.embodied.command.wrapper", + "record", + ] + assert f"--dataset.repo_id=local/{expected}" in argv + assert f"--dataset.root={tmp_path / 'datasets' / 'local' / expected}" in argv + assert "--dataset.single_task=pick cube" in argv + + +def test_start_workflow_phase_train_builds_train_command_for_runtime_dataset(tmp_path: Path) -> None: + service = _make_service(tmp_path) + _materialize_runtime_dataset(tmp_path, "pick_cube_v1") + captured: list[tuple[list[str], str]] = [] + + async def fake_run_detached(self, argv: list[str], log_dir: Path) -> str: + captured.append((argv, str(log_dir))) + return "job-42" + + with patch("roboclaw.embodied.executor.SubprocessExecutor.run_detached", new=fake_run_detached): + result = asyncio.run(service.start_workflow_phase({ + "train": { + "enabled": True, + "dataset_name": "pick_cube_v1", + "policy_type": "act", + "steps": 1234, + "device": "cpu", + }, + }, "train")) + + assert result == {"message": "Training started. Job ID: job-42", "job_id": "job-42"} + argv = captured[0][0] + assert argv[0] == "lerobot-train" + assert "--dataset.repo_id=local/pick_cube_v1" in argv + assert f"--dataset.root={tmp_path / 'datasets' / 'local' / 'pick_cube_v1'}" in argv + assert f"--output_dir={tmp_path / 'policies' / 'pick_cube_v1'}" in argv + assert "--steps=1234" in argv + assert "--policy.device=cpu" in argv + + +def test_start_workflow_phase_infer_builds_command_from_explicit_checkpoint(tmp_path: Path) -> None: + service = _make_service(tmp_path) + service._require_capability = lambda *_args, **_kwargs: None + starts: list[tuple[str, list[str]]] = [] + + async def fake_start_session(_session, *, owner: str, argv: list[str]) -> None: + starts.append((owner, argv)) + + service._start_managed_session = fake_start_session + checkpoint_dir = _materialize_checkpoint(tmp_path, "model") + + result = asyncio.run(service.start_workflow_phase({ + "name": "pick-cube-pipeline", + "infer": { + "enabled": True, + "checkpoint_path": str(checkpoint_dir), + "task": "eval pick", + "num_episodes": 2, + "episode_time_s": 15, + }, + }, "infer")) + + assert result == { + "status": "inferring", + "dataset_name": "eval_pick-cube-pipeline", + "checkpoint_path": str(checkpoint_dir), + } + assert starts[0][0] == "inferring" + argv = starts[0][1] + assert argv[:4] == [ + "/Users/pearl/anaconda3/bin/python", + "-m", + "roboclaw.embodied.command.wrapper", + "record", + ] + assert f"--policy.path={checkpoint_dir}" in argv + assert "--dataset.single_task=eval pick" in argv + assert "--dataset.num_episodes=2" in argv + assert "--dataset.episode_time_s=15" in argv + assert "--dataset.repo_id=local/eval_pick-cube-pipeline" in argv From ce25530514661bb1d6bf41ac3e7f5ddeb57381d4 Mon Sep 17 00:00:00 2001 From: Xiaofang Wu <3642115339@qq.com> Date: Sun, 10 May 2026 13:15:25 +0800 Subject: [PATCH 7/9] refactor: drop redundant workflow validate route --- roboclaw/http/routes/workflows.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/roboclaw/http/routes/workflows.py b/roboclaw/http/routes/workflows.py index c6aa42f58..7d1454162 100644 --- a/roboclaw/http/routes/workflows.py +++ b/roboclaw/http/routes/workflows.py @@ -13,18 +13,6 @@ def register_workflow_routes(app: FastAPI, service: EmbodiedService) -> None: - - @app.post( - "/api/workflows/validate", - summary="Validate a unified embodied workflow", - description=( - "Compile a record/train/infer workflow spec and return any issues, " - "derived datasets, and resolved commands without starting a job." - ), - ) - async def workflow_validate(body: WorkflowSpec) -> dict[str, Any]: - return service.plan_workflow(body).model_dump(by_alias=True) - @app.post( "/api/workflows/plan", summary="Preview the compiled workflow plan", From 22cf34512001bf14aaf1ceaa90f7253e40805698 Mon Sep 17 00:00:00 2001 From: Xiaofang Wu <3642115339@qq.com> Date: Sun, 17 May 2026 19:47:10 +0800 Subject: [PATCH 8/9] Fix CI dev dependency install --- .github/workflows/ci.yml | 2 +- pyproject.toml | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e9aef7291..8b2861e6d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install .[dev] + pip install -e ".[dev]" pytest pytest-asyncio pexpect || pip install -e . pytest pytest-asyncio pexpect - name: Run tests run: python -m pytest tests/ -v diff --git a/pyproject.toml b/pyproject.toml index edd8851cd..5a566d593 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,12 @@ dependencies = [ ] [project.optional-dependencies] +dev = [ + "pytest>=9.0.0,<10.0.0", + "pytest-asyncio>=1.3.0,<2.0.0", + "pexpect>=4.9.0,<5.0.0", + "ruff>=0.1.0", +] wecom = [ "wecom-aibot-sdk-python>=0.1.5", ] From f9978b07b98eb1f5734419db9af446bf5fa95bf5 Mon Sep 17 00:00:00 2001 From: Xiaofang Wu <3642115339@qq.com> Date: Sun, 17 May 2026 20:04:29 +0800 Subject: [PATCH 9/9] Fix PR 103 CI regressions --- roboclaw/http/routes/recovery.py | 5 +++++ roboclaw/providers/factory.py | 2 +- tests/integration/test_agent_pty.py | 2 +- tests/test_hardware_monitor.py | 1 + tests/test_workflow_service.py | 5 +++-- 5 files changed, 11 insertions(+), 4 deletions(-) diff --git a/roboclaw/http/routes/recovery.py b/roboclaw/http/routes/recovery.py index dd16b952d..0d8b5c39f 100644 --- a/roboclaw/http/routes/recovery.py +++ b/roboclaw/http/routes/recovery.py @@ -29,6 +29,11 @@ async def _restart() -> None: def register_recovery_routes(app: FastAPI) -> None: + @app.get("/api/recovery/faults") + async def recovery_faults() -> dict[str, Any]: + monitor: HardwareMonitor = app.state.hardware_monitor + return {"faults": [fault.to_dict() for fault in monitor.active_faults]} + @app.post("/api/recovery/check-hardware") async def recovery_check_hardware() -> dict[str, Any]: monitor: HardwareMonitor = app.state.hardware_monitor diff --git a/roboclaw/providers/factory.py b/roboclaw/providers/factory.py index 406a4ee2f..d1b46dc3d 100644 --- a/roboclaw/providers/factory.py +++ b/roboclaw/providers/factory.py @@ -40,7 +40,7 @@ def build_provider(config: Config) -> LLMProvider: ) from roboclaw.providers.custom_provider import CustomProvider provider = CustomProvider( - api_key=provider_config.api_key if provider_config else "no-key", + api_key=(provider_config.api_key if provider_config else "") or "no-key", api_base=provider_config.api_base, default_model=model, ) diff --git a/tests/integration/test_agent_pty.py b/tests/integration/test_agent_pty.py index c04df4a49..08708548e 100644 --- a/tests/integration/test_agent_pty.py +++ b/tests/integration/test_agent_pty.py @@ -48,7 +48,7 @@ def test_agent_ctrl_c(simulated_agent_child) -> None: child = simulated_agent_child child.expect(r"You:", timeout=15) child.sendintr() - child.expect(r"Received SIGINT, goodbye!", timeout=10) + child.expect([r"Received SIGINT, goodbye!", r"Goodbye!"], timeout=10) child.close(force=True) diff --git a/tests/test_hardware_monitor.py b/tests/test_hardware_monitor.py index 63d968c0c..c48f739b3 100644 --- a/tests/test_hardware_monitor.py +++ b/tests/test_hardware_monitor.py @@ -21,6 +21,7 @@ def _stub_profile_on_disk(monkeypatch): """Report calibration JSON present so ``arm.calibrated=True`` isn't downgraded.""" monkeypatch.setattr(monitor_mod, "_has_profile_on_disk", lambda arm: True) + monkeypatch.setattr(monitor_mod, "get_missing_arm_motors", lambda arm: []) # --------------------------------------------------------------------------- diff --git a/tests/test_workflow_service.py b/tests/test_workflow_service.py index 0505070a4..43a30d77f 100644 --- a/tests/test_workflow_service.py +++ b/tests/test_workflow_service.py @@ -2,6 +2,7 @@ import asyncio import json +import sys from pathlib import Path from unittest.mock import AsyncMock, patch @@ -190,7 +191,7 @@ async def fake_start_session(_session, *, owner: str, argv: list[str]) -> None: assert starts[0][0] == "recording" argv = starts[0][1] assert argv[:4] == [ - "/Users/pearl/anaconda3/bin/python", + sys.executable, "-m", "roboclaw.embodied.command.wrapper", "record", @@ -260,7 +261,7 @@ async def fake_start_session(_session, *, owner: str, argv: list[str]) -> None: assert starts[0][0] == "inferring" argv = starts[0][1] assert argv[:4] == [ - "/Users/pearl/anaconda3/bin/python", + sys.executable, "-m", "roboclaw.embodied.command.wrapper", "record",