Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,6 @@ class MySkillContainer(Module):

If multiple modules match the spec, use `.remappings()` to resolve. Source: `dimos/spec/utils.py`, `dimos/core/blueprints.py`.

**Legacy**: existing skill containers use `rpc_calls: list[str]` + `get_rpc_calls("ClassName.method")`. This still works but wiring failures are silent and only surface at runtime. Don't use it in new code.

### Adding a New Skill

1. Pick the right container (robot-specific or `dimos/agents/skills/`).
Expand Down
13 changes: 4 additions & 9 deletions dimos/agents/skills/gps_nav_skill.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@
from dimos.agents.annotation import skill
from dimos.core.core import rpc
from dimos.core.module import Module
from dimos.core.rpc_client import RpcCall
from dimos.core.stream import In, Out
from dimos.mapping.models import LatLon
from dimos.mapping.utils.distance import distance_in_meters
from dimos.utils.logging_config import setup_logger
from dimos.web.websocket_vis_spec import WebsocketVisSpec

logger = setup_logger()


class GpsNavSkillContainer(Module):
_latest_location: LatLon | None = None
_max_valid_distance: int = 50000
_set_gps_travel_goal_points: RpcCall | None = None

_vis: WebsocketVisSpec

gps_location: In[LatLon]
gps_goal: Out[LatLon]
Expand All @@ -43,11 +44,6 @@ def start(self) -> None:
def stop(self) -> None:
super().stop()

@rpc
def set_WebsocketVisModule_set_gps_travel_goal_points(self, callable: RpcCall) -> None:
self._set_gps_travel_goal_points = callable
self._set_gps_travel_goal_points.set_rpc(self.rpc) # type: ignore[arg-type]

def _on_gps_location(self, location: LatLon) -> None:
self._latest_location = location

Expand Down Expand Up @@ -83,8 +79,7 @@ def set_gps_travel_points(self, points: list[dict[str, float]]) -> str:
if self.gps_goal._transport is not None:
self.gps_goal.publish(new_points) # type: ignore[arg-type]

if self._set_gps_travel_goal_points:
self._set_gps_travel_goal_points(new_points)
self._vis.set_gps_travel_goal_points(new_points) # type: ignore[arg-type]

return "I've successfully set the travel points."

Expand Down
87 changes: 22 additions & 65 deletions dimos/agents/skills/navigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
from dimos.msgs.geometry_msgs.Vector3 import Vector3, make_vector3
from dimos.msgs.sensor_msgs.Image import Image
from dimos.navigation.base import NavigationState
from dimos.navigation.navigation_spec import NavigationInterfaceSpec
from dimos.navigation.visual.query import get_object_bbox_from_image
from dimos.perception.object_tracking_spec import ObjectTrackingSpec
from dimos.perception.spatial_memory_spec import SpatialMemorySpec
from dimos.types.robot_location import RobotLocation
from dimos.utils.logging_config import setup_logger

Expand All @@ -40,18 +43,9 @@ class NavigationSkillContainer(Module):
_skill_started: bool = False
_similarity_threshold: float = 0.23

rpc_calls: list[str] = [
"SpatialMemory.tag_location",
"SpatialMemory.query_tagged_location",
"SpatialMemory.query_by_text",
"NavigationInterface.set_goal",
"NavigationInterface.get_state",
"NavigationInterface.is_goal_reached",
"NavigationInterface.cancel_goal",
"ObjectTracking.track",
"ObjectTracking.stop_track",
"ObjectTracking.is_tracking",
]
_spatial_memory: SpatialMemorySpec
_navigation: NavigationInterfaceSpec
_object_tracking: ObjectTrackingSpec | None = None

color_image: In[Image]
odom: In[PoseStamped]
Expand Down Expand Up @@ -110,8 +104,7 @@ def tag_location(self, location_name: str) -> str:
rotation=(rotation.x, rotation.y, rotation.z),
)

tag_location_rpc = self.get_rpc_calls("SpatialMemory.tag_location")
if not tag_location_rpc(location):
if not self._spatial_memory.tag_location(location):
return f"Error: Failed to store '{location_name}' in the spatial memory"

logger.info(f"Tagged {location}")
Expand Down Expand Up @@ -151,13 +144,7 @@ def navigate_with_text(self, query: str) -> str:
return f"No tagged location called '{query}'. No object in view matching '{query}'. No matching location found in semantic map for '{query}'."

def _navigate_by_tagged_location(self, query: str) -> str | None:
try:
query_tagged_location_rpc = self.get_rpc_calls("SpatialMemory.query_tagged_location")
except Exception:
logger.warning("SpatialMemory module not connected, cannot query tagged locations")
return None

robot_location = query_tagged_location_rpc(query)
robot_location = self._spatial_memory.query_tagged_location(query)

if not robot_location:
return None
Expand All @@ -172,23 +159,20 @@ def _navigate_by_tagged_location(self, query: str) -> str | None:
return self._navigate_to(goal_pose, f"Found a tagged location called '{query}'.")

def _navigate_to(self, pose: PoseStamped, message: str) -> str:
try:
set_goal_rpc = self.get_rpc_calls("NavigationInterface.set_goal")
except Exception:
logger.error("Navigation module not connected properly")
return "Error: Navigation module is not connected, cannot set goal."

logger.info(
f"Navigating to pose: ({pose.position.x:.2f}, {pose.position.y:.2f}, {pose.position.z:.2f})"
)
set_goal_rpc(pose)
self._navigation.set_goal(pose)

return (
f"{message}. Started navigating to that position. "
f"To cancel movement call the 'stop_navigation' tool."
)

def _navigate_to_object(self, query: str) -> str | None:
if self._object_tracking is None:
return None

try:
bbox = self._get_bbox_for_current_frame(query)
except Exception:
Expand All @@ -198,58 +182,42 @@ def _navigate_to_object(self, query: str) -> str | None:
if bbox is None:
return None

try:
track_rpc, stop_track_rpc, is_tracking_rpc = self.get_rpc_calls(
"ObjectTracking.track", "ObjectTracking.stop_track", "ObjectTracking.is_tracking"
)
except Exception:
logger.error("ObjectTracking module not connected properly")
return None

try:
get_state_rpc, is_goal_reached_rpc = self.get_rpc_calls(
"NavigationInterface.get_state", "NavigationInterface.is_goal_reached"
)
except Exception:
logger.error("Navigation module not connected properly")
return None

logger.info(f"Found {query} at {bbox}")

# Start tracking - BBoxNavigationModule automatically generates goals
track_rpc(bbox)
self._object_tracking.track(bbox) # type: ignore[arg-type]

start_time = time.time()
timeout = 30.0
goal_set = False

while time.time() - start_time < timeout:
# Check if navigator finished
if get_state_rpc() == NavigationState.IDLE and goal_set:
if self._navigation.get_state() == NavigationState.IDLE and goal_set:
logger.info("Waiting for goal result")
time.sleep(1.0)
if not is_goal_reached_rpc():
if not self._navigation.is_goal_reached():
logger.info(f"Goal cancelled, tracking '{query}' failed")
stop_track_rpc()
self._object_tracking.stop_track()
return None
else:
logger.info(f"Reached '{query}'")
stop_track_rpc()
self._object_tracking.stop_track()
return f"Successfully arrived at '{query}'"

# If goal set and tracking lost, just continue (tracker will resume or timeout)
if goal_set and not is_tracking_rpc():
if goal_set and not self._object_tracking.is_tracking():
continue

# BBoxNavigationModule automatically sends goals when tracker publishes
# Just check if we have any detections to mark goal_set
if is_tracking_rpc():
if self._object_tracking.is_tracking():
goal_set = True

time.sleep(0.25)

logger.warning(f"Navigation to '{query}' timed out after {timeout}s")
stop_track_rpc()
self._object_tracking.stop_track()
return None

def _get_bbox_for_current_frame(self, query: str) -> BBox | None:
Expand All @@ -259,12 +227,7 @@ def _get_bbox_for_current_frame(self, query: str) -> BBox | None:
return get_object_bbox_from_image(self._vl_model, self._latest_image, query)

def _navigate_using_semantic_map(self, query: str) -> str:
try:
query_by_text_rpc = self.get_rpc_calls("SpatialMemory.query_by_text")
except Exception:
return "Error: The SpatialMemory module is not connected."

results = query_by_text_rpc(query)
results = self._spatial_memory.query_by_text(query)

if not results:
return f"No matching location found in semantic map for '{query}'"
Expand Down Expand Up @@ -292,13 +255,7 @@ def stop_navigation(self) -> str:
return "Stopped"

def _cancel_goal_and_stop(self) -> None:
try:
cancel_goal_rpc = self.get_rpc_calls("NavigationInterface.cancel_goal")
except Exception:
logger.warning("Navigation module not connected, cannot cancel goal")
return

cancel_goal_rpc()
self._navigation.cancel_goal()

def _get_goal_pose_from_result(self, result: dict[str, Any]) -> PoseStamped | None:
similarity = 1.0 - (result.get("distance") or 1)
Expand Down
19 changes: 17 additions & 2 deletions dimos/agents/skills/test_gps_nav_skills.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pytest

from dimos.agents.skills.gps_nav_skill import GpsNavSkillContainer
from dimos.core.core import rpc
from dimos.core.module import Module
from dimos.core.stream import Out
from dimos.mapping.models import LatLon
Expand All @@ -27,6 +28,12 @@ class FakeGPS(Module):
gps_location: Out[LatLon]


class StubWebsocketVis(Module):
@rpc
def set_gps_travel_goal_points(self, points: list[LatLon]) -> None:
pass


class MockedGpsNavSkill(GpsNavSkillContainer):
_latest_location = LatLon(lat=37.782654, lon=-122.413273)
_started = True
Expand All @@ -36,7 +43,11 @@ class MockedGpsNavSkill(GpsNavSkillContainer):
@pytest.mark.slow
def test_set_gps_travel_points(agent_setup) -> None:
history = agent_setup(
blueprints=[FakeGPS.blueprint(), MockedGpsNavSkill.blueprint()],
blueprints=[
FakeGPS.blueprint(),
MockedGpsNavSkill.blueprint(),
StubWebsocketVis.blueprint(),
],
messages=[
HumanMessage(
'Set GPS travel points to [{"lat": 37.782654, "lon": -122.413273}]. '
Expand All @@ -51,7 +62,11 @@ def test_set_gps_travel_points(agent_setup) -> None:
@pytest.mark.slow
def test_set_gps_travel_points_multiple(agent_setup) -> None:
history = agent_setup(
blueprints=[FakeGPS.blueprint(), MockedGpsNavSkill.blueprint()],
blueprints=[
FakeGPS.blueprint(),
MockedGpsNavSkill.blueprint(),
StubWebsocketVis.blueprint(),
],
messages=[
HumanMessage(
"Set GPS travel points to these locations in order: "
Expand Down
Loading
Loading