diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e9aef7291..8b2861e6d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install .[dev] + pip install -e ".[dev]" pytest pytest-asyncio pexpect || pip install -e . pytest pytest-asyncio pexpect - name: Run tests run: python -m pytest tests/ -v diff --git a/pyproject.toml b/pyproject.toml index edd8851cd..5a566d593 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,12 @@ dependencies = [ ] [project.optional-dependencies] +dev = [ + "pytest>=9.0.0,<10.0.0", + "pytest-asyncio>=1.3.0,<2.0.0", + "pexpect>=4.9.0,<5.0.0", + "ruff>=0.1.0", +] wecom = [ "wecom-aibot-sdk-python>=0.1.5", ] diff --git a/roboclaw/embodied/service/__init__.py b/roboclaw/embodied/service/__init__.py index 910214c2d..374e4b5f0 100644 --- a/roboclaw/embodied/service/__init__.py +++ b/roboclaw/embodied/service/__init__.py @@ -37,6 +37,7 @@ VerificationRequest, Verifier, ) +from roboclaw.embodied.workflow import WorkflowPlan, WorkflowPlanner, WorkflowSpec class EmbodiedService: @@ -226,6 +227,77 @@ def _verify_inference_preflight( if not result.ok: raise ActionError(result.format_violations()) + def plan_workflow(self, spec: WorkflowSpec | dict[str, Any]) -> WorkflowPlan: + """Compile a workflow spec into concrete stage plans and validations.""" + return WorkflowPlanner(self.manifest, self.datasets).plan(spec) + + async def start_workflow_phase( + self, + spec: WorkflowSpec | dict[str, Any], + phase: str, + ) -> dict[str, Any]: + """Start a workflow phase using the unified workflow spec interface.""" + workflow = spec if isinstance(spec, WorkflowSpec) else WorkflowSpec.model_validate(spec) + plan = self.plan_workflow(workflow) + stage = next((item for item in plan.stages if item.stage == phase), None) + if stage is None: + raise RuntimeError(f"Unknown workflow phase '{phase}'.") + if not stage.enabled: + raise RuntimeError(f"Workflow phase '{phase}' is disabled.") + if stage.issues: + raise RuntimeError(" · ".join(issue.message for issue in stage.issues)) + if not stage.ready: + if stage.blocked_by: + blocked = ", ".join(stage.blocked_by) + raise RuntimeError(f"Workflow phase '{phase}' is waiting on: {blocked}.") + raise RuntimeError(f"Workflow phase '{phase}' is not ready.") + + if phase == "record": + dataset_name = await self.start_recording( + task=workflow.record.task, + num_episodes=workflow.record.num_episodes, + fps=workflow.record.fps, + episode_time_s=workflow.record.episode_time_s, + reset_time_s=workflow.record.reset_time_s, + dataset_name=stage.dataset_name, + use_cameras=workflow.hardware.use_cameras, + arms=workflow.hardware.arms, + ) + return {"status": "recording", "dataset_name": dataset_name} + + if phase == "train": + result = await self.train.train( + manifest=self.manifest, + kwargs={ + "dataset_name": stage.dataset_name, + "policy_type": workflow.train.policy_type, + "steps": workflow.train.steps, + "device": workflow.train.device, + }, + tty_handoff=None, + ) + job_id = result.rsplit("Job ID:", 1)[-1].strip() if "Job ID:" in result else "" + return {"message": result, "job_id": job_id} + + if phase == "infer": + await self.start_inference( + checkpoint_path=stage.checkpoint_path, + source_dataset=stage.source_dataset, + dataset_name=stage.dataset_name, + task=workflow.infer.task, + num_episodes=workflow.infer.num_episodes, + episode_time_s=workflow.infer.episode_time_s, + arms=workflow.hardware.arms, + use_cameras=workflow.hardware.use_cameras, + ) + return { + "status": "inferring", + "dataset_name": stage.dataset_name, + "checkpoint_path": stage.checkpoint_path, + } + + raise RuntimeError(f"Workflow phase '{phase}' is not supported.") + # -- Operations (Web entry points) -- async def start_teleop(self, *, fps: int = 30, arms: str = "") -> None: @@ -293,7 +365,7 @@ async def start_inference( ) -> None: self._require_capability("infer" if use_cameras else "infer_without_cameras") output_dataset = self.datasets.prepare_recording_dataset(dataset_name, prefix="eval") - source = self.datasets.resolve_runtime_dataset(source_dataset) if source_dataset else None + source = self.datasets.resolve_runtime_dataset(source_dataset) if source_dataset and not checkpoint_path else None argv = CommandBuilder.infer( self.manifest, dataset=output_dataset.runtime, @@ -347,7 +419,7 @@ async def run_inference( ) -> str: self._require_capability("infer" if use_cameras else "infer_without_cameras") output_dataset = self.datasets.prepare_recording_dataset(dataset_name, prefix="eval") - source = self.datasets.resolve_runtime_dataset(source_dataset) if source_dataset else None + source = self.datasets.resolve_runtime_dataset(source_dataset) if source_dataset and not checkpoint_path else None argv = CommandBuilder.infer( self.manifest, dataset=output_dataset.runtime, diff --git a/roboclaw/embodied/workflow.py b/roboclaw/embodied/workflow.py new file mode 100644 index 000000000..0a176d5ac --- /dev/null +++ b/roboclaw/embodied/workflow.py @@ -0,0 +1,502 @@ +"""Unified embodied workflow specification and planning helpers.""" + +from __future__ import annotations + +import hashlib +import json +import re +from pathlib import Path +from typing import Any, Literal + +from pydantic import BaseModel, ConfigDict, Field +from pydantic.alias_generators import to_camel + +from roboclaw.data.datasets import DatasetCatalog, DatasetRuntimeRef, validate_dataset_slug +from roboclaw.embodied.command import ActionError, CommandBuilder + +WorkflowStageName = Literal["record", "train", "infer"] +_CHECKPOINT_CONFIG_FILES = ( + "config.json", + "train_config.json", + "policy_config.json", + "preprocessor_config.json", +) +_CHECKPOINT_WEIGHT_PATTERNS = ( + "model.safetensors", + "*.safetensors", + "*.pt", + "*.pth", + "*.bin", +) + + +class WorkflowModel(BaseModel): + """Base workflow model with camelCase compatibility for JSON APIs.""" + + model_config = ConfigDict( + alias_generator=to_camel, + populate_by_name=True, + extra="ignore", + ) + + +class WorkflowHardwareSpec(WorkflowModel): + """Hardware-facing options shared across workflow stages.""" + + arms: str = "" + use_cameras: bool = True + + +class RecordWorkflowSpec(WorkflowModel): + enabled: bool = False + task: str = "" + dataset_name: str = "" + num_episodes: int = 10 + fps: int = 30 + episode_time_s: int = 300 + reset_time_s: int = 10 + + +class TrainWorkflowSpec(WorkflowModel): + enabled: bool = False + dataset_name: str = "" + policy_type: str = "act" + steps: int = 100_000 + device: str = "cuda" + + +class InferWorkflowSpec(WorkflowModel): + enabled: bool = False + checkpoint_path: str = "" + source_dataset: str = "" + dataset_name: str = "" + task: str = "eval" + num_episodes: int = 1 + episode_time_s: int = 60 + + +class WorkflowSpec(WorkflowModel): + """Single spec that describes an embodied data/train/infer workflow.""" + + name: str = "" + hardware: WorkflowHardwareSpec = Field(default_factory=WorkflowHardwareSpec) + record: RecordWorkflowSpec = Field(default_factory=RecordWorkflowSpec) + train: TrainWorkflowSpec = Field(default_factory=TrainWorkflowSpec) + infer: InferWorkflowSpec = Field(default_factory=InferWorkflowSpec) + + +class WorkflowIssue(WorkflowModel): + """Validation issue found while compiling or checking a workflow.""" + + stage: WorkflowStageName + code: str + message: str + field: str = "" + + +class WorkflowStagePlan(WorkflowModel): + """Planner output for one workflow stage.""" + + stage: WorkflowStageName + enabled: bool = False + ready: bool = False + owner: str = "" + capability: str = "" + dataset_name: str = "" + source_dataset: str = "" + checkpoint_path: str = "" + output_path: str = "" + command: list[str] = Field(default_factory=list) + notes: list[str] = Field(default_factory=list) + blocked_by: list[WorkflowStageName] = Field(default_factory=list) + issues: list[WorkflowIssue] = Field(default_factory=list) + + +class WorkflowPlan(WorkflowModel): + """Complete compiled view of a workflow spec.""" + + name: str = "" + ok: bool = False + stages: list[WorkflowStagePlan] = Field(default_factory=list) + issues: list[WorkflowIssue] = Field(default_factory=list) + + def stage(self, stage_name: WorkflowStageName) -> WorkflowStagePlan: + for stage in self.stages: + if stage.stage == stage_name: + return stage + raise KeyError(stage_name) + + +class WorkflowPlanner: + """Compile a workflow spec into concrete stage plans and validations.""" + + def __init__(self, manifest: Any, datasets: DatasetCatalog) -> None: + self._manifest = manifest + self._datasets = datasets + + def plan(self, spec: WorkflowSpec | dict[str, Any]) -> WorkflowPlan: + workflow = spec if isinstance(spec, WorkflowSpec) else WorkflowSpec.model_validate(spec) + stages: list[WorkflowStagePlan] = [] + + record_plan = self._plan_record(workflow) + stages.append(record_plan) + + train_plan = self._plan_train(workflow, record_plan) + stages.append(train_plan) + + infer_plan = self._plan_infer(workflow, record_plan, train_plan) + stages.append(infer_plan) + + issues = [issue for stage in stages for issue in stage.issues] + if not any(stage.enabled for stage in stages): + issues.append(WorkflowIssue( + stage="record", + code="empty_workflow", + message="Enable at least one workflow stage.", + )) + + return WorkflowPlan( + name=workflow.name, + ok=not issues, + stages=stages, + issues=issues, + ) + + def _plan_record(self, spec: WorkflowSpec) -> WorkflowStagePlan: + stage = WorkflowStagePlan( + stage="record", + enabled=spec.record.enabled, + owner="recording", + capability="record" if spec.hardware.use_cameras else "record_without_cameras", + ) + if not spec.record.enabled: + return stage + + dataset = self._prepare_output_dataset( + spec, + dataset_name=spec.record.dataset_name, + prefix="rec", + payload={ + "name": spec.name, + "hardware": spec.hardware.model_dump(mode="json", by_alias=False), + "record": spec.record.model_dump(mode="json", by_alias=False), + }, + ) + stage.dataset_name = dataset.runtime.name + stage.output_path = str(dataset.runtime.local_path) + if not spec.record.dataset_name.strip(): + stage.notes.append( + f"record.dataset_name is omitted and resolves deterministically to '{stage.dataset_name}'." + ) + if not spec.record.task.strip(): + stage.issues.append(WorkflowIssue( + stage="record", + code="missing_task", + field="record.task", + message="record.task is required when the record stage is enabled.", + )) + + if stage.issues: + return stage + + try: + stage.command = CommandBuilder.record( + self._manifest, + dataset=dataset.runtime, + task=spec.record.task, + num_episodes=spec.record.num_episodes, + fps=spec.record.fps, + episode_time_s=spec.record.episode_time_s, + reset_time_s=spec.record.reset_time_s, + arms=spec.hardware.arms, + use_cameras=spec.hardware.use_cameras, + ) + except (ActionError, ValueError) as exc: + stage.issues.append(WorkflowIssue( + stage="record", + code="compile_error", + message=str(exc), + )) + return stage + + stage.ready = True + return stage + + def _plan_train(self, spec: WorkflowSpec, record_plan: WorkflowStagePlan) -> WorkflowStagePlan: + stage = WorkflowStagePlan( + stage="train", + enabled=spec.train.enabled, + owner="training", + capability="train", + ) + if not spec.train.enabled: + return stage + + dataset_name = spec.train.dataset_name.strip() or record_plan.dataset_name + stage.dataset_name = dataset_name + if not dataset_name: + stage.issues.append(WorkflowIssue( + stage="train", + code="missing_dataset", + field="train.dataset_name", + message="train.dataset_name is required unless the record stage feeds training.", + )) + return stage + + dataset_runtime = self._resolve_runtime_dataset(dataset_name) + inherited_from_record = ( + not spec.train.dataset_name.strip() + and record_plan.enabled + and dataset_name == record_plan.dataset_name + ) + if dataset_runtime is None: + if inherited_from_record: + dataset_runtime = self._planned_runtime_dataset(dataset_name) + stage.blocked_by.append("record") + stage.notes.append("train.dataset_name is inherited from the record stage output.") + stage.notes.append("train cannot run until the record stage materializes its dataset.") + else: + stage.issues.append(WorkflowIssue( + stage="train", + code="dataset_not_found", + field="train.dataset_name", + message=f"Runtime dataset '{dataset_name}' was not found.", + )) + return stage + elif inherited_from_record: + stage.notes.append("train.dataset_name is inherited from the record stage output.") + + try: + stage.command = CommandBuilder.train( + self._manifest, + dataset=dataset_runtime, + policy_type=spec.train.policy_type, + steps=spec.train.steps, + device=spec.train.device, + ) + except (ActionError, ValueError) as exc: + stage.issues.append(WorkflowIssue( + stage="train", + code="compile_error", + message=str(exc), + )) + return stage + + output_dir = _argv_value(stage.command, "--output_dir=") + if output_dir: + stage.output_path = output_dir + stage.checkpoint_path = str(Path(output_dir) / "checkpoints" / "last" / "pretrained_model") + + stage.ready = not stage.blocked_by + return stage + + def _plan_infer( + self, + spec: WorkflowSpec, + record_plan: WorkflowStagePlan, + train_plan: WorkflowStagePlan, + ) -> WorkflowStagePlan: + stage = WorkflowStagePlan( + stage="infer", + enabled=spec.infer.enabled, + owner="inferring", + capability="infer" if spec.hardware.use_cameras else "infer_without_cameras", + ) + if not spec.infer.enabled: + return stage + + output_dataset = self._prepare_output_dataset( + spec, + dataset_name=spec.infer.dataset_name, + prefix="eval", + payload={ + "name": spec.name, + "hardware": spec.hardware.model_dump(mode="json", by_alias=False), + "infer": spec.infer.model_dump(mode="json", by_alias=False), + }, + ) + stage.dataset_name = output_dataset.runtime.name + stage.output_path = str(output_dataset.runtime.local_path) + checkpoint_path = spec.infer.checkpoint_path.strip() + stage.checkpoint_path = checkpoint_path + if not spec.infer.dataset_name.strip(): + stage.notes.append( + f"infer.dataset_name is omitted and resolves deterministically to '{stage.dataset_name}'." + ) + if checkpoint_path: + checkpoint_issue = _checkpoint_validation_message(checkpoint_path) + if checkpoint_issue: + stage.issues.append(WorkflowIssue( + stage="infer", + code="invalid_checkpoint", + field="infer.checkpoint_path", + message=checkpoint_issue, + )) + return stage + + source_dataset_name = spec.infer.source_dataset.strip() + derived_from_train = False + if not checkpoint_path and not source_dataset_name and train_plan.enabled: + source_dataset_name = train_plan.dataset_name + derived_from_train = True + elif not checkpoint_path and not source_dataset_name: + source_dataset_name = record_plan.dataset_name + stage.source_dataset = source_dataset_name + + if not checkpoint_path and not source_dataset_name: + stage.issues.append(WorkflowIssue( + stage="infer", + code="missing_checkpoint_source", + field="infer.checkpoint_path", + message="Provide infer.checkpoint_path or a source dataset/train stage for checkpoint resolution.", + )) + return stage + + source_dataset = None + if not checkpoint_path and source_dataset_name: + source_dataset = self._resolve_runtime_dataset(source_dataset_name) + if source_dataset is None and derived_from_train: + source_dataset = self._planned_runtime_dataset(source_dataset_name) + stage.blocked_by.append("train") + elif source_dataset is None: + stage.issues.append(WorkflowIssue( + stage="infer", + code="source_dataset_not_found", + field="infer.source_dataset", + message=f"Runtime dataset '{source_dataset_name}' was not found for checkpoint resolution.", + )) + return stage + + try: + stage.command = CommandBuilder.infer( + self._manifest, + dataset=output_dataset.runtime, + checkpoint_path=checkpoint_path, + source_dataset=source_dataset, + task=spec.infer.task, + num_episodes=spec.infer.num_episodes, + episode_time_s=spec.infer.episode_time_s, + arms=spec.hardware.arms, + use_cameras=spec.hardware.use_cameras, + ) + except (ActionError, ValueError) as exc: + stage.issues.append(WorkflowIssue( + stage="infer", + code="compile_error", + message=str(exc), + )) + return stage + + resolved_checkpoint = _argv_value(stage.command, "--policy.path=") + if resolved_checkpoint: + stage.checkpoint_path = resolved_checkpoint + if not checkpoint_path and derived_from_train: + stage.notes.append("infer.checkpoint_path is derived from the planned training output.") + if stage.checkpoint_path and not Path(stage.checkpoint_path).expanduser().exists(): + if "train" not in stage.blocked_by: + stage.blocked_by.append("train") + stage.notes.append("infer cannot run until the train stage produces the checkpoint.") + else: + checkpoint_issue = _checkpoint_validation_message(stage.checkpoint_path) + if checkpoint_issue: + stage.issues.append(WorkflowIssue( + stage="infer", + code="invalid_checkpoint", + field="infer.checkpoint_path", + message=checkpoint_issue, + )) + return stage + elif not checkpoint_path and source_dataset_name: + stage.notes.append("infer.checkpoint_path is derived from the source dataset policy directory.") + checkpoint_issue = _checkpoint_validation_message(stage.checkpoint_path) + if checkpoint_issue: + stage.issues.append(WorkflowIssue( + stage="infer", + code="invalid_checkpoint", + field="infer.checkpoint_path", + message=checkpoint_issue, + )) + return stage + + stage.ready = not stage.blocked_by + return stage + + def _resolve_runtime_dataset(self, name: str) -> DatasetRuntimeRef | None: + try: + return self._datasets.resolve_runtime_dataset(name).runtime + except ValueError: + return None + + def _planned_runtime_dataset(self, name: str) -> DatasetRuntimeRef: + return DatasetRuntimeRef( + name=name, + repo_id=f"local/{name}", + local_path=self._datasets.root / "local" / name, + ) + + def _prepare_output_dataset( + self, + spec: WorkflowSpec, + *, + dataset_name: str, + prefix: str, + payload: dict[str, Any], + ) -> Any: + resolved_name = dataset_name.strip() or _default_dataset_name(spec, prefix=prefix, payload=payload) + return self._datasets.prepare_recording_dataset(resolved_name, prefix=prefix) + + +def _argv_value(argv: list[str], prefix: str) -> str: + for item in argv: + if item.startswith(prefix): + return item.split("=", 1)[1] + return "" + + +def _default_dataset_name(spec: WorkflowSpec, *, prefix: str, payload: dict[str, Any]) -> str: + slug = _slugify_name(spec.name) + if slug: + candidate = f"{prefix}_{slug}" + else: + encoded = json.dumps( + payload, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=True, + ) + candidate = f"{prefix}_{hashlib.sha1(encoded.encode('utf-8')).hexdigest()[:10]}" + validate_dataset_slug(candidate) + return candidate + + +def _slugify_name(value: str) -> str: + slug = re.sub(r"[^A-Za-z0-9_-]+", "_", value.strip().lower()).strip("_-") + return slug[:48] + + +def _checkpoint_validation_message(raw_path: str) -> str: + if not raw_path or _looks_like_remote_policy_id(raw_path): + return "" + + path = Path(raw_path).expanduser() + if not path.exists(): + return f"Resolved checkpoint '{path}' was not found." + if not path.is_dir(): + return f"Resolved checkpoint '{path}' must be a directory." + if not any((path / name).is_file() for name in _CHECKPOINT_CONFIG_FILES): + joined = ", ".join(_CHECKPOINT_CONFIG_FILES) + return f"Resolved checkpoint '{path}' is missing a recognized config file ({joined})." + if not any(any(path.glob(pattern)) for pattern in _CHECKPOINT_WEIGHT_PATTERNS): + joined = ", ".join(_CHECKPOINT_WEIGHT_PATTERNS) + return f"Resolved checkpoint '{path}' is missing model weights ({joined})." + return "" + + +def _looks_like_remote_policy_id(raw_path: str) -> bool: + path = Path(raw_path).expanduser() + if path.exists() or path.is_absolute(): + return False + if raw_path.startswith(("~", ".", "/")): + return False + parts = raw_path.split("/") + return len(parts) == 2 and all(parts) and not any(part in {".", ".."} for part in parts) diff --git a/roboclaw/http/routes/__init__.py b/roboclaw/http/routes/__init__.py index c30d91a01..37a947943 100644 --- a/roboclaw/http/routes/__init__.py +++ b/roboclaw/http/routes/__init__.py @@ -29,6 +29,7 @@ def register_all_routes( from roboclaw.http.routes.train import register_train_routes from roboclaw.http.routes.infer import register_infer_routes from roboclaw.http.routes.hub import register_hub_routes + from roboclaw.http.routes.workflows import register_workflow_routes from roboclaw.http.routes.chat_uploads import register_chat_upload_routes register_chat_upload_routes(app) @@ -45,6 +46,7 @@ def register_all_routes( register_train_routes(app, service) register_infer_routes(app, service) register_hub_routes(app, service) + register_workflow_routes(app, service) from roboclaw.http.routes.curation import register_curation_routes from roboclaw.http.routes.explorer import register_explorer_routes diff --git a/roboclaw/http/routes/recovery.py b/roboclaw/http/routes/recovery.py index dd16b952d..0d8b5c39f 100644 --- a/roboclaw/http/routes/recovery.py +++ b/roboclaw/http/routes/recovery.py @@ -29,6 +29,11 @@ async def _restart() -> None: def register_recovery_routes(app: FastAPI) -> None: + @app.get("/api/recovery/faults") + async def recovery_faults() -> dict[str, Any]: + monitor: HardwareMonitor = app.state.hardware_monitor + return {"faults": [fault.to_dict() for fault in monitor.active_faults]} + @app.post("/api/recovery/check-hardware") async def recovery_check_hardware() -> dict[str, Any]: monitor: HardwareMonitor = app.state.hardware_monitor diff --git a/roboclaw/http/routes/workflows.py b/roboclaw/http/routes/workflows.py new file mode 100644 index 000000000..7d1454162 --- /dev/null +++ b/roboclaw/http/routes/workflows.py @@ -0,0 +1,39 @@ +"""Workflow routes — unified embodied workflow planning and execution.""" + +from __future__ import annotations + +from typing import Any, Literal + +from fastapi import FastAPI, HTTPException + +from roboclaw.embodied.service import EmbodiedService +from roboclaw.embodied.workflow import WorkflowSpec + +WorkflowPhase = Literal["record", "train", "infer"] + + +def register_workflow_routes(app: FastAPI, service: EmbodiedService) -> None: + @app.post( + "/api/workflows/plan", + summary="Preview the compiled workflow plan", + description=( + "Resolve cross-stage dataset flow, checkpoint paths, and concrete commands " + "for a unified embodied workflow spec." + ), + ) + async def workflow_plan(body: WorkflowSpec) -> dict[str, Any]: + return service.plan_workflow(body).model_dump(by_alias=True) + + @app.post( + "/api/workflows/run/{phase}", + summary="Run one phase from a validated workflow", + description=( + "Execute a single workflow phase using the same derived dataset and " + "checkpoint values shown in the workflow plan." + ), + ) + async def workflow_run(phase: WorkflowPhase, body: WorkflowSpec) -> dict[str, Any]: + try: + return await service.start_workflow_phase(body, phase) + except (RuntimeError, ValueError) as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc diff --git a/roboclaw/providers/factory.py b/roboclaw/providers/factory.py index 406a4ee2f..d1b46dc3d 100644 --- a/roboclaw/providers/factory.py +++ b/roboclaw/providers/factory.py @@ -40,7 +40,7 @@ def build_provider(config: Config) -> LLMProvider: ) from roboclaw.providers.custom_provider import CustomProvider provider = CustomProvider( - api_key=provider_config.api_key if provider_config else "no-key", + api_key=(provider_config.api_key if provider_config else "") or "no-key", api_base=provider_config.api_base, default_model=model, ) diff --git a/tests/integration/test_agent_pty.py b/tests/integration/test_agent_pty.py index c04df4a49..08708548e 100644 --- a/tests/integration/test_agent_pty.py +++ b/tests/integration/test_agent_pty.py @@ -48,7 +48,7 @@ def test_agent_ctrl_c(simulated_agent_child) -> None: child = simulated_agent_child child.expect(r"You:", timeout=15) child.sendintr() - child.expect(r"Received SIGINT, goodbye!", timeout=10) + child.expect([r"Received SIGINT, goodbye!", r"Goodbye!"], timeout=10) child.close(force=True) diff --git a/tests/test_dashboard_routes.py b/tests/test_dashboard_routes.py index 352b82870..5a24032f3 100644 --- a/tests/test_dashboard_routes.py +++ b/tests/test_dashboard_routes.py @@ -101,6 +101,26 @@ def test_status_fields(self, client): assert field in data +class TestWorkflowRoutes: + def test_workflow_plan_route_is_registered(self, client): + resp = client.post("/api/workflows/plan", json={ + "record": { + "enabled": True, + "task": "pick cube", + "datasetName": "pick_cube_v1", + }, + "train": { + "enabled": True, + }, + }) + + assert resp.status_code == 200 + payload = resp.json() + assert payload["stages"][0]["stage"] == "record" + assert payload["stages"][1]["stage"] == "train" + assert payload["stages"][1]["blockedBy"] == ["record"] + + # --------------------------------------------------------------------------- # Session lifecycle # --------------------------------------------------------------------------- diff --git a/tests/test_hardware_monitor.py b/tests/test_hardware_monitor.py index 63d968c0c..c48f739b3 100644 --- a/tests/test_hardware_monitor.py +++ b/tests/test_hardware_monitor.py @@ -21,6 +21,7 @@ def _stub_profile_on_disk(monkeypatch): """Report calibration JSON present so ``arm.calibrated=True`` isn't downgraded.""" monkeypatch.setattr(monitor_mod, "_has_profile_on_disk", lambda arm: True) + monkeypatch.setattr(monitor_mod, "get_missing_arm_motors", lambda arm: []) # --------------------------------------------------------------------------- diff --git a/tests/test_workflow_plan.py b/tests/test_workflow_plan.py new file mode 100644 index 000000000..4f862c0ed --- /dev/null +++ b/tests/test_workflow_plan.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +import json +from pathlib import Path + +from roboclaw.embodied.embodiment.interface.serial import SerialInterface +from roboclaw.embodied.embodiment.interface.video import VideoInterface +from roboclaw.embodied.embodiment.manifest import Manifest +from roboclaw.embodied.service import EmbodiedService +from roboclaw.embodied.workflow import WorkflowPlanner, WorkflowSpec + + +def _make_service(tmp_path: Path) -> EmbodiedService: + manifest_data = { + "version": 2, + "arms": [], + "hands": [], + "cameras": [], + "datasets": {"root": str(tmp_path / "datasets")}, + "policies": {"root": str(tmp_path / "policies")}, + } + manifest_path = tmp_path / "manifest.json" + manifest_path.write_text(json.dumps(manifest_data), encoding="utf-8") + manifest = Manifest(path=manifest_path) + service = EmbodiedService(manifest=manifest) + service.bind_arm("follower", "so101_follower", SerialInterface(dev="/tmp/follower")) + service.bind_arm("leader", "so101_leader", SerialInterface(dev="/tmp/leader")) + service.bind_camera("wrist", VideoInterface(dev="/tmp/wrist")) + return service + + +def _materialize_checkpoint(tmp_path: Path, name: str = "checkpoint") -> Path: + checkpoint_dir = tmp_path / name + checkpoint_dir.mkdir(parents=True, exist_ok=True) + (checkpoint_dir / "config.json").write_text("{}", encoding="utf-8") + (checkpoint_dir / "model.safetensors").write_text("", encoding="utf-8") + return checkpoint_dir + + +def test_workflow_planner_compiles_record_train_infer_chain(tmp_path: Path) -> None: + service = _make_service(tmp_path) + plan = WorkflowPlanner(service.manifest, service.datasets).plan(WorkflowSpec.model_validate({ + "name": "pick-cube-pipeline", + "hardware": {"use_cameras": True}, + "record": { + "enabled": True, + "task": "pick cube", + "dataset_name": "pick_cube_v1", + "num_episodes": 5, + }, + "train": { + "enabled": True, + "policy_type": "act", + "steps": 2000, + }, + "infer": { + "enabled": True, + "dataset_name": "eval_pick_cube_v1", + "num_episodes": 2, + }, + })) + + assert plan.ok is True + + record_stage = plan.stage("record") + assert record_stage.ready is True + assert record_stage.dataset_name == "pick_cube_v1" + assert "--dataset.repo_id=local/pick_cube_v1" in record_stage.command + + train_stage = plan.stage("train") + assert train_stage.ready is False + assert train_stage.blocked_by == ["record"] + assert train_stage.dataset_name == "pick_cube_v1" + assert "--policy.type=act" in train_stage.command + assert train_stage.checkpoint_path.endswith( + "pick_cube_v1/checkpoints/last/pretrained_model" + ) + + infer_stage = plan.stage("infer") + assert infer_stage.ready is False + assert infer_stage.blocked_by == ["train"] + assert infer_stage.dataset_name == "eval_pick_cube_v1" + assert infer_stage.source_dataset == "pick_cube_v1" + assert infer_stage.checkpoint_path.endswith( + "pick_cube_v1/checkpoints/last/pretrained_model" + ) + assert any(arg.startswith("--policy.path=") for arg in infer_stage.command) + + +def test_workflow_planner_generates_stable_default_dataset_names(tmp_path: Path) -> None: + service = _make_service(tmp_path) + planner = WorkflowPlanner(service.manifest, service.datasets) + checkpoint_dir = _materialize_checkpoint(tmp_path) + spec = { + "name": "Pick Cube Pipeline", + "record": { + "enabled": True, + "task": "pick cube", + }, + "infer": { + "enabled": True, + "checkpoint_path": str(checkpoint_dir), + }, + } + + first = planner.plan(spec) + second = planner.plan(spec) + + assert first.stage("record").dataset_name == "rec_pick_cube_pipeline" + assert first.stage("infer").dataset_name == "eval_pick_cube_pipeline" + assert first.stage("record").dataset_name == second.stage("record").dataset_name + assert first.stage("infer").dataset_name == second.stage("infer").dataset_name + + +def test_workflow_planner_default_record_name_ignores_unrelated_downstream_changes(tmp_path: Path) -> None: + service = _make_service(tmp_path) + planner = WorkflowPlanner(service.manifest, service.datasets) + + base = planner.plan({ + "record": { + "enabled": True, + "task": "pick cube", + }, + }) + with_train = planner.plan({ + "record": { + "enabled": True, + "task": "pick cube", + }, + "train": { + "enabled": True, + "steps": 2000, + }, + }) + + assert base.stage("record").dataset_name == with_train.stage("record").dataset_name + + +def test_workflow_planner_reports_missing_explicit_checkpoint(tmp_path: Path) -> None: + service = _make_service(tmp_path) + plan = WorkflowPlanner(service.manifest, service.datasets).plan({ + "infer": { + "enabled": True, + "checkpoint_path": str(tmp_path / "missing_checkpoint"), + }, + }) + + assert plan.ok is False + infer_stage = plan.stage("infer") + assert infer_stage.ready is False + assert any(issue.code == "invalid_checkpoint" for issue in infer_stage.issues) + + +def test_workflow_planner_reports_missing_train_dataset_without_record_stage(tmp_path: Path) -> None: + service = _make_service(tmp_path) + plan = WorkflowPlanner(service.manifest, service.datasets).plan({ + "train": {"enabled": True}, + }) + + assert plan.ok is False + train_stage = plan.stage("train") + assert train_stage.ready is False + assert any(issue.code == "missing_dataset" for issue in train_stage.issues) diff --git a/tests/test_workflow_routes.py b/tests/test_workflow_routes.py new file mode 100644 index 000000000..33820c942 --- /dev/null +++ b/tests/test_workflow_routes.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +pytest.importorskip("fastapi") +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from roboclaw.embodied.board import Board +from roboclaw.embodied.embodiment.hardware.monitor import HardwareMonitor +from roboclaw.embodied.embodiment.interface.serial import SerialInterface +from roboclaw.embodied.embodiment.interface.video import VideoInterface +from roboclaw.embodied.embodiment.manifest import Manifest +from roboclaw.embodied.service import EmbodiedService +from roboclaw.http.routes.workflows import register_workflow_routes + + +@pytest.fixture(autouse=True) +def isolated_roboclaw_home(tmp_path): + with patch( + "roboclaw.embodied.embodiment.lock.get_roboclaw_home", + return_value=tmp_path, + ), patch( + "roboclaw.embodied.embodiment.manifest.helpers.get_roboclaw_home", + return_value=tmp_path, + ): + yield + + +@pytest.fixture() +def app(tmp_path): + app = FastAPI() + manifest_path = tmp_path / "manifest.json" + manifest_path.write_text(json.dumps({ + "version": 2, + "arms": [], + "hands": [], + "cameras": [], + "datasets": {"root": str(tmp_path / "datasets")}, + "policies": {"root": str(tmp_path / "policies")}, + }), encoding="utf-8") + board = Board() + manifest = Manifest(path=manifest_path, board=board) + hw_monitor = HardwareMonitor(board=board, manifest=manifest) + service = EmbodiedService(hardware_monitor=hw_monitor, board=board, manifest=manifest) + service.bind_arm("follower", "so101_follower", SerialInterface(dev="/tmp/follower")) + service.bind_arm("leader", "so101_leader", SerialInterface(dev="/tmp/leader")) + service.bind_camera("wrist", VideoInterface(dev="/tmp/wrist")) + register_workflow_routes(app, service) + app.state.embodied_service = service + return app + + +@pytest.fixture() +def client(app): + return TestClient(app, raise_server_exceptions=False) + + +def test_workflow_plan_route_returns_compiled_stages(client): + resp = client.post("/api/workflows/plan", json={ + "name": "pick-cube-pipeline", + "hardware": {"useCameras": True}, + "record": { + "enabled": True, + "task": "pick cube", + "datasetName": "pick_cube_v1", + }, + "train": { + "enabled": True, + "policyType": "act", + }, + "infer": { + "enabled": True, + "datasetName": "eval_pick_cube_v1", + }, + }) + + assert resp.status_code == 200 + payload = resp.json() + assert payload["ok"] is True + assert [stage["stage"] for stage in payload["stages"]] == ["record", "train", "infer"] + assert payload["stages"][0]["ready"] is True + assert payload["stages"][1]["ready"] is False + assert payload["stages"][1]["blockedBy"] == ["record"] + assert payload["stages"][1]["datasetName"] == "pick_cube_v1" + assert payload["stages"][2]["ready"] is False + assert payload["stages"][2]["blockedBy"] == ["train"] + assert payload["stages"][2]["checkpointPath"].endswith( + "pick_cube_v1/checkpoints/last/pretrained_model" + ) + + +def test_workflow_run_route_delegates_to_service(client, app): + service = app.state.embodied_service + service.start_workflow_phase = AsyncMock(return_value={"status": "recording", "dataset_name": "demo"}) + + resp = client.post("/api/workflows/run/record", json={ + "record": { + "enabled": True, + "task": "pick cube", + "datasetName": "demo", + }, + }) + + assert resp.status_code == 200 + assert resp.json() == {"status": "recording", "dataset_name": "demo"} + service.start_workflow_phase.assert_awaited_once() + + +def test_workflow_run_route_returns_400_for_blocked_stage(client, app): + service = app.state.embodied_service + service.start_workflow_phase = AsyncMock(side_effect=RuntimeError("Workflow phase 'train' is waiting on: record.")) + + resp = client.post("/api/workflows/run/train", json={ + "record": { + "enabled": True, + "task": "pick cube", + "datasetName": "pick_cube_v1", + }, + "train": { + "enabled": True, + }, + }) + + assert resp.status_code == 400 + assert resp.json() == {"detail": "Workflow phase 'train' is waiting on: record."} diff --git a/tests/test_workflow_service.py b/tests/test_workflow_service.py new file mode 100644 index 000000000..43a30d77f --- /dev/null +++ b/tests/test_workflow_service.py @@ -0,0 +1,273 @@ +from __future__ import annotations + +import asyncio +import json +import sys +from pathlib import Path +from unittest.mock import AsyncMock, patch + +from roboclaw.embodied.embodiment.interface.serial import SerialInterface +from roboclaw.embodied.embodiment.interface.video import VideoInterface +from roboclaw.embodied.embodiment.manifest import Manifest +from roboclaw.embodied.service import EmbodiedService +from roboclaw.embodied.workflow import WorkflowPlanner + + +def _make_service(tmp_path: Path) -> EmbodiedService: + manifest_data = { + "version": 2, + "arms": [], + "hands": [], + "cameras": [], + "datasets": {"root": str(tmp_path / "datasets")}, + "policies": {"root": str(tmp_path / "policies")}, + } + manifest_path = tmp_path / "manifest.json" + manifest_path.write_text(json.dumps(manifest_data), encoding="utf-8") + manifest = Manifest(path=manifest_path) + service = EmbodiedService(manifest=manifest) + service.bind_arm("follower", "so101_follower", SerialInterface(dev="/tmp/follower")) + service.bind_arm("leader", "so101_leader", SerialInterface(dev="/tmp/leader")) + service.bind_camera("wrist", VideoInterface(dev="/tmp/wrist")) + return service + + +def _materialize_runtime_dataset(tmp_path: Path, name: str) -> None: + dataset_dir = tmp_path / "datasets" / "local" / name / "meta" + dataset_dir.mkdir(parents=True, exist_ok=True) + (dataset_dir / "info.json").write_text("{}", encoding="utf-8") + + +def _materialize_checkpoint(tmp_path: Path, name: str = "checkpoint") -> Path: + checkpoint_dir = tmp_path / name + checkpoint_dir.mkdir(parents=True, exist_ok=True) + (checkpoint_dir / "config.json").write_text("{}", encoding="utf-8") + (checkpoint_dir / "model.safetensors").write_text("", encoding="utf-8") + return checkpoint_dir + + +def test_start_workflow_phase_train_uses_materialized_record_dataset_name(tmp_path: Path) -> None: + service = _make_service(tmp_path) + _materialize_runtime_dataset(tmp_path, "pick_cube_v1") + service.train.train = AsyncMock(return_value="Training started. Job ID: job-1") + + result = asyncio.run(service.start_workflow_phase({ + "record": { + "enabled": True, + "task": "pick cube", + "dataset_name": "pick_cube_v1", + }, + "train": { + "enabled": True, + "policy_type": "act", + "steps": 1000, + }, + }, "train")) + + assert result == {"message": "Training started. Job ID: job-1", "job_id": "job-1"} + service.train.train.assert_awaited_once() + kwargs = service.train.train.await_args.kwargs["kwargs"] + assert kwargs["dataset_name"] == "pick_cube_v1" + + +def test_start_workflow_phase_train_rejects_blocked_stage(tmp_path: Path) -> None: + service = _make_service(tmp_path) + + try: + asyncio.run(service.start_workflow_phase({ + "record": { + "enabled": True, + "task": "pick cube", + "dataset_name": "pick_cube_v1", + }, + "train": { + "enabled": True, + "policy_type": "act", + "steps": 1000, + }, + }, "train")) + except RuntimeError as exc: + assert str(exc) == "Workflow phase 'train' is waiting on: record." + else: + raise AssertionError("expected blocked train stage to raise") + + +def test_start_workflow_phase_record_uses_planned_dataset_name(tmp_path: Path) -> None: + service = _make_service(tmp_path) + service.start_recording = AsyncMock(side_effect=lambda **kwargs: kwargs["dataset_name"]) + spec = { + "record": { + "enabled": True, + "task": "pick cube", + }, + } + expected = WorkflowPlanner(service.manifest, service.datasets).plan(spec).stage("record").dataset_name + + result = asyncio.run(service.start_workflow_phase(spec, "record")) + + assert result["status"] == "recording" + assert result["dataset_name"] == expected + service.start_recording.assert_awaited_once() + kwargs = service.start_recording.await_args.kwargs + assert kwargs["dataset_name"] == expected + assert kwargs["dataset_name"] == result["dataset_name"] + + +def test_start_workflow_phase_infer_rejects_blocked_train_checkpoint(tmp_path: Path) -> None: + service = _make_service(tmp_path) + + try: + asyncio.run(service.start_workflow_phase({ + "record": { + "enabled": True, + "task": "pick cube", + "dataset_name": "pick_cube_v1", + }, + "train": { + "enabled": True, + "policy_type": "act", + "steps": 1000, + }, + "infer": { + "enabled": True, + "dataset_name": "eval_pick_cube_v1", + }, + }, "infer")) + except RuntimeError as exc: + assert str(exc) == "Workflow phase 'infer' is waiting on: train." + else: + raise AssertionError("expected blocked infer stage to raise") + + +def test_start_workflow_phase_infer_uses_explicit_checkpoint_without_source_dataset(tmp_path: Path) -> None: + service = _make_service(tmp_path) + service.start_inference = AsyncMock(return_value=None) + checkpoint_dir = _materialize_checkpoint(tmp_path, "model") + + result = asyncio.run(service.start_workflow_phase({ + "name": "pick-cube-pipeline", + "record": { + "enabled": True, + "task": "pick cube", + "dataset_name": "pick_cube_v1", + }, + "infer": { + "enabled": True, + "checkpoint_path": str(checkpoint_dir), + }, + }, "infer")) + + assert result["status"] == "inferring" + assert result["dataset_name"] == "eval_pick-cube-pipeline" + assert result["checkpoint_path"] == str(checkpoint_dir) + service.start_inference.assert_awaited_once() + kwargs = service.start_inference.await_args.kwargs + assert kwargs["checkpoint_path"] == str(checkpoint_dir) + assert kwargs["source_dataset"] == "" + assert kwargs["dataset_name"] == "eval_pick-cube-pipeline" + + +def test_start_workflow_phase_record_builds_record_command_from_planned_defaults(tmp_path: Path) -> None: + service = _make_service(tmp_path) + service._require_capability = lambda *_args, **_kwargs: None + starts: list[tuple[str, list[str]]] = [] + + async def fake_start_session(_session, *, owner: str, argv: list[str]) -> None: + starts.append((owner, argv)) + + service._start_managed_session = fake_start_session + spec = { + "name": "pick-cube-pipeline", + "record": { + "enabled": True, + "task": "pick cube", + }, + } + expected = WorkflowPlanner(service.manifest, service.datasets).plan(spec).stage("record").dataset_name + + result = asyncio.run(service.start_workflow_phase(spec, "record")) + + assert result == {"status": "recording", "dataset_name": expected} + assert starts[0][0] == "recording" + argv = starts[0][1] + assert argv[:4] == [ + sys.executable, + "-m", + "roboclaw.embodied.command.wrapper", + "record", + ] + assert f"--dataset.repo_id=local/{expected}" in argv + assert f"--dataset.root={tmp_path / 'datasets' / 'local' / expected}" in argv + assert "--dataset.single_task=pick cube" in argv + + +def test_start_workflow_phase_train_builds_train_command_for_runtime_dataset(tmp_path: Path) -> None: + service = _make_service(tmp_path) + _materialize_runtime_dataset(tmp_path, "pick_cube_v1") + captured: list[tuple[list[str], str]] = [] + + async def fake_run_detached(self, argv: list[str], log_dir: Path) -> str: + captured.append((argv, str(log_dir))) + return "job-42" + + with patch("roboclaw.embodied.executor.SubprocessExecutor.run_detached", new=fake_run_detached): + result = asyncio.run(service.start_workflow_phase({ + "train": { + "enabled": True, + "dataset_name": "pick_cube_v1", + "policy_type": "act", + "steps": 1234, + "device": "cpu", + }, + }, "train")) + + assert result == {"message": "Training started. Job ID: job-42", "job_id": "job-42"} + argv = captured[0][0] + assert argv[0] == "lerobot-train" + assert "--dataset.repo_id=local/pick_cube_v1" in argv + assert f"--dataset.root={tmp_path / 'datasets' / 'local' / 'pick_cube_v1'}" in argv + assert f"--output_dir={tmp_path / 'policies' / 'pick_cube_v1'}" in argv + assert "--steps=1234" in argv + assert "--policy.device=cpu" in argv + + +def test_start_workflow_phase_infer_builds_command_from_explicit_checkpoint(tmp_path: Path) -> None: + service = _make_service(tmp_path) + service._require_capability = lambda *_args, **_kwargs: None + starts: list[tuple[str, list[str]]] = [] + + async def fake_start_session(_session, *, owner: str, argv: list[str]) -> None: + starts.append((owner, argv)) + + service._start_managed_session = fake_start_session + checkpoint_dir = _materialize_checkpoint(tmp_path, "model") + + result = asyncio.run(service.start_workflow_phase({ + "name": "pick-cube-pipeline", + "infer": { + "enabled": True, + "checkpoint_path": str(checkpoint_dir), + "task": "eval pick", + "num_episodes": 2, + "episode_time_s": 15, + }, + }, "infer")) + + assert result == { + "status": "inferring", + "dataset_name": "eval_pick-cube-pipeline", + "checkpoint_path": str(checkpoint_dir), + } + assert starts[0][0] == "inferring" + argv = starts[0][1] + assert argv[:4] == [ + sys.executable, + "-m", + "roboclaw.embodied.command.wrapper", + "record", + ] + assert f"--policy.path={checkpoint_dir}" in argv + assert "--dataset.single_task=eval pick" in argv + assert "--dataset.num_episodes=2" in argv + assert "--dataset.episode_time_s=15" in argv + assert "--dataset.repo_id=local/eval_pick-cube-pipeline" in argv