Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
7a23d45
vis kickstart
leshy Mar 21, 2026
fdf9bd4
fix(memory2): address PR review — narrow exception catch, fix test bugs
leshy Mar 21, 2026
b0ad0bc
fix(typing): add @overload to embed/embed_text, fix conftest return t…
leshy Mar 21, 2026
607ee10
fix(typing): add @overload to remaining EmbeddingModel subclasses
leshy Mar 21, 2026
3d7becc
batch transform
leshy Mar 21, 2026
4ffc0ff
Merge branch 'dev' into feat/memory2-vis-kickstart
leshy Mar 23, 2026
c936444
checkpoint
leshy Mar 24, 2026
329f263
Merge branch 'dev' into feat/memory2-vis-kickstart
leshy Mar 24, 2026
423128f
fix(memory2): address PR review — delete empty file, add SQL injectio…
leshy Mar 24, 2026
da56191
Merge branch 'feat/memory2-vis-kickstart' into feat/memory2-svgvis
leshy Mar 25, 2026
2057960
transform module
leshy Mar 25, 2026
bac18cc
memory module experiment
leshy Mar 25, 2026
fb0b970
cleanup
leshy Mar 25, 2026
9ca9019
better API
leshy Mar 25, 2026
a71f914
cleanup
leshy Mar 25, 2026
ac838a2
small cleanup
leshy Mar 25, 2026
5e91c8c
memory2 module refactor
leshy Mar 25, 2026
b755dee
nullstore
leshy Mar 25, 2026
39206ab
tests fix
leshy Mar 25, 2026
8f5577b
null store
leshy Mar 26, 2026
4dd60c0
typing cleanup
leshy Mar 26, 2026
fb962be
Merge remote-tracking branch 'origin/dev' into feat/memory2-svgvis
leshy Mar 26, 2026
ae6de37
cleanup
leshy Mar 26, 2026
05fd764
mem module
leshy Mar 26, 2026
8e1e7f1
better store cleanup
leshy Mar 26, 2026
d04ade4
further shutdown cleanup
leshy Mar 26, 2026
338713e
correct live stream shutdown
leshy Mar 26, 2026
70980ff
null store tests extracted
leshy Mar 26, 2026
7a538d3
tests fix
leshy Mar 26, 2026
840f874
observation projection
leshy Mar 26, 2026
71fc3a9
Fix VoxelMap rename to VoxelMapTransformer and typo in VoxelGridMappe…
leshy Mar 26, 2026
3a19b4e
typo
leshy Mar 26, 2026
abb2fbf
Merge branch 'dev' into feat/memory2-svgvis
leshy Mar 26, 2026
331d821
Use CompositeResource.register_disposable() instead of direct _dispos…
leshy Mar 27, 2026
b84f96c
agent test writing guidelines
leshy Mar 27, 2026
b5c2ae8
test_null: test through NullStore API instead of ListObservationStore…
leshy Mar 27, 2026
a3ff4ab
Address PR #1682 review comments
leshy Mar 27, 2026
207e763
Rename _xf to _transform in Stream for readability
leshy Mar 27, 2026
b56517d
Fix flaky LCM test timeout and drone mock disposable error
leshy Mar 27, 2026
503e992
disposing review
leshy Mar 27, 2026
49c9b64
Add stream-level custody tests for Stream → Backend ownership
leshy Mar 27, 2026
51745e9
test cleanup
leshy Mar 27, 2026
1b5f3d4
Add agent code style guide with no-banner rule
leshy Mar 27, 2026
f135766
unbound stream tests belong in stream
leshy Mar 27, 2026
889d153
comemnts cleanup
leshy Mar 27, 2026
9aabcd3
small cleanup
leshy Mar 27, 2026
5fe848f
Wrap subscribe() callables in Disposable for register_disposable()
leshy Mar 27, 2026
07b6609
Merge branch 'dev' into feat/memory2-svgvis
leshy Mar 27, 2026
469da56
temporary test supression
leshy Mar 27, 2026
88aa58a
Merge branch 'feat/memory2-svgvis' of github.com:dimensionalOS/dimos …
leshy Mar 27, 2026
b0f04c9
obs.start bugfix
leshy Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions data/.lfs/go2_bigoffice.db.tar.gz
Git LFS file not shown
4 changes: 2 additions & 2 deletions dimos/agents/agent_test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def __init__(self, **kwargs: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._disposables.add(Disposable(self.agent.subscribe(self._on_agent_message)))
self._disposables.add(Disposable(self.agent_idle.subscribe(self._on_agent_idle)))
self.register_disposable(Disposable(self.agent.subscribe(self._on_agent_message)))
self.register_disposable(Disposable(self.agent_idle.subscribe(self._on_agent_idle)))
# Signal that subscription is ready
self._subscription_ready.set()

Expand Down
2 changes: 1 addition & 1 deletion dimos/agents/mcp/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def start(self) -> None:
def _on_human_input(string: str) -> None:
self._message_queue.put(HumanMessage(content=string))

self._disposables.add(Disposable(self.human_input.subscribe(_on_human_input)))
self.register_disposable(Disposable(self.human_input.subscribe(_on_human_input)))

@rpc
def on_system_modules(self, _modules: list[RPCClient]) -> None:
Expand Down
2 changes: 1 addition & 1 deletion dimos/agents/skills/demo_robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class DemoRobot(Module):

def start(self) -> None:
super().start()
self._disposables.add(interval(1.0).subscribe(lambda _: self._publish_gps_location()))
self.register_disposable(interval(1.0).subscribe(lambda _: self._publish_gps_location()))

def stop(self) -> None:
super().stop()
Expand Down
4 changes: 3 additions & 1 deletion dimos/agents/skills/google_maps_skill_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import json
from typing import Any

from reactivex.disposable import Disposable

from dimos.agents.annotation import skill
from dimos.core.core import rpc
from dimos.core.module import Module
Expand Down Expand Up @@ -49,7 +51,7 @@ def __init__(self, **kwargs: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._disposables.add(self.gps_location.subscribe(self._on_gps_location)) # type: ignore[arg-type]
self.register_disposable(Disposable(self.gps_location.subscribe(self._on_gps_location)))

@rpc
def stop(self) -> None:
Expand Down
4 changes: 3 additions & 1 deletion dimos/agents/skills/gps_nav_skill.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import json

from reactivex.disposable import Disposable

from dimos.agents.annotation import skill
from dimos.core.core import rpc
from dimos.core.module import Module
Expand All @@ -37,7 +39,7 @@ class GpsNavSkillContainer(Module):
@rpc
def start(self) -> None:
super().start()
self._disposables.add(self.gps_location.subscribe(self._on_gps_location)) # type: ignore[arg-type]
self.register_disposable(Disposable(self.gps_location.subscribe(self._on_gps_location)))

@rpc
def stop(self) -> None:
Expand Down
4 changes: 2 additions & 2 deletions dimos/agents/skills/navigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def __init__(self, **kwargs: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._disposables.add(Disposable(self.color_image.subscribe(self._on_color_image)))
self._disposables.add(Disposable(self.odom.subscribe(self._on_odom)))
self.register_disposable(Disposable(self.color_image.subscribe(self._on_color_image)))
self.register_disposable(Disposable(self.odom.subscribe(self._on_odom)))
self._skill_started = True

@rpc
Expand Down
4 changes: 3 additions & 1 deletion dimos/agents/skills/osm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.


from reactivex.disposable import Disposable

from dimos.agents.annotation import skill
from dimos.core.module import Module
from dimos.core.stream import In
Expand All @@ -39,7 +41,7 @@ def __init__(self) -> None:
def start(self) -> None:
super().start()
if hasattr(self.gps_location, "subscribe"):
self._disposables.add(self.gps_location.subscribe(self._on_gps_location)) # type: ignore[arg-type]
self.register_disposable(Disposable(self.gps_location.subscribe(self._on_gps_location)))
else:
logger.warning(
"OsmSkill: gps_location stream does not support direct subscribe (RemoteIn)"
Expand Down
4 changes: 2 additions & 2 deletions dimos/agents/skills/person_follow.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ def __init__(self, **kwargs: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._disposables.add(Disposable(self.color_image.subscribe(self._on_color_image)))
self.register_disposable(Disposable(self.color_image.subscribe(self._on_color_image)))
if self.config.use_3d_navigation:
self._disposables.add(Disposable(self.global_map.subscribe(self._on_pointcloud)))
self.register_disposable(Disposable(self.global_map.subscribe(self._on_pointcloud)))

@rpc
def stop(self) -> None:
Expand Down
5 changes: 3 additions & 2 deletions dimos/agents/vlm_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from langchain.chat_models import init_chat_model
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from reactivex.disposable import Disposable

from dimos.agents.system_prompt import SYSTEM_PROMPT
from dimos.core.core import rpc
Expand Down Expand Up @@ -60,8 +61,8 @@ def __init__(self, **kwargs: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._disposables.add(self.color_image.subscribe(self._on_image)) # type: ignore[arg-type]
self._disposables.add(self.query_stream.subscribe(self._on_query)) # type: ignore[arg-type]
self.register_disposable(Disposable(self.color_image.subscribe(self._on_image)))
self.register_disposable(Disposable(self.query_stream.subscribe(self._on_query)))

@rpc
def stop(self) -> None:
Expand Down
5 changes: 3 additions & 2 deletions dimos/agents/vlm_stream_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import time

from langchain_core.messages import AIMessage, HumanMessage
from reactivex.disposable import Disposable

from dimos.core.core import rpc
from dimos.core.module import Module
Expand Down Expand Up @@ -62,8 +63,8 @@ def __init__( # type: ignore[no-untyped-def]
@rpc
def start(self) -> None:
super().start()
self._disposables.add(self.color_image.subscribe(self._on_image)) # type: ignore[arg-type]
self._disposables.add(self.answer_stream.subscribe(self._on_answer)) # type: ignore[arg-type]
self.register_disposable(Disposable(self.color_image.subscribe(self._on_image)))
self.register_disposable(Disposable(self.answer_stream.subscribe(self._on_answer)))
self._worker = threading.Thread(target=self._run_queries, daemon=True)
self._worker.start()

Expand Down
4 changes: 2 additions & 2 deletions dimos/agents/web_human_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ def start(self) -> None:
# Subscribe to both text input sources
# 1. Direct text from web interface
unsub = self._web_interface.query_stream.subscribe(self._human_transport.publish)
self._disposables.add(unsub)
self.register_disposable(unsub)

# 2. Transcribed text from STT
unsub = stt_node.emit_text().subscribe(self._human_transport.publish)
self._disposables.add(unsub)
self.register_disposable(unsub)

self._thread = Thread(target=self._web_interface.run, daemon=True)
self._thread.start()
Expand Down
21 changes: 8 additions & 13 deletions dimos/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@
)

from langchain_core.tools import tool
from reactivex.disposable import CompositeDisposable

from dimos.core.core import T, rpc
from dimos.core.global_config import GlobalConfig, global_config
from dimos.core.introspection.module.info import extract_module_info
from dimos.core.introspection.module.render import render_module_io
from dimos.core.resource import Resource
from dimos.core.resource import CompositeResource
from dimos.core.rpc_client import RpcCall
from dimos.core.stream import In, Out, RemoteOut, Transport
from dimos.protocol.rpc.pubsubrpc import LCMRPC
Expand Down Expand Up @@ -92,15 +91,14 @@ class _BlueprintPartial(Protocol):
def __call__(self, **kwargs: Any) -> "Blueprint": ...


class ModuleBase(Configurable[ModuleConfigT], Resource):
class ModuleBase(Configurable[ModuleConfigT], CompositeResource):
# This won't type check against the TypeVar, but we need it as the default.
default_config: type[ModuleConfigT] = ModuleConfig # type: ignore[assignment]

_rpc: RPCSpec | None = None
_tf: TFSpec[Any] | None = None
_loop: asyncio.AbstractEventLoop | None = None
_loop_thread: threading.Thread | None
_disposables: CompositeDisposable
_bound_rpc_calls: dict[str, RpcCall] = {}
_module_closed: bool = False
_module_closed_lock: threading.Lock
Expand All @@ -111,7 +109,6 @@ def __init__(self, config_args: dict[str, Any]):
super().__init__(**config_args)
self._module_closed_lock = threading.Lock()
self._loop, self._loop_thread = get_loop()
self._disposables = CompositeDisposable()
try:
self.rpc = self.config.rpc_transport()
self.rpc.serve_module_rpc(self)
Expand All @@ -132,6 +129,7 @@ def start(self) -> None:

@rpc
def stop(self) -> None:
super().stop()
self._close_module()

def _close_module(self) -> None:
Expand All @@ -158,14 +156,12 @@ def _close_module(self) -> None:
if hasattr(self, "_tf") and self._tf is not None:
self._tf.stop()
self._tf = None
if hasattr(self, "_disposables"):
self._disposables.dispose()

# Break the In/Out -> owner -> self reference cycle so the instance
# can be freed by refcount instead of waiting for GC.
for attr in list(vars(self).values()):
if isinstance(attr, (In, Out)):
attr.owner = None
# Stop transports and break the In/Out -> owner -> self reference
# cycle so the instance can be freed by refcount instead of waiting for GC.
for attr in [*self.inputs.values(), *self.outputs.values()]:
attr.stop()
attr.owner = None

def _close_rpc(self) -> None:
if self.rpc:
Expand All @@ -188,7 +184,6 @@ def __setstate__(self, state) -> None: # type: ignore[no-untyped-def]
"""Restore object from pickled state."""
self.__dict__.update(state)
# Reinitialize runtime attributes
self._disposables = CompositeDisposable()
self._module_closed_lock = threading.Lock()
self._loop = None
self._loop_thread = None
Expand Down
25 changes: 13 additions & 12 deletions dimos/core/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from abc import abstractmethod
import sys
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, TypeVar

if sys.version_info >= (3, 11):
from typing import Self
Expand All @@ -29,6 +29,8 @@
from reactivex.abc import DisposableBase
from reactivex.disposable import CompositeDisposable

D = TypeVar("D", bound=DisposableBase)


class Resource(DisposableBase):
@abstractmethod
Expand Down Expand Up @@ -75,18 +77,17 @@ def __exit__(
class CompositeResource(Resource):
"""Resource that owns child disposables, disposed on stop()."""

_disposables: CompositeDisposable

def __init__(self) -> None:
self._disposables = CompositeDisposable()
_disposables: CompositeDisposable | None = None

def register_disposables(self, *disposables: DisposableBase) -> None:
"""Register child disposables to be disposed when this resource stops."""
for d in disposables:
self._disposables.add(d)
def register_disposable(self, disposable: D) -> D:
"""Register a child disposable to be disposed when this resource stops."""
if self._disposables is None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous version was better. self._disposables should be set in __init__.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a bunch of classes to this (all modules) so I don't want to create overhead of creating a compositedisposable if not used

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I actually didn't realize this before, but ModuleBase already has self._disposables. This is why multiple inheritance is confusing. disposables are created in ModuleBase, but also in CompositeResource.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aha ok, can I remove from modules? on-demand CompositeResource is so convinient for Modules

self._disposables = CompositeDisposable()
self._disposables.add(disposable)
return disposable
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this return the disposable?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convinience API, you often create an object, and want to register as disposable

bla = self.register_disposable(new Bla())

Copy link
Copy Markdown
Member

@jeff-hykin jeff-hykin Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kinda like the convenience of Thread().start() some might say 🤔


def start(self) -> None:
pass
def start(self) -> None: ...

def stop(self) -> None:
self._disposables.dispose()
if self._disposables is not None:
self._disposables.dispose()
4 changes: 4 additions & 0 deletions dimos/core/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ def __str__(self) -> str:
+ ("" if not self._transport else " via " + str(self._transport))
)

def stop(self) -> None:
if self._transport is not None:
self._transport.stop()


class Out(Stream[T], ObservableMixin[T]):
_transport: Transport # type: ignore[type-arg]
Expand Down
4 changes: 2 additions & 2 deletions dimos/core/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _odom(msg) -> None:
self.mov.publish(msg.position)

unsub = self.odometry.subscribe(_odom)
self._disposables.add(Disposable(unsub))
self.register_disposable(Disposable(unsub))

def _lidar(msg) -> None:
self.lidar_msg_count += 1
Expand All @@ -57,7 +57,7 @@ def _lidar(msg) -> None:
print("RCV: unknown time", msg)

unsub = self.lidar.subscribe(_lidar)
self._disposables.add(Disposable(unsub))
self.register_disposable(Disposable(unsub))


def test_classmethods() -> None:
Expand Down
4 changes: 2 additions & 2 deletions dimos/hardware/sensors/camera/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ def on_image(image: Image) -> None:
self.color_image.publish(image)
self._latest_image = image

self._disposables.add(
self.register_disposable(
stream.subscribe(on_image),
)

self._disposables.add(
self.register_disposable(
rx.interval(1.0).subscribe(lambda _: self.publish_metadata()),
)

Expand Down
4 changes: 2 additions & 2 deletions dimos/hardware/sensors/camera/realsense/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ def start(self) -> None:

if self.config.enable_pointcloud and self.config.enable_depth:
interval_sec = 1.0 / self.config.pointcloud_fps
self._disposables.add(
self.register_disposable(
backpressure(rx.interval(interval_sec)).subscribe(
on_next=lambda _: self._generate_pointcloud(),
on_error=lambda e: print(f"Pointcloud error: {e}"),
)
)

interval_sec = 1.0 / self.config.camera_info_fps
self._disposables.add(
self.register_disposable(
rx.interval(interval_sec).subscribe(
on_next=lambda _: self._publish_camera_info(),
on_error=lambda e: print(f"CameraInfo error: {e}"),
Expand Down
4 changes: 2 additions & 2 deletions dimos/hardware/sensors/camera/zed/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def start(self) -> None:
self._enable_tracking()

interval_sec = 1.0 / self.config.camera_info_fps
self._disposables.add(
self.register_disposable(
rx.interval(interval_sec).subscribe(
on_next=lambda _: self._publish_camera_info(),
on_error=lambda e: print(f"CameraInfo error: {e}"),
Expand All @@ -193,7 +193,7 @@ def start(self) -> None:

if self.config.enable_pointcloud and self.config.enable_depth:
interval_sec = 1.0 / self.config.pointcloud_fps
self._disposables.add(
self.register_disposable(
backpressure(rx.interval(interval_sec)).subscribe(
on_next=lambda _: self._generate_pointcloud(),
on_error=lambda e: print(f"Pointcloud error: {e}"),
Expand Down
Loading
Loading