feat(viz): add RerunWebSocketServer for dimos-viewer remote mode #1643
jeff-hykin wants to merge 12 commits into dev
Greptile Summary
This PR adds
Confidence Score: 3/5
Important Files Changed
Sequence Diagram
sequenceDiagram
participant Viewer as dimos-viewer<br/>(--connect mode)
participant WS as RerunWebSocketServer<br/>(0.0.0.0:3030/ws)
participant CP as clicked_point<br/>Out[PointStamped]
participant TV as tele_cmd_vel<br/>Out[Twist]
participant DS as Downstream<br/>(e.g. ReplanningAStarPlanner)
Viewer->>WS: WebSocket connect (ws://host:3030/ws)
WS-->>Viewer: connection accepted
loop Every second
Viewer->>WS: {"type":"heartbeat","timestamp_ms":...}
Note over WS: logged, no publish
end
Viewer->>WS: {"type":"click","x":1.0,"y":2.0,"z":0.5,...}
WS->>CP: publish PointStamped(x,y,z,ts,frame_id)
CP->>DS: subscriber callback
Viewer->>WS: {"type":"twist","linear_x":0.5,...,"angular_z":0.8}
WS->>TV: publish Twist(linear, angular)
Viewer->>WS: {"type":"stop"}
WS->>TV: publish Twist.zero()
Viewer--xWS: disconnect
Note over WS: logs disconnect, ready for reconnect
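The routing shown in the diagram can be sketched in plain Python. This is a minimal illustration, not the PR's implementation: `route_message` and the `(stream_name, payload)` tuples are hypothetical stand-ins for publishing `PointStamped` and `Twist`, and only the fields visible in the diagram are read (the elided ones are ignored).

```python
import json
from typing import Optional, Tuple

# Hypothetical stand-in for the publish step: (stream_name, payload values).
Routed = Tuple[str, Tuple[float, ...]]


def route_message(raw: str) -> Optional[Routed]:
    """Map one viewer message to the output stream named in the diagram."""
    msg = json.loads(raw)
    if not isinstance(msg, dict):
        return None  # not a JSON object, nothing to route
    msg_type = msg.get("type")
    if msg_type == "click":
        return ("clicked_point", (float(msg["x"]), float(msg["y"]), float(msg["z"])))
    if msg_type == "twist":
        # Only the two twist fields shown in the diagram are read here.
        return ("tele_cmd_vel", (float(msg["linear_x"]), float(msg["angular_z"])))
    if msg_type == "stop":
        return ("tele_cmd_vel", (0.0, 0.0))  # zero twist
    return None  # heartbeat and unknown types: logged elsewhere, no publish
```

Heartbeats return `None` in this sketch, matching the "logged, no publish" note in the diagram.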
Last reviewed commit: "fix: ruff formatting..."
def _dispatch(self, raw: str | bytes) -> None:
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning(f"RerunWebSocketServer: ignoring non-JSON message: {raw!r}")
        return

    msg_type = msg.get("type")
Non-dict JSON causes unhandled AttributeError
json.loads() can return any JSON value (list, int, null, bool, etc.). If it returns anything other than a dict, the call to msg.get("type") on line 154 raises AttributeError: 'list' object has no attribute 'get', which is not caught by the json.JSONDecodeError guard. This exception propagates through _handle_client (which only catches websockets.ConnectionClosed) and becomes an unhandled coroutine exception, abruptly terminating the connection handler for that client.
Current:
def _dispatch(self, raw: str | bytes) -> None:
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning(f"RerunWebSocketServer: ignoring non-JSON message: {raw!r}")
        return
    msg_type = msg.get("type")

Suggested:
def _dispatch(self, raw: str | bytes) -> None:
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning(f"RerunWebSocketServer: ignoring non-JSON message: {raw!r}")
        return
    if not isinstance(msg, dict):
        logger.warning(f"RerunWebSocketServer: ignoring non-object JSON message: {raw!r}")
        return
    msg_type = msg.get("type")
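To make the failure mode concrete, here is a small standalone sketch (the helper name `parse_object` is hypothetical, not part of the PR). It shows which payloads decode to something other than a dict, and it catches `ValueError` rather than only `json.JSONDecodeError`, on the assumption that the bytes branch of `raw` may carry invalid UTF-8, which `json.loads` reports as `UnicodeDecodeError`.

```python
import json
from typing import Any, Optional, Union


def parse_object(raw: Union[str, bytes]) -> Optional[dict]:
    """Return the decoded JSON object, or None for invalid or non-object input."""
    try:
        msg: Any = json.loads(raw)
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError (both ValueError subclasses).
        return None
    return msg if isinstance(msg, dict) else None


# Valid JSON that is not an object would break a bare msg.get("type") call.
for payload in ("[1, 2]", "42", "null", "true", '"text"', "not json", '{"type": "stop"}'):
    print(f"{payload!r} -> {parse_object(payload)!r}")
```

Only the last payload yields a dict; every other one is rejected before any `.get()` call is made.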
@rpc
def stop(self) -> None:
    if (
        self._ws_loop is not None
        and not self._ws_loop.is_closed()
        and self._stop_event is not None
    ):
        self._ws_loop.call_soon_threadsafe(self._stop_event.set)
    super().stop()
Race condition: stop() is a no-op if called before _serve() initialises _stop_event
_stop_event is assigned inside the _serve() coroutine (line 122), which runs on the background thread. There is a window between the thread starting (and _ws_loop being set) and the coroutine executing the self._stop_event = asyncio.Event() line. If stop() is called in that window, the guard self._stop_event is not None is False and the call to call_soon_threadsafe is skipped entirely — super().stop() still runs, but the asyncio server loop never receives the stop signal and keeps the port bound until the process exits.
In practice this is most likely to surface when a test fails before _wait_for_server() returns, leaving the port occupied for the next test run. Consider initialising the event before the thread is spawned (using asyncio.Event() on the server's own loop once it is created) or adding a proper join() with a short deadline so stop() is always synchronous.
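One way to close that window, sketched under assumptions: keep a `threading.Event` that exists before the thread is spawned, and have the coroutine check it right after creating its `asyncio.Event`. The class `BackgroundLoop` and its attribute names are hypothetical stand-ins, and no real server is started; the sketch also joins the thread with a deadline so `stop()` is synchronous.

```python
import asyncio
import threading
from typing import Optional


class BackgroundLoop:
    """Hypothetical stand-in for the server module; names are illustrative."""

    def __init__(self) -> None:
        self._stop_requested = threading.Event()  # exists before the thread starts
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._stop_event: Optional[asyncio.Event] = None
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def _run(self) -> None:
        asyncio.run(self._serve())

    async def _serve(self) -> None:
        self._loop = asyncio.get_running_loop()
        self._stop_event = asyncio.Event()
        # Publish the event first, then check the flag: a stop() that ran
        # earlier is caught here, and a later one sees a non-None _stop_event.
        if self._stop_requested.is_set():
            return
        await self._stop_event.wait()

    def stop(self, timeout: float = 2.0) -> bool:
        """Request shutdown; return True once the thread has exited."""
        self._stop_requested.set()
        loop, event = self._loop, self._stop_event
        if loop is not None and event is not None:
            try:
                loop.call_soon_threadsafe(event.set)
            except RuntimeError:
                pass  # loop already closed, so the thread is exiting anyway
        if self._thread.is_alive():
            self._thread.join(timeout)
        return not self._thread.is_alive()
```

With this ordering, calling `stop()` immediately after `start()` should still terminate the loop, because either the coroutine sees the flag or `stop()` sees the event.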
    env={
        "DISPLAY": "",
        "HOME": "/home/dimos",
        "PATH": "/home/dimos/.cargo/bin:/usr/bin:/bin",
    },
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

# Give the viewer up to 5 s to connect its WebSocket client to our server.
# We detect the connection by waiting for the server to accept a client.
deadline = time.monotonic() + 5.0
while time.monotonic() < deadline:
Hard-coded HOME and PATH will break on most CI/developer machines
The stripped environment passes "HOME": "/home/dimos" and a minimal PATH that only covers /home/dimos/.cargo/bin, /usr/bin, and /bin. This will fail to locate dimos-viewer on any system where it is installed in a virtual-environment bin (e.g. ~/.local/bin, $VIRTUAL_ENV/bin, or a pixi-managed path) or where the user's home directory is not /home/dimos.
Consider inheriting the caller's environment and only overriding DISPLAY, or using shutil.which("dimos-viewer") to assert the binary is reachable before the test runs:
import os, shutil

env = {**os.environ, "DISPLAY": ""}
proc = subprocess.Popen(
    ["dimos-viewer", "--connect", f"--ws-url=ws://127.0.0.1:{_E2E_PORT}/ws"],
    env=env,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

async def _serve(self) -> None:
    import websockets.asyncio.server as ws_server

    self._stop_event = asyncio.Event()

    async with ws_server.serve(
        self._handle_client,
        host=self.config.host,
        port=self.config.port,
    ):
        logger.info(
            f"RerunWebSocketServer listening on ws://{self.config.host}:{self.config.port}/ws"
        )
        await self._stop_event.wait()
WebSocket server accepts connections on any URL path, not just /ws
websockets.asyncio.server.serve() called with only a handler function accepts every incoming WebSocket upgrade request regardless of the requested path. Any client that connects to ws://host:3030/ or ws://host:3030/admin gets the same handler. Because the server binds to 0.0.0.0 by default, this means any machine on the network can send arbitrary click and teleop commands by connecting to any path.
Consider adding a path check inside _handle_client and closing the connection immediately if the path is not /ws:
async def _handle_client(self, websocket: Any) -> None:
    if websocket.request.path != "/ws":
        await websocket.close(1008, "Not Found")
        return
    ...

- Move _server_ready.set() inside ws_server.serve() context so stop() waits for the port to actually bind before sending shutdown signal
- Add /ws path filter to reject non-viewer WebSocket connections
- Add pytest.mark.skipif for dimos-viewer binary test in CI
- Fix import ordering in manipulation/blueprints.py
The default websockets ping_interval=20s + ping_timeout=20s was too aggressive. Increase both to 30s to give the viewer more time to respond, especially during brief network hiccups.
…lure
If _serve() throws (e.g. port in use), _server_ready was never set, causing stop() to block for 5s. Now logs the exception and sets _server_ready in finally block.
Revert: git revert HEAD
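The pattern described in this commit message can be sketched as follows. This is an illustration under assumptions, not the PR's code: `run_server` and `failing_serve` are hypothetical names, and the failure is simulated with an `OSError` instead of a real bind error.

```python
import asyncio
import logging
import threading

logger = logging.getLogger(__name__)


async def failing_serve(ready: threading.Event) -> None:
    """Hypothetical stand-in for _serve(): fails before it can signal readiness."""
    raise OSError("address already in use")


def run_server(serve, ready: threading.Event) -> None:
    """Thread target: run the server coroutine and always release waiters."""
    try:
        asyncio.run(serve(ready))
    except Exception:
        logger.exception("server thread failed")
    finally:
        ready.set()  # also set on failure, so a waiting stop() returns promptly


ready = threading.Event()
thread = threading.Thread(target=run_server, args=(failing_serve, ready), daemon=True)
thread.start()
started = ready.wait(timeout=5.0)  # released by the finally block, not by the timeout
```

A healthy serve coroutine would call `ready.set()` itself once the port is bound; the `finally` clause only guarantees that waiters are released when that never happens.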
fe84b4d to 204d8b7
NOTE: this will become ready-for-review once this dimos-viewer PR is merged
Problem(s)
- --connect doesn't work for remote connections.
- cmd_vel which ends up making stream-renaming a pain since everything else must be renamed to know if the cmd_vel is coming from the viewer or from a planner/module.
Solution
Use websockets instead of LCM for the viewer.
This PR only changes one side; the other half lives in this dimos-viewer PR. This should only be merged after that one.
Breaking Changes
None
How to Test
You'll need the jeff/fix/connect branch of dimos-viewer compiled and python installed into dimos:
Alternatively
Click in the 3D viewport and use the WASD keys; you should see [CLICK] and [TWIST] in terminal 1.
Contributor License Agreement