diff --git a/AGENTS.md b/AGENTS.md index 3c18fd685d..ee1eba538a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -27,6 +27,7 @@ dimos run unitree-g1-agentic --robot-ip 192.168.123.161 # real G1 hardware # --- Inspect & control --- dimos status +dimos repl # live Python shell into the running instance dimos log # last 50 lines, human-readable dimos log -f # follow/tail in real time dimos agent-send "say hello" @@ -90,6 +91,43 @@ Reference: `dimos/robot/unitree/go2/blueprints/agentic/unitree_go2_agentic.py` --- +## REPL (interactive debugging) + +The REPL gives you a live Python shell connected to a running DimOS instance. Enabled by default with `dimos run`. + +```bash +# In one terminal: +dimos --replay run unitree-go2-agentic --daemon + +# In another: +dimos repl +``` + +Inside the REPL: + +```python +>>> modules() # list deployed module class names +['GO2Connection', 'RerunBridge', 'McpServer', ...] + +>>> wfe = get('WavefrontFrontierExplorer') # get a live module instance +>>> wfe.begin_exploration() +"Started exploring." + +>>> coordinator.list_modules() # access the coordinator directly +``` + +| Helper | Description | +|--------|-------------| +| `coordinator` | The `ModuleCoordinator` instance | +| `modules()` | List deployed module names | +| `get(name)` | Get a live module instance by class name (connects to its worker process via RPyC) | + +Options: `--repl/--no-repl` on `dimos run`, `--repl-port` (default `18861`), `--host`/`--port` on `dimos repl`. Port is auto-detected from the run registry. 
+ +Full docs: `docs/usage/repl.md` + +--- + ## Repo Structure ``` @@ -209,6 +247,7 @@ Every `GlobalConfig` field is a CLI flag: `--robot-ip`, `--simulation/--no-simul | `dimos list` | List all non-demo blueprints | | `dimos show-config` | Print resolved GlobalConfig values | | `dimos log [-f] [-n N] [--json] [-r ]` | View per-run logs | +| `dimos repl` | Interactive Python shell connected to a running instance ([docs](/docs/usage/repl.md)) | | `dimos mcp list-tools / call / status / modules` | MCP tools (requires McpServer in blueprint) | | `dimos agent-send ""` | Send text to the running agent via LCM | | `dimos lcmspy / agentspy / humancli / top` | Debug/diagnostic tools | @@ -379,6 +418,7 @@ CI asserts the file is current — if it's stale, CI fails. - Visualization: `docs/usage/visualization.md` - Configuration: `docs/usage/configuration.md` - Testing: `docs/development/testing.md` +- REPL: `docs/usage/repl.md` - CLI / dimos run: `docs/development/dimos_run.md` - LFS data: `docs/development/large_file_management.md` - Agent system: `docs/agents/` diff --git a/dimos/conftest.py b/dimos/conftest.py index 4ab8a401f8..9cafb26f94 100644 --- a/dimos/conftest.py +++ b/dimos/conftest.py @@ -14,10 +14,13 @@ import asyncio import os +import socket import threading +import time from dotenv import load_dotenv import pytest +import rpyc from dimos.core.module_coordinator import ModuleCoordinator from dimos.protocol.service.lcmservice import autoconf @@ -177,3 +180,49 @@ def monitor_threads(request): f"Non-closed threads created during this test. Thread names: {thread_names}. " "Please look at the first test that fails and fix that." 
) + + +@pytest.fixture +def find_free_port(): + def _find_free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + return _find_free_port + + +@pytest.fixture +def wait_until_rpyc_connectable(): + def _wait_connectable(host: str, port: int, timeout: float = 2.0) -> None: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + c = rpyc.connect(host, port) + c.close() + return + except ConnectionRefusedError: + time.sleep(0.01) + raise TimeoutError(f"Server at {host}:{port} did not become connectable") + + return _wait_connectable + + +class _StubCoordinator: + def __init__(self, modules=None, locations=None): + self._modules = modules or [] + self._locations = locations or {} + + def list_modules(self): + return list(self._modules) + + def get_module_location(self, name): + return self._locations.get(name) + + +@pytest.fixture +def make_stub_coordinator(): + def _make(**modules): + return _StubCoordinator(**modules) + + return _make diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index 10227eae93..60e304c16b 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -20,6 +20,7 @@ from dimos.core.global_config import GlobalConfig, global_config from dimos.core.module import ModuleBase, ModuleSpec +from dimos.core.repl_server import DEFAULT_REPL_PORT, ReplServer from dimos.core.resource import Resource from dimos.core.worker_manager import WorkerManager from dimos.utils.logging_config import setup_logger @@ -39,6 +40,8 @@ class ModuleCoordinator(Resource): # type: ignore[misc] _memory_limit: str = "auto" _deployed_modules: dict[type[ModuleBase], ModuleProxy] _stats_monitor: StatsMonitor | None = None + _repl_server: ReplServer | None = None + _module_locations: dict[str, tuple[str, int]] def __init__( self, @@ -49,6 +52,7 @@ def __init__( self._memory_limit = cfg.memory_limit self._global_config = cfg 
self._deployed_modules = {} + self._module_locations = {} @property def workers(self) -> list[Worker]: @@ -84,6 +88,12 @@ def n_modules(self) -> int: """Number of deployed modules.""" return len(self._deployed_modules) + @property + def client(self) -> WorkerManager: + if self._client is None: + raise ValueError("Client not started yet") + return self._client + def suppress_console(self) -> None: """Silence console output in all worker processes.""" if self._client is not None: @@ -101,6 +111,10 @@ def start(self) -> None: self._stats_monitor.start() def stop(self) -> None: + if self._repl_server is not None: + self._repl_server.stop() + self._repl_server = None + if self._stats_monitor is not None: self._stats_monitor.stop() self._stats_monitor = None @@ -121,18 +135,12 @@ def deploy( global_config: GlobalConfig = global_config, **kwargs: Any, ) -> ModuleProxy: - if not self._client: - raise ValueError("Trying to dimos.deploy before the client has started") - - module = self._client.deploy(module_class, global_config, kwargs) + module = self.client.deploy(module_class, global_config, kwargs) self._deployed_modules[module_class] = module # type: ignore[assignment] return module # type: ignore[return-value] def deploy_parallel(self, module_specs: list[ModuleSpec]) -> list[ModuleProxy]: - if not self._client: - raise ValueError("Not started") - - modules = self._client.deploy_parallel(module_specs) + modules = self.client.deploy_parallel(module_specs) for (module_class, _, _), module in zip(module_specs, modules, strict=True): self._deployed_modules[module_class] = module # type: ignore[assignment] return modules # type: ignore[return-value] @@ -154,6 +162,39 @@ def start_all_modules(self) -> None: def get_instance(self, module: type[ModuleBase]) -> ModuleProxy: return self._deployed_modules.get(module) # type: ignore[return-value, no-any-return] + def list_modules(self) -> list[str]: + """Return the class names of all deployed modules.""" + return [cls.__name__ for 
cls in self._deployed_modules] + + def get_module(self, name: str) -> ModuleProxy: + """Look up a deployed module by class name.""" + for cls, proxy in self._deployed_modules.items(): + if cls.__name__ == name: + return proxy + raise KeyError(f"Module '{name}' not found. Available: {self.list_modules()}") + + def get_module_location(self, name: str) -> tuple[str, int] | None: + """Return ``(host, port)`` of the worker RPyC server hosting *name*.""" + return self._module_locations.get(name) + + def start_repl_server(self, port: int | None = None) -> None: + """Start RPyC REPL servers in every worker and the main coordinator.""" + port = port if port is not None else DEFAULT_REPL_PORT + + # Start an RPyC server inside each worker process. + for worker in self.client.workers: + worker_port = worker.start_repl_server() + if worker_port is None: + logger.error( + "Worker failed to start REPL server, skipping...", worker_id=worker.worker_id + ) + continue + for module_name in worker.module_names: + self._module_locations[module_name] = ("localhost", worker_port) + + self._repl_server = ReplServer(self, port=port) + self._repl_server.start() + def loop(self) -> None: stop = threading.Event() try: diff --git a/dimos/core/repl_server.py b/dimos/core/repl_server.py new file mode 100644 index 0000000000..7a7bb8af49 --- /dev/null +++ b/dimos/core/repl_server.py @@ -0,0 +1,130 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import threading +from typing import TYPE_CHECKING, Any + +import rpyc +from rpyc.utils.server import ThreadedServer + +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.core.module_coordinator import ModuleCoordinator + +logger = setup_logger() + +DEFAULT_REPL_PORT = 18861 + +_RPYC_PROTOCOL_CONFIG = { + "allow_public_attrs": True, + "allow_all_attrs": True, + "allow_getattr": True, + "allow_setattr": True, + "allow_delattr": True, +} + +_THREAD_NAME = "worker-repl-server" + + +def start_worker_repl_server(instances: dict[int, Any], host: str = "localhost") -> int: + """Start an RPyC server inside a worker process. + + Returns the port the server is listening on (uses port 0 for auto-assign). + """ + + class WorkerReplService(rpyc.Service): # type: ignore[misc] + def on_connect(self, conn: rpyc.Connection) -> None: + conn._config.update(_RPYC_PROTOCOL_CONFIG) + + def exposed_get_instance_by_name(self, name: str) -> Any: + for inst in instances.values(): + if type(inst).__name__ == name: + return inst + available = [type(inst).__name__ for inst in instances.values()] + raise KeyError(f"'{name}' not found on this worker. 
Available: {available}") + + def exposed_list_instances(self) -> dict[int, str]: + return {mid: type(inst).__name__ for mid, inst in instances.items()} + + server = ThreadedServer( + WorkerReplService, + hostname=host, + port=0, + protocol_config=_RPYC_PROTOCOL_CONFIG, + ) + thread = threading.Thread(target=server.start, daemon=True, name=_THREAD_NAME) + thread.start() + return int(server.port) + + +def _make_service(coordinator: ModuleCoordinator) -> type[rpyc.Service]: + """Create an RPyC service class bound to *coordinator*.""" + + class DimosReplService(rpyc.Service): # type: ignore[misc] + ALIASES = ["dimos"] + + def on_connect(self, conn: rpyc.Connection) -> None: + conn._config.update(_RPYC_PROTOCOL_CONFIG) + + def exposed_get_coordinator(self) -> ModuleCoordinator: + return coordinator + + def exposed_list_modules(self) -> list[str]: + return coordinator.list_modules() + + def exposed_get_module_location(self, name: str) -> tuple[str, int] | None: + """Return (host, port) of the worker RPyC server hosting *name*.""" + return coordinator.get_module_location(name) + + return DimosReplService + + +class ReplServer: + """Manages an RPyC server for interactive REPL access to a running coordinator.""" + + def __init__( + self, + coordinator: ModuleCoordinator, + port: int = DEFAULT_REPL_PORT, + host: str = "localhost", + ) -> None: + self._coordinator = coordinator + self._port = port + self._host = host + self._server: ThreadedServer | None = None + self._thread: threading.Thread | None = None + + @property + def port(self) -> int: + return self._port + + def start(self) -> None: + service_cls = _make_service(self._coordinator) + self._server = ThreadedServer( + service_cls, + hostname=self._host, + port=self._port, + protocol_config=_RPYC_PROTOCOL_CONFIG, + ) + self._thread = threading.Thread(target=self._server.start, daemon=True, name="repl-server") + self._thread.start() + + def stop(self) -> None: + if self._server is not None: + self._server.close() + 
self._server = None + self._thread = None diff --git a/dimos/core/run_registry.py b/dimos/core/run_registry.py index 617872011c..efe8fec977 100644 --- a/dimos/core/run_registry.py +++ b/dimos/core/run_registry.py @@ -53,6 +53,7 @@ class RunEntry: config_overrides: dict[str, object] = field(default_factory=dict) grpc_port: int = 9877 original_argv: list[str] = field(default_factory=list) + repl_port: int | None = None @property def registry_path(self) -> Path: diff --git a/dimos/core/test_module_coordinator.py b/dimos/core/test_module_coordinator.py new file mode 100644 index 0000000000..6ef43df28c --- /dev/null +++ b/dimos/core/test_module_coordinator.py @@ -0,0 +1,130 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# type: ignore +from __future__ import annotations + +import pytest + +from dimos.core.module_coordinator import ModuleCoordinator + + +class _SensorModule: + pass + + +class _MotorModule: + pass + + +def test_health_check_fails_with_no_workers(): + coord = ModuleCoordinator() + assert coord.health_check() is False + + +def test_health_check_fails_when_worker_died(mocker): + coord = ModuleCoordinator() + dead_worker = mocker.MagicMock(pid=None, worker_id=1) + coord._client = mocker.MagicMock(workers=[dead_worker]) + assert coord.health_check() is False + + +def test_health_check_passes_when_all_alive(mocker): + coord = ModuleCoordinator() + coord._client = mocker.MagicMock( + workers=[mocker.MagicMock(pid=100, worker_id=1), mocker.MagicMock(pid=101, worker_id=2)] + ) + assert coord.health_check() is True + + +def test_list_modules(): + coord = ModuleCoordinator() + coord._deployed_modules = {_SensorModule: object(), _MotorModule: object()} + assert set(coord.list_modules()) == {"_SensorModule", "_MotorModule"} + + +def test_get_module_by_name(mocker): + coord = ModuleCoordinator() + proxy = mocker.MagicMock() + coord._deployed_modules = {_SensorModule: proxy} + assert coord.get_module("_SensorModule") is proxy + + +def test_get_module_unknown_raises(): + coord = ModuleCoordinator() + coord._deployed_modules = {_SensorModule: object()} + with pytest.raises(KeyError, match="NoSuch"): + coord.get_module("NoSuch") + + +def test_get_module_location(): + coord = ModuleCoordinator() + coord._module_locations = {"Sensor": ("localhost", 5000)} + assert coord.get_module_location("Sensor") == ("localhost", 5000) + assert coord.get_module_location("Unknown") is None + + +def test_stop_calls_stop_on_all_modules(mocker): + coord = ModuleCoordinator() + proxy_a = mocker.MagicMock() + proxy_b = mocker.MagicMock() + coord._deployed_modules = {_SensorModule: proxy_a, _MotorModule: proxy_b} + coord._client = mocker.MagicMock() + + coord.stop() + + proxy_a.stop.assert_called_once() 
+ proxy_b.stop.assert_called_once() + coord._client.close_all.assert_called_once() + + +def test_stop_resilient_to_module_error(mocker): + """A module raising during stop() must not prevent other modules from stopping.""" + coord = ModuleCoordinator() + proxy_a = mocker.MagicMock() + proxy_a.stop.side_effect = RuntimeError("boom") + proxy_b = mocker.MagicMock() + coord._deployed_modules = {_SensorModule: proxy_a, _MotorModule: proxy_b} + coord._client = mocker.MagicMock() + + coord.stop() + + proxy_b.stop.assert_called_once() + + +def test_start_repl_server_populates_locations(mocker): + coord = ModuleCoordinator() + worker = mocker.MagicMock() + worker.start_repl_server.return_value = 9999 + worker.module_names = ["Sensor", "Motor"] + coord._client = mocker.MagicMock(workers=[worker]) + mocker.patch("dimos.core.module_coordinator.ReplServer") + + coord.start_repl_server(port=12345) + + assert coord.get_module_location("Sensor") == ("localhost", 9999) + assert coord.get_module_location("Motor") == ("localhost", 9999) + + +def test_start_repl_server_skips_failed_worker(mocker): + coord = ModuleCoordinator() + worker = mocker.MagicMock() + worker.start_repl_server.return_value = None + worker.module_names = ["Sensor"] + coord._client = mocker.MagicMock(workers=[worker]) + mocker.patch("dimos.core.module_coordinator.ReplServer") + + coord.start_repl_server() + + assert coord.get_module_location("Sensor") is None diff --git a/dimos/core/test_repl_server.py b/dimos/core/test_repl_server.py new file mode 100644 index 0000000000..031d755b58 --- /dev/null +++ b/dimos/core/test_repl_server.py @@ -0,0 +1,132 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import threading + +import pytest +import rpyc +from rpyc.utils.server import ThreadedServer + +from dimos.core.repl_server import _THREAD_NAME, ReplServer, start_worker_repl_server + + +@pytest.fixture +def coordinator_server(find_free_port, wait_until_rpyc_connectable, make_stub_coordinator): + port = find_free_port() + coord = make_stub_coordinator( + modules=["Sensor", "Motor"], + locations={"Sensor": ("10.0.0.1", 5000)}, + ) + server = ReplServer(coord, port=port, host="127.0.0.1") + server.start() + wait_until_rpyc_connectable("127.0.0.1", port) + yield port + thread = server._thread + server.stop() + if thread is not None: + thread.join(timeout=2.0) + + +@pytest.fixture +def coordinator_conn(coordinator_server): + conn = rpyc.connect("127.0.0.1", coordinator_server, config={"sync_request_timeout": 5}) + yield conn + conn.close() + + +@pytest.fixture +def worker_server(mocker, wait_until_rpyc_connectable): + # Keep track of all the servers so we can close them. 
+ servers = [] + real_init = ThreadedServer.__init__ + + def tracking_init(self, *args, **kwargs): + real_init(self, *args, **kwargs) + servers.append(self) + + mocker.patch.object(ThreadedServer, "__init__", tracking_init) + + class ModuleA: + value = 42 + + class ModuleB: + pass + + instances = {1: ModuleA(), 2: ModuleB()} + port = start_worker_repl_server(instances, host="127.0.0.1") + wait_until_rpyc_connectable("127.0.0.1", port) + + yield port + + for srv in servers: + srv.close() + + for t in threading.enumerate(): + if t.name == _THREAD_NAME: + t.join(timeout=2.0) + break + + +@pytest.fixture +def worker_conn(worker_server): + conn = rpyc.connect("127.0.0.1", worker_server, config={"sync_request_timeout": 5}) + yield conn + conn.close() + + +def test_list_modules(coordinator_conn): + assert set(coordinator_conn.root.list_modules()) == {"Sensor", "Motor"} + + +def test_get_module_location_known(coordinator_conn): + loc = coordinator_conn.root.get_module_location("Sensor") + assert (str(loc[0]), int(loc[1])) == ("10.0.0.1", 5000) + + +def test_get_module_location_unknown(coordinator_conn): + assert coordinator_conn.root.get_module_location("NoSuch") is None + + +def test_stop_prevents_new_connections( + find_free_port, wait_until_rpyc_connectable, make_stub_coordinator +): + port = find_free_port() + server = ReplServer(make_stub_coordinator(), port=port, host="127.0.0.1") + server.start() + wait_until_rpyc_connectable("127.0.0.1", port) + thread = server._thread + server.stop() + if thread is not None: + thread.join(timeout=2.0) + + with pytest.raises(ConnectionRefusedError): + rpyc.connect("127.0.0.1", port) + + +def test_worker_get_instance_by_name(worker_conn): + result = worker_conn.root.get_instance_by_name("ModuleA") + assert result.value == 42 + + +def test_worker_get_instance_unknown_raises(worker_conn): + with pytest.raises(KeyError, match="NoSuchModule"): + worker_conn.root.get_instance_by_name("NoSuchModule") + + +def 
test_worker_list_instances(worker_conn): + mapping = worker_conn.root.list_instances() + assert str(mapping[1]) == "ModuleA" + assert str(mapping[2]) == "ModuleB" diff --git a/dimos/core/worker.py b/dimos/core/worker.py index 8f3beee7ec..191e48d83b 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -235,6 +235,25 @@ def deploy_module( ) return actor + def start_repl_server(self, host: str = "localhost") -> int | None: + """Ask the worker process to start its RPyC REPL server. Returns the port.""" + if self._conn is None: + return None + try: + with self._lock: + self._conn.send({"type": "start_repl_server", "host": host}) + response = self._conn.recv() + if response.get("error"): + logger.warning( + "Worker failed to start REPL server", + worker_id=self._worker_id, + error=response["error"], + ) + return None + return int(response["result"]) + except (BrokenPipeError, EOFError, ConnectionResetError): + return None + def suppress_console(self) -> None: if self._conn is None: return @@ -256,6 +275,7 @@ def shutdown(self) -> None: logger.warning( "Worker did not respond to shutdown within 5s, closing pipe.", worker_id=self._worker_id, + module_names=self.module_names, ) except (BrokenPipeError, EOFError, ConnectionResetError): pass @@ -364,6 +384,12 @@ def _worker_loop(conn: Connection, instances: dict[int, Any], worker_id: int) -> result = method(*request.get("args", ()), **request.get("kwargs", {})) response["result"] = result + elif req_type == "start_repl_server": + from dimos.core.repl_server import start_worker_repl_server + + port = start_worker_repl_server(instances, request.get("host", "localhost")) + response["result"] = port + elif req_type == "suppress_console": _suppress_console_output() response["result"] = True diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 1137a612f3..497859cd32 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -30,6 +30,7 @@ from dimos.agents.mcp.mcp_adapter import McpAdapter, 
McpError from dimos.core.global_config import GlobalConfig, global_config from dimos.core.run_registry import get_most_recent, is_pid_alive, stop_entry +from dimos.robot.cli.repl import repl_command from dimos.utils.logging_config import setup_logger logger = setup_logger() @@ -114,6 +115,8 @@ def run( robot_types: list[str] = typer.Argument(..., help="Blueprints or modules to run"), daemon: bool = typer.Option(False, "--daemon", "-d", help="Run in background"), disable: list[str] = typer.Option([], "--disable", help="Module names to disable"), + repl: bool = typer.Option(True, "--repl/--no-repl", help="Enable RPyC REPL server"), + repl_port: int = typer.Option(18861, "--repl-port", help="REPL server port"), ) -> None: """Start a robot blueprint""" logger.info("Starting DimOS") @@ -165,15 +168,27 @@ def run( coordinator = blueprint.build(cli_config_overrides=cli_config_overrides) + entry = RunEntry( + run_id=run_id, + pid=os.getpid(), + blueprint=blueprint_name, + started_at=datetime.now(timezone.utc).isoformat(), + log_dir=str(log_dir), + cli_args=list(robot_types), + config_overrides=cli_config_overrides, + original_argv=sys.argv, + repl_port=repl_port if repl else None, + ) + if daemon: from dimos.core.daemon import ( daemonize, install_signal_handlers, ) - # Health check before daemonizing — catch early crashes + # Health check before daemonizing to catch early crashes. 
if not coordinator.health_check(): - typer.echo("Error: health check failed — a worker process died.", err=True) + typer.echo("Error: health check failed because a worker process died.", err=True) coordinator.stop() raise typer.Exit(1) @@ -184,37 +199,27 @@ def run( typer.echo("✓ DimOS running in background\n") typer.echo(f" Run ID: {run_id}") typer.echo(f" Log: {log_dir}") + if repl: + typer.echo(f" REPL: dimos repl (port {repl_port})") typer.echo(" Stop: dimos stop") typer.echo(" Status: dimos status") coordinator.suppress_console() daemonize(log_dir) + entry.pid = os.getpid() # update to daemon's PID after double-fork + + # Start REPL server after daemonize (threads don't survive fork). + if repl: + coordinator.start_repl_server(port=repl_port) - entry = RunEntry( - run_id=run_id, - pid=os.getpid(), - blueprint=blueprint_name, - started_at=datetime.now(timezone.utc).isoformat(), - log_dir=str(log_dir), - cli_args=list(robot_types), - config_overrides=cli_config_overrides, - original_argv=sys.argv, - ) entry.save() install_signal_handlers(entry, coordinator) coordinator.loop() else: - entry = RunEntry( - run_id=run_id, - pid=os.getpid(), - blueprint=blueprint_name, - started_at=datetime.now(timezone.utc).isoformat(), - log_dir=str(log_dir), - cli_args=list(robot_types), - config_overrides=cli_config_overrides, - original_argv=sys.argv, - ) + if repl: + coordinator.start_repl_server(port=repl_port) + entry.save() try: coordinator.loop() @@ -466,6 +471,15 @@ def restart( raise typer.Exit(1) +@main.command() +def repl( + host: str = typer.Option("localhost", "--host", "-H", help="Host to connect to"), + port: int | None = typer.Option(None, "--port", "-p", help="REPL server port (auto-detected)"), +) -> None: + """Connect to a running DimOS instance for interactive debugging.""" + repl_command(host, port) + + @main.command() def show_config(ctx: typer.Context) -> None: """Show current config settings and their values.""" diff --git a/dimos/robot/cli/repl.py 
b/dimos/robot/cli/repl.py new file mode 100644 index 0000000000..13f6122fcc --- /dev/null +++ b/dimos/robot/cli/repl.py @@ -0,0 +1,115 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import sys +from typing import Any + +import rpyc +import typer + +from dimos.core.repl_server import DEFAULT_REPL_PORT +from dimos.core.run_registry import get_most_recent + + +def repl_command(host: str, port: int | None) -> None: + if port is None: + entry = get_most_recent(alive_only=True) + if entry and entry.repl_port: + port = entry.repl_port + else: + port = DEFAULT_REPL_PORT + + try: + conn = rpyc.connect(host, port, config={"sync_request_timeout": None}) + except ConnectionRefusedError: + typer.echo( + f"Error: cannot connect to {host}:{port}\n" + "Is DimOS running? (REPL is enabled by default with 'dimos run')", + err=True, + ) + raise typer.Exit(1) + + coordinator = conn.root.get_coordinator() + + # Cache worker connections so each worker is connected to at most once. 
+ _worker_conns: dict[tuple[str, int], rpyc.Connection] = {} + + def modules() -> list[str]: + """List deployed module names.""" + return list(conn.root.list_modules()) + + def get(name: str): # type: ignore[no-untyped-def] + """Get a module instance by class name (connects directly to its worker).""" + location = conn.root.get_module_location(name) + if location is None: + available = modules() + raise KeyError(f"Module '{name}' not found. Available: {available}") + w_host = str(location[0]) + w_port = int(location[1]) + key = (w_host, w_port) + if key not in _worker_conns or _worker_conns[key].closed: + _worker_conns[key] = rpyc.connect(w_host, w_port, config={"sync_request_timeout": None}) + return _worker_conns[key].root.get_instance_by_name(name) + + ns: dict[str, Any] = { + "coordinator": coordinator, + "modules": modules, + "get": get, + "conn": conn, + "rpyc": rpyc, + } + + banner = ( + "DimOS REPL\n" + f"Connected to {host}:{port}\n" + "\n" + " coordinator ModuleCoordinator instance\n" + " modules() List deployed module names\n" + " get(name) Get module instance by class name\n" + ) + + use_ipython = _has_ipython() and _is_interactive() + + try: + if use_ipython: + import IPython + + print(banner) + IPython.start_ipython(argv=[], user_ns=ns, display_banner=False) # type: ignore[no-untyped-call] + + else: + import code + + code.interact(banner, local=ns) + finally: + for wc in _worker_conns.values(): + try: + wc.close() + except Exception: + pass + conn.close() + + +def _has_ipython() -> bool: + try: + import IPython # noqa: F401 + except ImportError: + return False + return True + + +def _is_interactive() -> bool: + return bool(hasattr(sys, "ps1") or sys.flags.interactive or sys.stdin.isatty()) diff --git a/dimos/robot/cli/test_repl.py b/dimos/robot/cli/test_repl.py new file mode 100644 index 0000000000..eef708d052 --- /dev/null +++ b/dimos/robot/cli/test_repl.py @@ -0,0 +1,134 @@ +# Copyright 2026 Dimensional Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import pytest +import typer + +from dimos.core.repl_server import DEFAULT_REPL_PORT, ReplServer +from dimos.robot.cli.repl import repl_command + + +@pytest.fixture +def force_stdlib_repl(mocker): + mocker.patch("dimos.robot.cli.repl._has_ipython", return_value=False) + + +@pytest.fixture +def no_registry(mocker): + mocker.patch("dimos.robot.cli.repl.get_most_recent", return_value=None) + + +@pytest.fixture +def mock_connect(mocker): + return mocker.patch("dimos.robot.cli.repl.rpyc.connect", side_effect=ConnectionRefusedError) + + +@pytest.fixture +def repl_server(find_free_port, wait_until_rpyc_connectable, make_stub_coordinator): + """Start a real ReplServer on a free port and tear it down after the test.""" + coordinator = make_stub_coordinator( + modules={"ModuleA": ("127.0.0.1", 0), "ModuleB": ("127.0.0.1", 0)} + ) + port = find_free_port() + server = ReplServer(coordinator, port=port, host="127.0.0.1") + server.start() + + wait_until_rpyc_connectable("127.0.0.1", port) + + yield port + + thread = server._thread + server.stop() + if thread is not None: + thread.join(timeout=2.0) + + +def test_explicit_port_is_used(mock_connect): + """An explicit port is forwarded to rpyc.connect as-is.""" + with pytest.raises(typer.Exit): + repl_command(host="127.0.0.1", port=12345) + + assert mock_connect.call_args[0] == ("127.0.0.1", 12345) + + +def test_port_from_registry(mocker, 
mock_connect): + """When port is None the registry entry's repl_port is used.""" + entry = mocker.MagicMock(repl_port=9999) + mocker.patch("dimos.robot.cli.repl.get_most_recent", return_value=entry) + + with pytest.raises(typer.Exit): + repl_command(host="127.0.0.1", port=None) + + assert mock_connect.call_args[0] == ("127.0.0.1", 9999) + + +def test_port_defaults_when_no_registry(mock_connect, no_registry): + """Falls back to DEFAULT_REPL_PORT when no registry entry exists.""" + + with pytest.raises(typer.Exit): + repl_command(host="127.0.0.1", port=None) + + assert mock_connect.call_args[0] == ("127.0.0.1", DEFAULT_REPL_PORT) + + +def test_port_defaults_when_entry_has_no_repl_port(mocker, mock_connect): + """Registry entry with repl_port=None still falls back to DEFAULT_REPL_PORT.""" + entry = mocker.MagicMock(repl_port=None) + mocker.patch("dimos.robot.cli.repl.get_most_recent", return_value=entry) + + with pytest.raises(typer.Exit): + repl_command(host="127.0.0.1", port=None) + + assert mock_connect.call_args[0] == ("127.0.0.1", DEFAULT_REPL_PORT) + + +def test_connection_refused_exits_with_helpful_message(capsys, find_free_port): + """Real refused connection exits with code 1 and shows host:port.""" + port = find_free_port() + + with pytest.raises(typer.Exit) as exc_info: + repl_command(host="127.0.0.1", port=port) + + assert exc_info.value.exit_code == 1 + assert f"127.0.0.1:{port}" in capsys.readouterr().err + + +def test_modules_lists_deployed_names(repl_server, mocker, force_stdlib_repl, no_registry): + """modules() returns the names provided by the coordinator.""" + result = {} + + def _interact(banner, local): + result["modules"] = local["modules"]() + + mocker.patch("code.interact", side_effect=_interact) + repl_command(host="127.0.0.1", port=repl_server) + + assert set(result["modules"]) == {"ModuleA", "ModuleB"} + + +def test_get_raises_for_unknown_module(repl_server, mocker, force_stdlib_repl, no_registry): + """get() raises KeyError when the module 
is not deployed.""" + ran = [] + + def _interact(banner, local): + with pytest.raises(KeyError, match="NoSuchModule"): + local["get"]("NoSuchModule") + ran.append(True) + + mocker.patch("code.interact", side_effect=_interact) + repl_command(host="127.0.0.1", port=repl_server) + + assert ran # guard: the assertion inside _interact actually executed diff --git a/dimos/simulation/unity/test_unity_sim.py b/dimos/simulation/unity/test_unity_sim.py index 7ac9c49296..9cef8121fb 100644 --- a/dimos/simulation/unity/test_unity_sim.py +++ b/dimos/simulation/unity/test_unity_sim.py @@ -83,12 +83,6 @@ def _wire(module) -> dict[str, _MockTransport]: return ts -def _find_free_port() -> int: - with socket.socket() as s: - s.bind(("", 0)) - return s.getsockname()[1] - - def _build_ros1_pointcloud2(points: np.ndarray, frame_id: str = "map") -> bytes: w = ROS1Writer() w.u32(0) @@ -213,9 +207,9 @@ def test_serialize_pose_stamped_round_trip(self): class TestTCPBridge: - def test_handshake_and_data_flow(self): + def test_handshake_and_data_flow(self, find_free_port): """Mock Unity connects, sends a PointCloud2, verifies bridge publishes it.""" - port = _find_free_port() + port = find_free_port() m = UnityBridgeModule(unity_binary="", unity_port=port) ts = _wire(m) diff --git a/docs/usage/README.md b/docs/usage/README.md index 071b6fc0b2..f6e4d2b3ae 100644 --- a/docs/usage/README.md +++ b/docs/usage/README.md @@ -10,3 +10,4 @@ This page explains general concepts. - [RPC](/docs/usage/blueprints.md#calling-the-methods-of-other-modules): how one module can call a method on another module (arguments get serialized to JSON-like binary data). - [Skills](/docs/usage/blueprints.md#defining-skills): An RPC function, except it can be called by an AI agent (a tool for an AI). - Agents: AI that has an objective, access to stream data, and is capable of calling skills as tools. +- [REPL](/docs/usage/repl.md): Live Python shell connected to a running DimOS instance for inspection and debugging. 
diff --git a/docs/usage/cli.md b/docs/usage/cli.md index 50a76cf552..dbf133673d 100644 --- a/docs/usage/cli.md +++ b/docs/usage/cli.md @@ -67,6 +67,8 @@ dimos run [ ...] [--daemon] [--disable ...] |--------|-------------| | `--daemon`, `-d` | Run in background (double-fork, health check, writes run registry) | | `--disable` | Module class names to exclude from the blueprint | +| `--repl` / `--no-repl` | Enable RPyC REPL server (default: enabled) | +| `--repl-port` | REPL server port (default: `18861`) | ```bash # Foreground (Ctrl-C to stop) @@ -172,6 +174,25 @@ All processes (main + workers) write to the same `main.jsonl`. Filter by module: dimos log --json | jq 'select(.logger | contains("RerunBridge"))' ``` +### `dimos repl` + +Connect to a running DimOS instance for interactive debugging. See [REPL](/docs/usage/repl.md) for full details. + +```bash +dimos repl [--host HOST] [--port PORT] +``` + +| Option | Description | +|--------|-------------| +| `--host`, `-H` | Host to connect to (default: `localhost`) | +| `--port`, `-p` | REPL server port (auto-detected from run registry) | + +```bash +dimos repl # auto-detect port from run registry +dimos repl --port 19000 # connect to a custom port +dimos repl --host 192.168.1.5 # connect to a remote instance +``` + ### `dimos list` List all available blueprints. diff --git a/docs/usage/repl.md b/docs/usage/repl.md new file mode 100644 index 0000000000..9ecf1e6474 --- /dev/null +++ b/docs/usage/repl.md @@ -0,0 +1,85 @@ +# REPL + +The REPL gives you a live Python shell connected to a running DimOS instance. You can inspect module state, call methods, and debug without restarting. 
+ +## Quick Start + +Start DimOS (the REPL server is enabled by default): + +```bash +dimos run unitree-go2 +``` + +In another terminal, connect: + +```bash +dimos repl +``` + +You get an interactive Python session with these pre-loaded objects: + +| Name | Description | +|------|-------------| +| `coordinator` | The `ModuleCoordinator` instance | +| `modules()` | List deployed module class names | +| `get(name)` | Get a live module instance by class name | + +## Examples + +```python +# List all deployed modules +>>> modules() +['GO2Connection', 'RerunBridge', 'McpServer', ...] + +# Get a module instance and call methods on it +>>> wfe = get('WavefrontFrontierExplorer') +>>> wfe.begin_exploration() +'Started exploring.' + +# Access the coordinator directly +>>> coordinator.list_modules() +['GO2Connection', 'RerunBridge', ...] +``` + +## How It Works + +The REPL uses [RPyC](https://rpyc.readthedocs.io/) for transparent remote object access. When `dimos run` starts, it launches: + +1. A **coordinator RPyC server** in the main process (default port `18861`). This is the entry point for `dimos repl`. +2. A **worker RPyC server** inside each worker process (auto-assigned ports). When you call `get("ModuleName")`, the REPL connects directly to that module's worker process. + +This means `get()` returns a live proxy to the actual module object in its worker process. Attribute access, method calls, and return values are transparently proxied over the network. + +## CLI Reference + +### `dimos run` Options + +| Option | Default | Description | +|--------|---------|-------------| +| `--repl` / `--no-repl` | `--repl` (enabled) | Enable or disable the RPyC REPL server | +| `--repl-port` | `18861` | Port for the coordinator REPL server | + +```bash +# Disable the REPL server +dimos run unitree-go2 --no-repl + +# Use a custom port +dimos run unitree-go2 --repl-port 19000 +``` + +### `dimos repl` + +Connect to a running DimOS instance.
+ +```bash +dimos repl [--host HOST] [--port PORT] +``` + +| Option | Default | Description | +|--------|---------|-------------| +| `--host`, `-H` | `localhost` | Host to connect to | +| `--port`, `-p` | auto-detected | REPL server port (reads from run registry if omitted) | + +The port is auto-detected from the run registry. You only need `--port` if you used a custom `--repl-port` and there is no active run entry (e.g., non-daemon foreground run that was killed). + +If IPython is installed, the REPL uses it automatically for tab completion, syntax highlighting, and history. Otherwise it falls back to the standard Python REPL. diff --git a/pyproject.toml b/pyproject.toml index 7e2f38546e..5a52c0d7be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,6 +88,7 @@ dependencies = [ "psutil>=7.0.0", "sqlite-vec>=0.1.6", "lz4>=4.4.5", + "rpyc", # Used for `dimos repl` ] @@ -409,6 +410,8 @@ module = [ "pyzed", "pyzed.*", "rclpy.*", + "rpyc", + "rpyc.*", "sam2.*", "scipy", "scipy.*", diff --git a/uv.lock b/uv.lock index 529842294b..61de2ee82f 100644 --- a/uv.lock +++ b/uv.lock @@ -1704,6 +1704,7 @@ dependencies = [ { name = "pyturbojpeg" }, { name = "reactivex" }, { name = "rerun-sdk" }, + { name = "rpyc" }, { name = "scipy", version = "1.15.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, { name = "scipy", version = "1.17.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, { name = "sortedcontainers" }, @@ -2096,6 +2097,7 @@ requires-dist = [ { name = "rerun-sdk", specifier = ">=0.20.0" }, { name = "rerun-sdk", marker = "extra == 'docker'" }, { name = "rerun-sdk", marker = "extra == 'visualization'", specifier = ">=0.20.0" }, + { name = "rpyc" }, { name = "ruff", marker = "extra == 'dev'", specifier = "==0.14.3" }, { name = "scikit-learn", marker = "extra == 'misc'" }, { name = "scipy", specifier = ">=1.15.1" }, @@ -6871,6 +6873,18 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/2b/31/21609a9be48e877bc33b089a7f495c853215def5aeb9564a31c210d9d769/plum_dispatch-2.5.7-py3-none-any.whl", hash = "sha256:06471782eea0b3798c1e79dca2af2165bafcfa5eb595540b514ddd81053b1ede", size = 42612, upload-time = "2025-01-17T20:07:26.461Z" }, ] +[[package]] +name = "plumbum" +version = "1.10.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pywin32", marker = "platform_python_implementation != 'PyPy' and sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/dc/c8/11a5f792704b70f071a3dbc329105a98e9cc8d25daaf09f733c44eb0ef8e/plumbum-1.10.0.tar.gz", hash = "sha256:f8cbf0ecec0b73ff4e349398b65112a9e3f9300e7dc019001217dcc148d5c97c", size = 320039, upload-time = "2025-10-31T05:02:48.697Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/79/ad/45312df6b63ba64ea35b8d8f5f0c577aac16e6b416eafe8e1cb34e03f9a7/plumbum-1.10.0-py3-none-any.whl", hash = "sha256:9583d737ac901c474d99d030e4d5eec4c4e6d2d7417b1cf49728cf3be34f6dc8", size = 127383, upload-time = "2025-10-31T05:02:47.002Z" }, +] + [[package]] name = "polars" version = "1.38.1" @@ -8678,6 +8692,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/b7/b95708304cd49b7b6f82fdd039f1748b66ec2b21d6a45180910802f1abf1/rpds_py-0.30.0-pp311-pypy311_pp73-musllinux_1_2_x86_64.whl", hash = "sha256:ac37f9f516c51e5753f27dfdef11a88330f04de2d564be3991384b2f3535d02e", size = 562191, upload-time = "2025-11-30T20:24:36.853Z" }, ] +[[package]] +name = "rpyc" +version = "6.0.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "plumbum" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8b/e7/1c17410673b634f4658bb5d2232d0c4507432a97508b2c6708e59481644a/rpyc-6.0.2.tar.gz", hash = "sha256:8e780a6a71b842128a80a337c64adfb6f919014e069951832161c9efc630c93b", size = 62321, upload-time = "2025-04-18T16:33:21.693Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/3f/99/2e119d541d596daea39643eb9312b47c7847383951300f889166938035b1/rpyc-6.0.2-py3-none-any.whl", hash = "sha256:8072308ad30725bc281c42c011fc8c922be15f3eeda6eafb2917cafe1b6f00ec", size = 74768, upload-time = "2025-04-18T16:33:20.147Z" }, +] + [[package]] name = "ruff" version = "0.14.3"