From 064037c84a0d50d77be9b3b6a89c5ac4e6d43dcd Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Sat, 28 Mar 2026 08:41:13 +0200 Subject: [PATCH] chore(rpc): remove old rpc linking --- AGENTS.md | 2 - dimos/agents/skills/gps_nav_skill.py | 13 +- dimos/agents/skills/navigation.py | 87 ++++--------- dimos/agents/skills/test_gps_nav_skills.py | 19 ++- dimos/agents/skills/test_navigation.py | 64 +++++++++- .../skills/test_unitree_skill_container.py | 41 +++++-- dimos/agents/vlm_agent_spec.py | 24 ++++ dimos/agents/vlm_stream_tester.py | 14 +-- dimos/core/blueprints.py | 116 +++--------------- dimos/core/docker_module.py | 24 +--- dimos/core/introspection/module/info.py | 2 - dimos/core/module.py | 29 ----- dimos/core/rpc_client.py | 3 - dimos/core/test_blueprints.py | 15 +-- dimos/core/test_core.py | 2 +- dimos/manipulation/control/arm_driver_spec.py | 24 ++++ .../cartesian_motion_controller.py | 49 +------- dimos/manipulation/grasping/grasp_gen_spec.py | 27 ++++ dimos/manipulation/grasping/grasping.py | 34 ++--- dimos/navigation/navigation_spec.py | 26 ++++ .../object_scene_registration_spec.py | 29 +++++ dimos/perception/object_tracking_spec.py | 23 ++++ dimos/perception/spatial_memory_spec.py | 24 ++++ dimos/robot/unitree/g1/connection_spec.py | 23 ++++ dimos/robot/unitree/g1/skill_container.py | 13 +- dimos/robot/unitree/go2/connection_spec.py | 21 ++++ .../robot/unitree/unitree_skill_container.py | 35 ++---- dimos/web/websocket_vis_spec.py | 22 ++++ docs/usage/blueprints.md | 14 +++ 29 files changed, 449 insertions(+), 370 deletions(-) create mode 100644 dimos/agents/vlm_agent_spec.py create mode 100644 dimos/manipulation/control/arm_driver_spec.py create mode 100644 dimos/manipulation/grasping/grasp_gen_spec.py create mode 100644 dimos/navigation/navigation_spec.py create mode 100644 dimos/perception/object_scene_registration_spec.py create mode 100644 dimos/perception/object_tracking_spec.py create mode 100644 dimos/perception/spatial_memory_spec.py create mode 100644 dimos/robot/unitree/g1/connection_spec.py create mode 100644 dimos/robot/unitree/go2/connection_spec.py create mode 100644 dimos/web/websocket_vis_spec.py diff --git a/AGENTS.md b/AGENTS.md index 3c18fd685d..63e3eadf18 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -303,8 +303,6 @@ class MySkillContainer(Module): If multiple modules match the spec, use `.remappings()` to resolve. Source: `dimos/spec/utils.py`, `dimos/core/blueprints.py`. -**Legacy**: existing skill containers use `rpc_calls: list[str]` + `get_rpc_calls("ClassName.method")`. This still works but wiring failures are silent and only surface at runtime. Don't use it in new code. - ### Adding a New Skill 1. Pick the right container (robot-specific or `dimos/agents/skills/`). diff --git a/dimos/agents/skills/gps_nav_skill.py b/dimos/agents/skills/gps_nav_skill.py index c6f86951be..52f4e726c5 100644 --- a/dimos/agents/skills/gps_nav_skill.py +++ b/dimos/agents/skills/gps_nav_skill.py @@ -17,11 +17,11 @@ from dimos.agents.annotation import skill from dimos.core.core import rpc from dimos.core.module import Module -from dimos.core.rpc_client import RpcCall from dimos.core.stream import In, Out from dimos.mapping.models import LatLon from dimos.mapping.utils.distance import distance_in_meters from dimos.utils.logging_config import setup_logger +from dimos.web.websocket_vis_spec import WebsocketVisSpec logger = setup_logger() @@ -29,7 +29,8 @@ class GpsNavSkillContainer(Module): _latest_location: LatLon | None = None _max_valid_distance: int = 50000 - _set_gps_travel_goal_points: RpcCall | None = None + + _vis: WebsocketVisSpec gps_location: In[LatLon] gps_goal: Out[LatLon] @@ -43,11 +44,6 @@ def start(self) -> None: def stop(self) -> None: super().stop() - @rpc - def set_WebsocketVisModule_set_gps_travel_goal_points(self, callable: RpcCall) -> None: - self._set_gps_travel_goal_points = callable - self._set_gps_travel_goal_points.set_rpc(self.rpc) # type: ignore[arg-type] - def _on_gps_location(self, location: LatLon) -> None: self._latest_location = location @@ -83,8 +79,7 @@ def set_gps_travel_points(self, points: list[dict[str, float]]) -> str: if self.gps_goal._transport is not None: self.gps_goal.publish(new_points) # type: ignore[arg-type] - if self._set_gps_travel_goal_points: - self._set_gps_travel_goal_points(new_points) + self._vis.set_gps_travel_goal_points(new_points) # type: ignore[arg-type] return "I've successfully set the travel points." diff --git a/dimos/agents/skills/navigation.py b/dimos/agents/skills/navigation.py index e366465959..d028f9847c 100644 --- a/dimos/agents/skills/navigation.py +++ b/dimos/agents/skills/navigation.py @@ -27,7 +27,10 @@ from dimos.msgs.geometry_msgs.Vector3 import Vector3, make_vector3 from dimos.msgs.sensor_msgs.Image import Image from dimos.navigation.base import NavigationState +from dimos.navigation.navigation_spec import NavigationInterfaceSpec from dimos.navigation.visual.query import get_object_bbox_from_image +from dimos.perception.object_tracking_spec import ObjectTrackingSpec +from dimos.perception.spatial_memory_spec import SpatialMemorySpec from dimos.types.robot_location import RobotLocation from dimos.utils.logging_config import setup_logger @@ -40,18 +43,9 @@ class NavigationSkillContainer(Module): _skill_started: bool = False _similarity_threshold: float = 0.23 - rpc_calls: list[str] = [ - "SpatialMemory.tag_location", - "SpatialMemory.query_tagged_location", - "SpatialMemory.query_by_text", - "NavigationInterface.set_goal", - "NavigationInterface.get_state", - "NavigationInterface.is_goal_reached", - "NavigationInterface.cancel_goal", - "ObjectTracking.track", - "ObjectTracking.stop_track", - "ObjectTracking.is_tracking", - ] + _spatial_memory: SpatialMemorySpec + _navigation: NavigationInterfaceSpec + _object_tracking: ObjectTrackingSpec | None = None color_image: In[Image] odom: In[PoseStamped] @@ -110,8 +104,7 @@ def tag_location(self, location_name: str) -> str: rotation=(rotation.x, rotation.y, rotation.z), ) - tag_location_rpc = self.get_rpc_calls("SpatialMemory.tag_location") - if not tag_location_rpc(location): + if not self._spatial_memory.tag_location(location): return f"Error: Failed to store '{location_name}' in the spatial memory" logger.info(f"Tagged {location}") @@ -151,13 +144,7 @@ def navigate_with_text(self, query: str) -> str: return f"No tagged location called '{query}'. No object in view matching '{query}'. No matching location found in semantic map for '{query}'." def _navigate_by_tagged_location(self, query: str) -> str | None: - try: - query_tagged_location_rpc = self.get_rpc_calls("SpatialMemory.query_tagged_location") - except Exception: - logger.warning("SpatialMemory module not connected, cannot query tagged locations") - return None - - robot_location = query_tagged_location_rpc(query) + robot_location = self._spatial_memory.query_tagged_location(query) if not robot_location: return None @@ -172,16 +159,10 @@ def _navigate_by_tagged_location(self, query: str) -> str | None: return self._navigate_to(goal_pose, f"Found a tagged location called '{query}'.") def _navigate_to(self, pose: PoseStamped, message: str) -> str: - try: - set_goal_rpc = self.get_rpc_calls("NavigationInterface.set_goal") - except Exception: - logger.error("Navigation module not connected properly") - return "Error: Navigation module is not connected, cannot set goal." - logger.info( f"Navigating to pose: ({pose.position.x:.2f}, {pose.position.y:.2f}, {pose.position.z:.2f})" ) - set_goal_rpc(pose) + self._navigation.set_goal(pose) return ( f"{message}. Started navigating to that position. " @@ -189,6 +170,9 @@ def _navigate_to(self, pose: PoseStamped, message: str) -> str: ) def _navigate_to_object(self, query: str) -> str | None: + if self._object_tracking is None: + return None + try: bbox = self._get_bbox_for_current_frame(query) except Exception: @@ -198,26 +182,10 @@ def _navigate_to_object(self, query: str) -> str | None: if bbox is None: return None - try: - track_rpc, stop_track_rpc, is_tracking_rpc = self.get_rpc_calls( - "ObjectTracking.track", "ObjectTracking.stop_track", "ObjectTracking.is_tracking" - ) - except Exception: - logger.error("ObjectTracking module not connected properly") - return None - - try: - get_state_rpc, is_goal_reached_rpc = self.get_rpc_calls( - "NavigationInterface.get_state", "NavigationInterface.is_goal_reached" - ) - except Exception: - logger.error("Navigation module not connected properly") - return None - logger.info(f"Found {query} at {bbox}") # Start tracking - BBoxNavigationModule automatically generates goals - track_rpc(bbox) + self._object_tracking.track(bbox) # type: ignore[arg-type] start_time = time.time() timeout = 30.0 @@ -225,31 +193,31 @@ def _navigate_to_object(self, query: str) -> str | None: while time.time() - start_time < timeout: # Check if navigator finished - if get_state_rpc() == NavigationState.IDLE and goal_set: + if self._navigation.get_state() == NavigationState.IDLE and goal_set: logger.info("Waiting for goal result") time.sleep(1.0) - if not is_goal_reached_rpc(): + if not self._navigation.is_goal_reached(): logger.info(f"Goal cancelled, tracking '{query}' failed") - stop_track_rpc() + self._object_tracking.stop_track() return None else: logger.info(f"Reached '{query}'") - stop_track_rpc() + self._object_tracking.stop_track() return f"Successfully arrived at '{query}'" # If goal set and tracking lost, just continue (tracker will resume or timeout) - if goal_set and not is_tracking_rpc(): + if goal_set and not self._object_tracking.is_tracking(): continue # BBoxNavigationModule automatically sends goals when tracker publishes # Just check if we have any detections to mark goal_set - if is_tracking_rpc(): + if self._object_tracking.is_tracking(): goal_set = True time.sleep(0.25) logger.warning(f"Navigation to '{query}' timed out after {timeout}s") - stop_track_rpc() + self._object_tracking.stop_track() return None def _get_bbox_for_current_frame(self, query: str) -> BBox | None: @@ -259,12 +227,7 @@ def _get_bbox_for_current_frame(self, query: str) -> BBox | None: return get_object_bbox_from_image(self._vl_model, self._latest_image, query) def _navigate_using_semantic_map(self, query: str) -> str: - try: - query_by_text_rpc = self.get_rpc_calls("SpatialMemory.query_by_text") - except Exception: - return "Error: The SpatialMemory module is not connected." - - results = query_by_text_rpc(query) + results = self._spatial_memory.query_by_text(query) if not results: return f"No matching location found in semantic map for '{query}'" @@ -292,13 +255,7 @@ def stop_navigation(self) -> str: return "Stopped" def _cancel_goal_and_stop(self) -> None: - try: - cancel_goal_rpc = self.get_rpc_calls("NavigationInterface.cancel_goal") - except Exception: - logger.warning("Navigation module not connected, cannot cancel goal") - return - - cancel_goal_rpc() + self._navigation.cancel_goal() def _get_goal_pose_from_result(self, result: dict[str, Any]) -> PoseStamped | None: similarity = 1.0 - (result.get("distance") or 1) diff --git a/dimos/agents/skills/test_gps_nav_skills.py b/dimos/agents/skills/test_gps_nav_skills.py index c1e380ccd1..e4b593c307 100644 --- a/dimos/agents/skills/test_gps_nav_skills.py +++ b/dimos/agents/skills/test_gps_nav_skills.py @@ -16,6 +16,7 @@ import pytest from dimos.agents.skills.gps_nav_skill import GpsNavSkillContainer +from dimos.core.core import rpc from dimos.core.module import Module from dimos.core.stream import Out from dimos.mapping.models import LatLon @@ -27,6 +28,12 @@ class FakeGPS(Module): gps_location: Out[LatLon] +class StubWebsocketVis(Module): + @rpc + def set_gps_travel_goal_points(self, points: list[LatLon]) -> None: + pass + + class MockedGpsNavSkill(GpsNavSkillContainer): _latest_location = LatLon(lat=37.782654, lon=-122.413273) _started = True @@ -36,7 +43,11 @@ class MockedGpsNavSkill(GpsNavSkillContainer): @pytest.mark.slow def test_set_gps_travel_points(agent_setup) -> None: history = agent_setup( - blueprints=[FakeGPS.blueprint(), MockedGpsNavSkill.blueprint()], + blueprints=[ + FakeGPS.blueprint(), + MockedGpsNavSkill.blueprint(), + StubWebsocketVis.blueprint(), + ], messages=[ HumanMessage( 'Set GPS travel points to [{"lat": 37.782654, "lon": -122.413273}]. ' @@ -51,7 +62,11 @@ def test_set_gps_travel_points(agent_setup) -> None: @pytest.mark.slow def test_set_gps_travel_points_multiple(agent_setup) -> None: history = agent_setup( - blueprints=[FakeGPS.blueprint(), MockedGpsNavSkill.blueprint()], + blueprints=[ + FakeGPS.blueprint(), + MockedGpsNavSkill.blueprint(), + StubWebsocketVis.blueprint(), + ], messages=[ HumanMessage( "Set GPS travel points to these locations in order: " diff --git a/dimos/agents/skills/test_navigation.py b/dimos/agents/skills/test_navigation.py index e4a60db081..c837492d50 100644 --- a/dimos/agents/skills/test_navigation.py +++ b/dimos/agents/skills/test_navigation.py @@ -12,14 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Any + from langchain_core.messages import HumanMessage import pytest from dimos.agents.skills.navigation import NavigationSkillContainer +from dimos.core.core import rpc from dimos.core.module import Module from dimos.core.stream import Out from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.sensor_msgs.Image import Image +from dimos.navigation.base import NavigationState +from dimos.types.robot_location import RobotLocation class FakeCamera(Module): @@ -30,9 +35,61 @@ class FakeOdom(Module): odom: Out[PoseStamped] +class StubSpatialMemory(Module): + @rpc + def tag_location(self, robot_location: RobotLocation) -> bool: + return True + + @rpc + def query_tagged_location(self, query: str) -> RobotLocation | None: + return None + + @rpc + def query_by_text(self, text: str, limit: int = 5) -> list[dict[str, Any]]: + return [] + + +class StubNavigation(Module): + @rpc + def set_goal(self, goal: PoseStamped) -> bool: + return True + + @rpc + def get_state(self) -> NavigationState: + return NavigationState.IDLE + + @rpc + def is_goal_reached(self) -> bool: + return False + + @rpc + def cancel_goal(self) -> bool: + return True + + +class StubObjectTracking(Module): + @rpc + def track(self, bbox: list[float]) -> dict[str, Any]: + return {} + + @rpc + def stop_track(self) -> bool: + return True + + @rpc + def is_tracking(self) -> bool: + return False + + +_STUB_BLUEPRINTS = [ + StubSpatialMemory.blueprint(), + StubNavigation.blueprint(), + StubObjectTracking.blueprint(), +] + + class MockedStopNavSkill(NavigationSkillContainer): _skill_started = True - rpc_calls: list[str] = [] def _cancel_goal_and_stop(self): pass @@ -40,7 +97,6 @@ def _cancel_goal_and_stop(self): class MockedExploreNavSkill(NavigationSkillContainer): _skill_started = True - rpc_calls: list[str] = [] def _start_exploration(self, timeout): return "Exploration completed successfuly" @@ -51,7 +107,6 @@ def _cancel_goal_and_stop(self): class MockedSemanticNavSkill(NavigationSkillContainer): _skill_started = True - rpc_calls: list[str] = [] def _navigate_by_tagged_location(self, query): return None @@ -70,6 +125,7 @@ def test_stop_movement(agent_setup) -> None: FakeCamera.blueprint(), FakeOdom.blueprint(), MockedStopNavSkill.blueprint(), + *_STUB_BLUEPRINTS, ], messages=[HumanMessage("Stop moving. Use the stop_movement tool.")], ) @@ -84,6 +140,7 @@ def test_start_exploration(agent_setup) -> None: FakeCamera.blueprint(), FakeOdom.blueprint(), MockedExploreNavSkill.blueprint(), + *_STUB_BLUEPRINTS, ], messages=[ HumanMessage("Take a look around for 10 seconds. Use the start_exploration tool.") @@ -100,6 +157,7 @@ def test_go_to_semantic_location(agent_setup) -> None: FakeCamera.blueprint(), FakeOdom.blueprint(), MockedSemanticNavSkill.blueprint(), + *_STUB_BLUEPRINTS, ], messages=[HumanMessage("Go to the bookshelf. Use the navigate_with_text tool.")], ) diff --git a/dimos/agents/skills/test_unitree_skill_container.py b/dimos/agents/skills/test_unitree_skill_container.py index 92b006dce5..612f9c2e9d 100644 --- a/dimos/agents/skills/test_unitree_skill_container.py +++ b/dimos/agents/skills/test_unitree_skill_container.py @@ -18,22 +18,49 @@ from langchain_core.messages import HumanMessage import pytest +from dimos.core.core import rpc +from dimos.core.module import Module +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.navigation.base import NavigationState from dimos.robot.unitree.unitree_skill_container import _UNITREE_COMMANDS, UnitreeSkillContainer -class MockedUnitreeSkill(UnitreeSkillContainer): - rpc_calls: list[str] = [] +class StubNavigation(Module): + @rpc + def set_goal(self, goal: PoseStamped) -> bool: + return True + + @rpc + def get_state(self) -> NavigationState: + return NavigationState.IDLE + + @rpc + def is_goal_reached(self) -> bool: + return False + + @rpc + def cancel_goal(self) -> bool: + return True - def __init__(self, **kwargs: Any): - super().__init__(**kwargs) - # Provide a fake RPC so the real execute_sport_command runs end-to-end. - self._bound_rpc_calls["GO2Connection.publish_request"] = lambda *args, **kwargs: None + +class StubGO2Connection(Module): + @rpc + def publish_request(self, topic: str, data: dict[str, Any]) -> dict[Any, Any]: + return {} + + +class MockedUnitreeSkill(UnitreeSkillContainer): + pass @pytest.mark.slow def test_pounce(agent_setup) -> None: history = agent_setup( - blueprints=[MockedUnitreeSkill.blueprint()], + blueprints=[ + MockedUnitreeSkill.blueprint(), + StubNavigation.blueprint(), + StubGO2Connection.blueprint(), + ], messages=[HumanMessage("Pounce! Use the execute_sport_command tool.")], ) diff --git a/dimos/agents/vlm_agent_spec.py b/dimos/agents/vlm_agent_spec.py new file mode 100644 index 0000000000..a1bebbfe20 --- /dev/null +++ b/dimos/agents/vlm_agent_spec.py @@ -0,0 +1,24 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Protocol + +from dimos.msgs.sensor_msgs.Image import Image +from dimos.spec.utils import Spec + + +class VLMAgentSpec(Spec, Protocol): + def query_image( + self, image: Image, query: str, response_format: dict[str, Any] | None = None + ) -> str: ... diff --git a/dimos/agents/vlm_stream_tester.py b/dimos/agents/vlm_stream_tester.py index 80353dbfe0..8e7ea65dbf 100644 --- a/dimos/agents/vlm_stream_tester.py +++ b/dimos/agents/vlm_stream_tester.py @@ -17,6 +17,7 @@ from langchain_core.messages import AIMessage, HumanMessage +from dimos.agents.vlm_agent_spec import VLMAgentSpec from dimos.core.core import rpc from dimos.core.module import Module from dimos.core.stream import In, Out @@ -33,9 +34,7 @@ class VlmStreamTester(Module): query_stream: Out[HumanMessage] answer_stream: In[AIMessage] - rpc_calls: list[str] = [ - "VLMAgent.query_image", - ] + _vlm_agent: VLMAgentSpec def __init__( # type: ignore[no-untyped-def] self, @@ -134,13 +133,6 @@ def _run_stream_queries(self) -> None: time.sleep(self._query_interval_s) def _run_rpc_queries(self) -> None: - rpc_query = None - try: - rpc_query = self.get_rpc_calls("VLMAgent.query_image") - except Exception as exc: - logger.warning("RPC query_image lookup failed", error=str(exc)) - return - for idx in range(self._num_queries): if self._stop_event.is_set(): break @@ -160,7 +152,7 @@ def _run_rpc_queries(self) -> None: logger.info("Sending RPC query", index=idx + 1, total=self._num_queries) try: - response = rpc_query( + response = self._vlm_agent.query_image( self._latest_image, f"{self._prompt} (rpc query {idx + 1}/{self._num_queries})", ) diff --git a/dimos/core/blueprints.py b/dimos/core/blueprints.py index 1fda5f90d3..389c63cea7 100644 --- a/dimos/core/blueprints.py +++ b/dimos/core/blueprints.py @@ -12,21 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import ABC from collections import defaultdict from collections.abc import Callable, Mapping from dataclasses import dataclass, field, replace from functools import cached_property, reduce import operator import sys +import types as types_mod from types import MappingProxyType -from typing import TYPE_CHECKING, Any, Literal, get_args, get_origin, get_type_hints +from typing import TYPE_CHECKING, Any, Literal, Union, get_args, get_origin, get_type_hints if TYPE_CHECKING: from dimos.protocol.service.system_configurator.base import SystemConfigurator from dimos.core.global_config import GlobalConfig, global_config -from dimos.core.module import Module, ModuleBase, ModuleSpec, is_module_type +from dimos.core.module import ModuleBase, ModuleSpec, is_module_type from dimos.core.module_coordinator import ModuleCoordinator from dimos.core.stream import In, Out from dimos.core.transport import LCMTransport, PubSubTransport, pLCMTransport @@ -53,6 +53,7 @@ class StreamRef: class ModuleRef: name: str spec: type[Spec] | type[ModuleBase] + optional: bool = False @dataclass(frozen=True) @@ -99,6 +100,15 @@ def create(cls, module: type[ModuleBase[Any]], kwargs: dict[str, Any]) -> Self: # linking to specific/known module directly elif is_module_type(annotation): module_refs.append(ModuleRef(name=name, spec=annotation)) + # Optional Spec or Module: SomeSpec | None + elif origin in (Union, types_mod.UnionType): + args = [a for a in get_args(annotation) if a is not type(None)] + if len(args) == 1: + inner = args[0] + if is_spec(inner): + module_refs.append(ModuleRef(name=name, spec=inner, optional=True)) + elif is_module_type(inner): + module_refs.append(ModuleRef(name=name, spec=inner, optional=True)) return cls( module=module, @@ -163,25 +173,6 @@ def _active_blueprints(self) -> tuple[_BlueprintAtom, ...]: disabled = set(self.disabled_modules_tuple) return tuple(bp for bp in self.blueprints if bp.module not in disabled) - def _check_ambiguity( - self, - requested_method_name: str, - interface_methods: Mapping[str, list[tuple[type[ModuleBase], Callable[..., Any]]]], - requesting_module: type[ModuleBase], - ) -> None: - if ( - requested_method_name in interface_methods - and len(interface_methods[requested_method_name]) > 1 - ): - modules_str = ", ".join( - impl[0].__name__ for impl in interface_methods[requested_method_name] - ) - raise ValueError( - f"Ambiguous RPC method '{requested_method_name}' requested by " - f"{requesting_module.__name__}. Multiple implementations found: " - f"{modules_str}. Please use a concrete class name instead." - ) - def _get_transport_for(self, name: str, stream_type: type) -> PubSubTransport[Any]: transport = self.transport_map.get((name, stream_type), None) if transport: @@ -349,6 +340,8 @@ def _connect_module_refs(self, module_coordinator: ModuleCoordinator) -> None: ] # none if len(possible_module_candidates) == 0: + if each_module_ref.optional: + continue raise Exception( f"""The {blueprint.module.__name__} has a module reference ({each_module_ref}) which requested a module that fills out the {each_module_ref.spec.__name__} spec. But I couldn't find a module that met that spec.\n""" ) @@ -393,84 +386,6 @@ def _connect_module_refs(self, module_coordinator: ModuleCoordinator) -> None: # Ensure the remote module instance can use the module ref inside its own RPC handlers. base_module_proxy.set_module_ref(module_ref_name, target_module_proxy) - def _connect_rpc_methods(self, module_coordinator: ModuleCoordinator) -> None: - # Gather all RPC methods. - rpc_methods = {} - rpc_methods_dot = {} - - # Track interface methods to detect ambiguity. - interface_methods: defaultdict[str, list[tuple[type[ModuleBase], Callable[..., Any]]]] = ( - defaultdict(list) - ) # interface_name_method -> [(module_class, method)] - interface_methods_dot: defaultdict[ - str, list[tuple[type[ModuleBase], Callable[..., Any]]] - ] = defaultdict(list) # interface_name.method -> [(module_class, method)] - - for blueprint in self._active_blueprints: - for method_name in blueprint.module.rpcs.keys(): # type: ignore[attr-defined] - module_proxy = module_coordinator.get_instance(blueprint.module) # type: ignore[assignment] - method_for_rpc_client = getattr(module_proxy, method_name) - # Register under concrete class name (backward compatibility) - rpc_methods[f"{blueprint.module.__name__}_{method_name}"] = method_for_rpc_client - rpc_methods_dot[f"{blueprint.module.__name__}.{method_name}"] = ( - method_for_rpc_client - ) - - # Also register under any interface names - for base in blueprint.module.mro(): - # Check if this base is an abstract interface with the method - if ( - base is not Module - and issubclass(base, ABC) - and hasattr(base, method_name) - and getattr(base, method_name, None) is not None - ): - interface_key = f"{base.__name__}.{method_name}" - interface_methods_dot[interface_key].append( - (blueprint.module, method_for_rpc_client) - ) - interface_key_underscore = f"{base.__name__}_{method_name}" - interface_methods[interface_key_underscore].append( - (blueprint.module, method_for_rpc_client) - ) - - # Check for ambiguity in interface methods and add non-ambiguous ones - for interface_key, implementations in interface_methods_dot.items(): - if len(implementations) == 1: - rpc_methods_dot[interface_key] = implementations[0][1] - for interface_key, implementations in interface_methods.items(): - if len(implementations) == 1: - rpc_methods[interface_key] = implementations[0][1] - - # Fulfil method requests (so modules can call each other). - for blueprint in self._active_blueprints: - instance = module_coordinator.get_instance(blueprint.module) # type: ignore[assignment] - - for method_name in blueprint.module.rpcs.keys(): # type: ignore[attr-defined] - if not method_name.startswith("set_"): - continue - - linked_name = method_name.removeprefix("set_") - - self._check_ambiguity(linked_name, interface_methods, blueprint.module) - - if linked_name not in rpc_methods: - continue - - getattr(instance, method_name)(rpc_methods[linked_name]) - - for requested_method_name in instance.get_rpc_method_names(): # type: ignore[union-attr] - self._check_ambiguity( - requested_method_name, interface_methods_dot, blueprint.module - ) - - if requested_method_name not in rpc_methods_dot: - continue - - instance.set_rpc_method( # type: ignore[union-attr] - requested_method_name, rpc_methods_dot[requested_method_name] - ) - def build( self, cli_config_overrides: Mapping[str, Any] | None = None, @@ -491,7 +406,6 @@ def build( # all module constructors are called here (each of them setup their own) self._deploy_all_modules(module_coordinator, global_config) self._connect_streams(module_coordinator) - self._connect_rpc_methods(module_coordinator) self._connect_module_refs(module_coordinator) module_coordinator.build_all_modules() diff --git a/dimos/core/docker_module.py b/dimos/core/docker_module.py index 9ca30f8a12..ae6b685312 100644 --- a/dimos/core/docker_module.py +++ b/dimos/core/docker_module.py @@ -154,9 +154,7 @@ def __init__(self, module_class: type[ModuleBase], *args: Any, **kwargs: Any) -> default_rpc_timeout=self.config.default_rpc_timeout, ) self.rpcs = set(module_class.rpcs.keys()) # type: ignore[attr-defined] - self.rpc_calls: list[str] = getattr(module_class, "rpc_calls", []) self._unsub_fns: list[Callable[[], None]] = [] - self._bound_rpc_calls: dict[str, RpcCall] = {} def build(self) -> None: """Build/pull docker image, launch container, wait for RPC readiness. @@ -218,26 +216,6 @@ def build(self) -> None: self._cleanup() raise - def get_rpc_method_names(self) -> list[str]: - return self.rpc_calls - - def set_rpc_method(self, method: str, callable: RpcCall) -> None: - callable.set_rpc(self.rpc) - self._bound_rpc_calls[method] = callable - # Forward to container — Module.set_rpc_method unpickles the RpcCall - # and wires it with the container's own LCMRPC - self.rpc.call_sync( - f"{self.remote_name}/set_rpc_method", - ([method, callable], {}), - ) - - def get_rpc_calls(self, *methods: str) -> RpcCall | tuple[RpcCall, ...]: - missing = set(methods) - self._bound_rpc_calls.keys() - if missing: - raise ValueError(f"RPC methods not found: {missing}") - calls = tuple(self._bound_rpc_calls[m] for m in methods) - return calls[0] if len(calls) == 1 else calls - def start(self) -> None: """Invoke the remote module's start() RPC.""" try: @@ -462,7 +440,7 @@ def _wait_for_rpc(self) -> None: try: self.rpc.call_sync( - f"{self.remote_name}/get_rpc_method_names", + f"{self.remote_name}/get_skills", ([], {}), rpc_timeout=3.0, # short timeout for polling readiness ) diff --git a/dimos/core/introspection/module/info.py b/dimos/core/introspection/module/info.py index 8fcad76006..3551c15c5c 100644 --- a/dimos/core/introspection/module/info.py +++ b/dimos/core/introspection/module/info.py @@ -22,8 +22,6 @@ # Internal RPCs to hide from io() output INTERNAL_RPCS = { "dynamic_skills", - "get_rpc_method_names", - "set_rpc_method", "skills", "_io_instance", } diff --git a/dimos/core/module.py b/dimos/core/module.py index 46f07ab8e8..8436b9f19f 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -28,7 +28,6 @@ get_args, get_origin, get_type_hints, - overload, ) from langchain_core.tools import tool @@ -40,7 +39,6 @@ from dimos.core.introspection.module.info import extract_module_info from dimos.core.introspection.module.render import render_module_io from dimos.core.resource import Resource -from dimos.core.rpc_client import RpcCall from dimos.core.stream import In, Out, RemoteOut, Transport from dimos.protocol.rpc.pubsubrpc import LCMRPC from dimos.protocol.rpc.spec import DEFAULT_RPC_TIMEOUT, DEFAULT_RPC_TIMEOUTS, RPCSpec @@ -113,13 +111,10 @@ class ModuleBase(Configurable[ModuleConfigT], Resource): _loop: asyncio.AbstractEventLoop | None = None _loop_thread: threading.Thread | None _disposables: CompositeDisposable - _bound_rpc_calls: dict[str, RpcCall] = {} _module_closed: bool = False _module_closed_lock: threading.Lock _loop_thread_timeout: float = 2.0 - rpc_calls: list[str] = [] - def __init__(self, config_args: dict[str, Any]): super().__init__(**config_args) self._module_closed_lock = threading.Lock() @@ -381,34 +376,10 @@ def blueprint(self) -> _BlueprintPartial: return partial(Blueprint.create, self) # type: ignore[arg-type] - @rpc - def get_rpc_method_names(self) -> list[str]: - return self.rpc_calls - - @rpc - def set_rpc_method(self, method: str, callable: RpcCall) -> None: - callable.set_rpc(self.rpc) # type: ignore[arg-type] - self._bound_rpc_calls[method] = callable - @rpc def set_module_ref(self, name: str, module_ref: "RPCClient") -> None: setattr(self, name, module_ref) - @overload - def get_rpc_calls(self, method: str) -> RpcCall: ... - - @overload - def get_rpc_calls(self, method1: str, method2: str, *methods: str) -> tuple[RpcCall, ...]: ... - - def get_rpc_calls(self, *methods: str) -> RpcCall | tuple[RpcCall, ...]: # type: ignore[misc] - missing = [m for m in methods if m not in self._bound_rpc_calls] - if missing: - raise ValueError( - f"RPC methods not found. Class: {self.__class__.__name__}, RPC methods: {', '.join(missing)}" - ) - result = tuple(self._bound_rpc_calls[m] for m in methods) - return result[0] if len(result) == 1 else result - @rpc def get_skills(self) -> list[SkillInfo]: skills: list[SkillInfo] = [] diff --git a/dimos/core/rpc_client.py b/dimos/core/rpc_client.py index 089ea5ed06..3caa8ae08f 100644 --- a/dimos/core/rpc_client.py +++ b/dimos/core/rpc_client.py @@ -91,9 +91,6 @@ def build(self) -> None: ... def start(self) -> None: ... def stop(self) -> None: ... def set_transport(self, stream_name: str, transport: Any) -> bool: ... - def get_rpc_method_names(self) -> list[str]: ... - def set_rpc_method(self, method: str, callable: RpcCall) -> None: ... - def get_rpc_calls(self, *methods: str) -> RpcCall | tuple[RpcCall, ...]: ... class RPCClient: diff --git a/dimos/core/test_blueprints.py b/dimos/core/test_blueprints.py index b61e34c5f9..53c882703e 100644 --- a/dimos/core/test_blueprints.py +++ b/dimos/core/test_blueprints.py @@ -23,6 +23,7 @@ ) from dimos.core.blueprints import ( Blueprint, + ModuleRef, StreamRef, _BlueprintAtom, autoconnect, @@ -30,7 +31,6 @@ from dimos.core.core import rpc from dimos.core.module import Module from dimos.core.module_coordinator import ModuleCoordinator -from dimos.core.rpc_client import RpcCall from dimos.core.stream import In, Out from dimos.core.transport import LCMTransport from dimos.msgs.sensor_msgs.Image import Image @@ -89,18 +89,11 @@ class ModuleB(Module): data2: In[Data2] data3: Out[Data3] - _module_a_get_name: callable = None - - @rpc - def set_ModuleA_get_name(self, callable: RpcCall) -> None: - self._module_a_get_name = callable - self._module_a_get_name.set_rpc(self.rpc) + module_a: ModuleA @rpc def what_is_as_name(self) -> str: - if self._module_a_get_name is None: - return "ModuleA.get_name not set" - return self._module_a_get_name() + return self.module_a.get_name() class ModuleC(Module): @@ -140,7 +133,7 @@ def test_autoconnect() -> None: StreamRef(name="data2", type=Data2, direction="in"), StreamRef(name="data3", type=Data3, direction="out"), ), - module_refs=(), + module_refs=(ModuleRef(name="module_a", spec=ModuleA),), kwargs={}, ), ) diff --git a/dimos/core/test_core.py b/dimos/core/test_core.py index f9a89829d5..f8cc019af0 100644 --- a/dimos/core/test_core.py +++ b/dimos/core/test_core.py @@ -77,7 +77,7 @@ def test_classmethods() -> None: # Check that we have the expected RPC methods assert "navigate_to" in class_rpcs, "navigate_to should be in rpcs" assert "start" in class_rpcs, "start should be in rpcs" - assert len(class_rpcs) == 9 + assert len(class_rpcs) == 7 # Check that the values are callable assert callable(class_rpcs["navigate_to"]), "navigate_to should be callable" diff --git a/dimos/manipulation/control/arm_driver_spec.py b/dimos/manipulation/control/arm_driver_spec.py new file mode 100644 index 0000000000..f8d57db5ed --- /dev/null +++ b/dimos/manipulation/control/arm_driver_spec.py @@ -0,0 +1,24 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Protocol + +from dimos.spec.utils import Spec + + +class ArmDriverSpec(Spec, Protocol): + def get_forward_kinematics( + self, joint_positions: list[float] + ) -> tuple[int, list[float] | None]: ... + def get_inverse_kinematics(self, pose: list[float]) -> tuple[int, list[float] | None]: ... diff --git a/dimos/manipulation/control/servo_control/cartesian_motion_controller.py b/dimos/manipulation/control/servo_control/cartesian_motion_controller.py index 6b702495e2..303ef5fc31 100644 --- a/dimos/manipulation/control/servo_control/cartesian_motion_controller.py +++ b/dimos/manipulation/control/servo_control/cartesian_motion_controller.py @@ -34,6 +34,7 @@ from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig from dimos.core.stream import In, Out +from dimos.manipulation.control.arm_driver_spec import ArmDriverSpec from dimos.msgs.geometry_msgs.Pose import Pose from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Quaternion import Quaternion @@ -51,8 +52,6 @@ class CartesianMotionControllerConfig(ModuleConfig): """Configuration for Cartesian motion controller.""" - arm_driver: Any = None - # Control loop parameters control_frequency: float = 20.0 # Hz - Cartesian control loop rate command_timeout: float = 30.0 # seconds - timeout for stale targets (RPC mode needs longer) @@ -101,11 +100,7 @@ class CartesianMotionController(Module[CartesianMotionControllerConfig]): default_config = CartesianMotionControllerConfig - # RPC methods to request from other modules (resolved at blueprint build time) - rpc_calls = [ - "XArmDriver.get_forward_kinematics", - "XArmDriver.get_inverse_kinematics", - ] + _arm_driver: ArmDriverSpec # Input topics (initialized by Module base class) joint_state: In[JointState] = None # type: ignore[assignment] @@ -118,18 +113,8 @@ class CartesianMotionController(Module[CartesianMotionControllerConfig]): current_pose: Out[PoseStamped] = None # type: ignore[assignment] def __init__(self, **kwargs: Any) -> None: - """ - Initialize the Cartesian motion controller. - - Args: - arm_driver: (Optional) Hardware driver reference (legacy mode). - When using blueprints, this is resolved automatically via rpc_calls. - """ super().__init__(**kwargs) - # Hardware driver reference - set via arm_driver param (legacy) or RPC wiring (blueprint) - self._arm_driver_legacy = self.config.arm_driver - # State tracking self._latest_joint_state: JointState | None = None self._latest_robot_state: RobotState | None = None @@ -197,34 +182,12 @@ def __init__(self, **kwargs: Any) -> None: ) def _call_fk(self, joint_positions: list[float]) -> tuple[int, list[float] | None]: - """Call FK - uses blueprint RPC wiring or legacy arm_driver reference.""" - try: - result: tuple[int, list[float] | None] = self.get_rpc_calls( - "XArmDriver.get_forward_kinematics" - )(joint_positions) - return result - except (ValueError, KeyError): - if self._arm_driver_legacy: - result_fk: tuple[int, list[float] | None] = ( - self._arm_driver_legacy.get_forward_kinematics(joint_positions) # type: ignore[attr-defined] - ) - return result_fk - raise RuntimeError("No arm driver available - use blueprint or pass arm_driver param") + """Call FK via the arm driver spec (resolved at blueprint build time).""" + return self._arm_driver.get_forward_kinematics(joint_positions) def _call_ik(self, pose: list[float]) -> tuple[int, list[float] | None]: - """Call IK - uses blueprint RPC wiring or legacy arm_driver reference.""" - try: - result: tuple[int, list[float] | None] = self.get_rpc_calls( - "XArmDriver.get_inverse_kinematics" - )(pose) - return result - except (ValueError, KeyError): - if self._arm_driver_legacy: - result_ik: tuple[int, list[float] | None] = ( - self._arm_driver_legacy.get_inverse_kinematics(pose) # type: ignore[attr-defined] - ) - return result_ik - raise RuntimeError("No arm driver available - use blueprint or pass arm_driver param") + """Call IK via the arm driver spec (resolved at blueprint build time).""" + return self._arm_driver.get_inverse_kinematics(pose) @rpc def start(self) -> None: diff --git a/dimos/manipulation/grasping/grasp_gen_spec.py b/dimos/manipulation/grasping/grasp_gen_spec.py new file mode 100644 index 0000000000..37c81c85bc --- /dev/null +++ b/dimos/manipulation/grasping/grasp_gen_spec.py @@ -0,0 +1,27 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Protocol + +from dimos.msgs.geometry_msgs.PoseArray import PoseArray +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 +from dimos.spec.utils import Spec + + +class GraspGenSpec(Spec, Protocol): + def generate_grasps( + self, + pointcloud: PointCloud2, + scene_pointcloud: PointCloud2 | None = None, + ) -> PoseArray | None: ... diff --git a/dimos/manipulation/grasping/grasping.py b/dimos/manipulation/grasping/grasping.py index 50671777c0..9cde090d4a 100644 --- a/dimos/manipulation/grasping/grasping.py +++ b/dimos/manipulation/grasping/grasping.py @@ -25,7 +25,9 @@ from dimos.core.core import rpc from dimos.core.module import Module from dimos.core.stream import Out +from dimos.manipulation.grasping.grasp_gen_spec import GraspGenSpec from dimos.msgs.geometry_msgs.PoseArray import PoseArray +from dimos.perception.object_scene_registration_spec import ObjectSceneRegistrationSpec from dimos.utils.logging_config import setup_logger from dimos.utils.transform_utils import quaternion_to_euler @@ -40,12 +42,8 @@ class GraspingModule(Module): grasps: Out[PoseArray] - rpc_calls: list[str] = [ - "ObjectSceneRegistrationModule.get_object_pointcloud_by_name", - "ObjectSceneRegistrationModule.get_object_pointcloud_by_object_id", - "ObjectSceneRegistrationModule.get_full_scene_pointcloud", - "GraspGenModule.generate_grasps", - ] + _scene_registration: ObjectSceneRegistrationSpec + _grasp_gen: GraspGenSpec @rpc def start(self) -> None: @@ -85,10 +83,9 @@ def generate_grasps( if filter_collisions: scene_pc = self._get_scene_pointcloud(exclude_object_id=object_id) - # Call GraspGenModule RPC (running in Docker) + # Call GraspGenModule (running in Docker) try: - generate = self.get_rpc_calls("GraspGenModule.generate_grasps") - result = generate(pc, scene_pc) + result = self._grasp_gen.generate_grasps(pc, scene_pc) except Exception as e: msg = f"Grasp generation failed: {e}" logger.error(msg) @@ -111,15 +108,9 @@ def _get_object_pointcloud( """Fetch object pointcloud from perception.""" try: if object_id is not None: - get_pc = self.get_rpc_calls( - "ObjectSceneRegistrationModule.get_object_pointcloud_by_object_id" - ) - return get_pc(object_id) # type: ignore[no-any-return] - - get_pc = self.get_rpc_calls( - "ObjectSceneRegistrationModule.get_object_pointcloud_by_name" - ) - return get_pc(object_name) # type: ignore[no-any-return] + return self._scene_registration.get_object_pointcloud_by_object_id(object_id) # type: ignore[no-any-return] + + return self._scene_registration.get_object_pointcloud_by_name(object_name) # type: ignore[no-any-return] except Exception as e: logger.error(f"Failed to get object pointcloud: {e}") return None @@ -127,10 +118,9 @@ def _get_object_pointcloud( def _get_scene_pointcloud(self, exclude_object_id: str | None = None) -> PointCloud2 | None: """Fetch scene pointcloud from perception for collision filtering.""" try: - get_scene = self.get_rpc_calls( - "ObjectSceneRegistrationModule.get_full_scene_pointcloud" - ) - return get_scene(exclude_object_id=exclude_object_id) # type: ignore[no-any-return] + return self._scene_registration.get_full_scene_pointcloud( + exclude_object_id=exclude_object_id + ) # type: ignore[no-any-return] except Exception as e: logger.debug(f"Could not get scene pointcloud: {e}") return None diff --git a/dimos/navigation/navigation_spec.py b/dimos/navigation/navigation_spec.py new file mode 100644 index 0000000000..d7aaab0ba7 --- /dev/null +++ b/dimos/navigation/navigation_spec.py @@ -0,0 +1,26 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Protocol + +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.navigation.base import NavigationState +from dimos.spec.utils import Spec + + +class NavigationInterfaceSpec(Spec, Protocol): + def set_goal(self, goal: PoseStamped) -> bool: ... + def get_state(self) -> NavigationState: ... + def is_goal_reached(self) -> bool: ... + def cancel_goal(self) -> bool: ... diff --git a/dimos/perception/object_scene_registration_spec.py b/dimos/perception/object_scene_registration_spec.py new file mode 100644 index 0000000000..59aae79cab --- /dev/null +++ b/dimos/perception/object_scene_registration_spec.py @@ -0,0 +1,29 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Protocol + +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 +from dimos.spec.utils import Spec + + +class ObjectSceneRegistrationSpec(Spec, Protocol): + def get_object_pointcloud_by_name(self, name: str) -> PointCloud2 | None: ... + def get_object_pointcloud_by_object_id(self, object_id: str) -> PointCloud2 | None: ... + def get_full_scene_pointcloud( + self, + exclude_object_id: str | None = None, + depth_trunc: float = 2.0, + voxel_size: float = 0.01, + ) -> PointCloud2 | None: ... diff --git a/dimos/perception/object_tracking_spec.py b/dimos/perception/object_tracking_spec.py new file mode 100644 index 0000000000..4b92ff0003 --- /dev/null +++ b/dimos/perception/object_tracking_spec.py @@ -0,0 +1,23 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Protocol + +from dimos.spec.utils import Spec + + +class ObjectTrackingSpec(Spec, Protocol): + def track(self, bbox: list[float]) -> dict: ... # type: ignore[type-arg] + def stop_track(self) -> bool: ... + def is_tracking(self) -> bool: ... diff --git a/dimos/perception/spatial_memory_spec.py b/dimos/perception/spatial_memory_spec.py new file mode 100644 index 0000000000..e2b7154224 --- /dev/null +++ b/dimos/perception/spatial_memory_spec.py @@ -0,0 +1,24 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Protocol + +from dimos.spec.utils import Spec +from dimos.types.robot_location import RobotLocation + + +class SpatialMemorySpec(Spec, Protocol): + def tag_location(self, robot_location: RobotLocation) -> bool: ... + def query_tagged_location(self, query: str) -> RobotLocation | None: ... + def query_by_text(self, text: str, limit: int = 5) -> list[dict]: ... # type: ignore[type-arg] diff --git a/dimos/robot/unitree/g1/connection_spec.py b/dimos/robot/unitree/g1/connection_spec.py new file mode 100644 index 0000000000..ed2d572df2 --- /dev/null +++ b/dimos/robot/unitree/g1/connection_spec.py @@ -0,0 +1,23 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Protocol + +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.spec.utils import Spec + + +class G1ConnectionSpec(Spec, Protocol): + def move(self, twist: Twist, duration: float = 0.0) -> None: ... + def publish_request(self, topic: str, data: dict[str, Any]) -> dict[Any, Any]: ... diff --git a/dimos/robot/unitree/g1/skill_container.py b/dimos/robot/unitree/g1/skill_container.py index ffe8dae5f0..c3825ed29c 100644 --- a/dimos/robot/unitree/g1/skill_container.py +++ b/dimos/robot/unitree/g1/skill_container.py @@ -24,6 +24,7 @@ from dimos.core.module import Module from dimos.msgs.geometry_msgs.Twist import Twist from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.robot.unitree.g1.connection_spec import G1ConnectionSpec from dimos.utils.logging_config import setup_logger logger = setup_logger() @@ -63,10 +64,7 @@ class UnitreeG1SkillContainer(Module): - rpc_calls: list[str] = [ - "G1ConnectionBase.move", - "G1ConnectionBase.publish_request", - ] + _connection: G1ConnectionSpec @rpc def start(self) -> None: @@ -91,9 +89,8 @@ def move(self, x: float, y: float = 0.0, yaw: float = 0.0, duration: float = 0.0 duration: How long to move (seconds) """ - move_rpc = self.get_rpc_calls("G1ConnectionBase.move") twist = Twist(linear=Vector3(x, y, 0), angular=Vector3(0, 0, yaw)) - move_rpc(twist, duration=duration) + self._connection.move(twist, duration=duration) return f"Started moving with velocity=({x}, {y}, {yaw}) for {duration} seconds" @skill @@ -111,8 +108,6 @@ def _execute_g1_command( topic: str, command_name: str, ) -> str: - publish_request_rpc = self.get_rpc_calls("G1ConnectionBase.publish_request") - if command_name not in command_dict: suggestions = difflib.get_close_matches( command_name, command_dict.keys(), n=3, cutoff=0.6 @@ -122,7 +117,7 @@ def _execute_g1_command( id_, _ = command_dict[command_name] try: - publish_request_rpc(topic, {"api_id": api_id, "parameter": {"data": id_}}) + self._connection.publish_request(topic, {"api_id": api_id, "parameter": {"data": id_}}) return f"'{command_name}' command executed successfully." except Exception as e: logger.error(f"Failed to execute {command_name}: {e}") diff --git a/dimos/robot/unitree/go2/connection_spec.py b/dimos/robot/unitree/go2/connection_spec.py new file mode 100644 index 0000000000..dd6aab9c40 --- /dev/null +++ b/dimos/robot/unitree/go2/connection_spec.py @@ -0,0 +1,21 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Protocol + +from dimos.spec.utils import Spec + + +class GO2ConnectionSpec(Spec, Protocol): + def publish_request(self, topic: str, data: dict[str, Any]) -> dict[Any, Any]: ... diff --git a/dimos/robot/unitree/unitree_skill_container.py b/dimos/robot/unitree/unitree_skill_container.py index f536d8b45c..88194473e6 100644 --- a/dimos/robot/unitree/unitree_skill_container.py +++ b/dimos/robot/unitree/unitree_skill_container.py @@ -28,6 +28,8 @@ from dimos.msgs.geometry_msgs.Quaternion import Quaternion from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.navigation.base import NavigationState +from dimos.navigation.navigation_spec import NavigationInterfaceSpec +from dimos.robot.unitree.go2.connection_spec import GO2ConnectionSpec from dimos.utils.logging_config import setup_logger logger = setup_logger() @@ -192,13 +194,8 @@ class UnitreeSkillContainer(Module): """Container for Unitree Go2 robot skills using the new framework.""" - rpc_calls: list[str] = [ - "NavigationInterface.set_goal", - "NavigationInterface.get_state", - "NavigationInterface.is_goal_reached", - "NavigationInterface.cancel_goal", - "GO2Connection.publish_request", - ] + _navigation: NavigationInterfaceSpec + _connection: GO2ConnectionSpec @rpc def start(self) -> None: @@ -236,33 +233,23 @@ def relative_move(self, forward: float = 0.0, left: float = 0.0, degrees: float if tf is None: return "Failed to get the position of the robot." - try: - set_goal_rpc, get_state_rpc, is_goal_reached_rpc = self.get_rpc_calls( - "NavigationInterface.set_goal", - "NavigationInterface.get_state", - "NavigationInterface.is_goal_reached", - ) - except Exception: - logger.error("Navigation module not connected properly") - return "Failed to connect to navigation module." - # TODO: Improve this. This is not a nice way to do it. I should # subscribe to arrival/cancellation events instead. - set_goal_rpc(self._generate_new_goal(tf.to_pose(), forward, left, degrees)) + self._navigation.set_goal(self._generate_new_goal(tf.to_pose(), forward, left, degrees)) time.sleep(1.0) start_time = time.monotonic() timeout = 100.0 - while get_state_rpc() == NavigationState.FOLLOWING_PATH: + while self._navigation.get_state() == NavigationState.FOLLOWING_PATH: if time.monotonic() - start_time > timeout: return "Navigation timed out" time.sleep(0.1) time.sleep(1.0) - if not is_goal_reached_rpc(): + if not self._navigation.is_goal_reached(): return "Navigation was cancelled or failed" else: return "Navigation goal reached" @@ -298,12 +285,6 @@ def current_time(self) -> str: @skill def execute_sport_command(self, command_name: str) -> str: - try: - publish_request = self.get_rpc_calls("GO2Connection.publish_request") - except Exception: - logger.error("GO2Connection not connected properly") - return "Failed to connect to GO2Connection." - if command_name not in _UNITREE_COMMANDS: suggestions = difflib.get_close_matches( command_name, _UNITREE_COMMANDS.keys(), n=3, cutoff=0.6 @@ -313,7 +294,7 @@ def execute_sport_command(self, command_name: str) -> str: id_, _ = _UNITREE_COMMANDS[command_name] try: - publish_request(RTC_TOPIC["SPORT_MOD"], {"api_id": id_}) + self._connection.publish_request(RTC_TOPIC["SPORT_MOD"], {"api_id": id_}) return f"'{command_name}' command executed successfully." except Exception as e: logger.error(f"Failed to execute {command_name}: {e}") diff --git a/dimos/web/websocket_vis_spec.py b/dimos/web/websocket_vis_spec.py new file mode 100644 index 0000000000..41d8ac8737 --- /dev/null +++ b/dimos/web/websocket_vis_spec.py @@ -0,0 +1,22 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Protocol + +from dimos.mapping.models import LatLon +from dimos.spec.utils import Spec + + +class WebsocketVisSpec(Spec, Protocol): + def set_gps_travel_goal_points(self, points: list[LatLon]) -> None: ... diff --git a/docs/usage/blueprints.md b/docs/usage/blueprints.md index 80a6b24b19..0a8371032a 100644 --- a/docs/usage/blueprints.md +++ b/docs/usage/blueprints.md @@ -290,6 +290,20 @@ class ModuleB(Module): print(self.device.get_time()) ``` +### Optional module references + +If a dependency might not be present in every blueprint, annotate it as `SomeSpec | None = None`. The blueprint will try to resolve it but won't raise if no matching module is found: + +```python session=blueprint-ex3 +class ModuleC(Module): + device: AnyModuleWithGetTime | None = None + + def maybe_get_time(self) -> str: + if self.device is None: + return "No clock available" + return self.device.get_time() +``` + ## Defining skills Skills are methods on a `Module` decorated with `@skill`. The agent automatically discovers all skills from launched modules at startup.