Conversation
- SqliteVectorStore: only catch "no such table" OperationalError, re-raise others - test_visualizer: fix det.ts → obs.ts, add @pytest.mark.tool, remove double teardown and duplicate assertion - conftest: remove unreachable duplicate return
…ypes - EmbeddingModel/CLIPModel: @overload so single-arg returns Embedding, multi-arg returns list - conftest: cast getfixturevalue returns to correct Store/BlobStore types
- MobileCLIPModel, TorchReIDModel: add matching overloads for embed/embed_text - Remove now-redundant cast in memory/embedding.py
…n guard, strict zip - Delete dimos/memory/type.py (license header only, no code) - Add validate_identifier() to search() and delete() in SqliteVectorStore - Change zip(strict=False) to zip(strict=True) in Batch transformer
# Conflicts: # dimos/memory2/test_visualizer.py # dimos/memory2/transform.py
There was a problem hiding this comment.
Pull request overview
This PR extends the memory2 streaming architecture with (1) a StreamModule base that wires In→memory2.Stream pipelines→Out, (2) support for unbound stream pipelines via Stream() + .chain(), and (3) voxel-map accumulation refactored into a reusable VoxelGrid + VoxelMap transformer, alongside typing improvements and new tool-only replay/visualization tests.
Changes:
- Add unbound
memory2.Streampipelines (Stream()),.chain()to bind them to real streams, and.publish()to driveOutports. - Introduce
StreamModule(defaulting to a live-onlyNullStore) and update voxel mapping to a framework-agnosticVoxelGrid+VoxelMaptransformer. - Add embedding API overloads + new MemoryStore retention controls (
max_size,NullStore), and add tool-marked replay DB tests (with LFS artifact).
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| dimos/simulation/unity/test_unity_sim.py | Add stop() to test transport mock to support new module shutdown behavior. |
| dimos/robot/all_blueprints.py | Register new "stream-module" module entry. |
| dimos/models/embedding/treid.py | Add overloads for embed/embed_text return typing. |
| dimos/models/embedding/mobileclip.py | Add overloads for embed/embed_text return typing. |
| dimos/models/embedding/clip.py | Add __future__ annotations + overloads for embed APIs. |
| dimos/models/embedding/base.py | Define overloads on the abstract EmbeddingModel interface. |
| dimos/memory2/vectorstore/sqlite.py | Validate identifiers; handle missing vec tables by returning empty results. |
| dimos/memory2/transform.py | Add Batch transformer and stride() utility. |
| dimos/memory2/test_voxel_map.py | New tool-marked tests for voxel-map accumulation + replay integration. |
| dimos/memory2/test_visualizer.py | New tool-marked “visualizer” workflow tests over replay DB. |
| dimos/memory2/test_save.py | Update expected error message substring for .save() restrictions. |
| dimos/memory2/test_module.py | New tests for unbound streams, .chain(), StreamModule, and NullStore behavior. |
| dimos/memory2/test_e2e.py | Add tool-marked embed-and-search workflow using CLIP + EmbedImages. |
| dimos/memory2/stream.py | Add unbound streams, .chain(), .publish(), improved errors, and stricter live/save/append guards. |
| dimos/memory2/store/null.py | Add NullStore (live-only, max_size=0) convenience store. |
| dimos/memory2/store/memory.py | Implement MemoryStoreConfig.max_size and wire it into backend creation. |
| dimos/memory2/store/README.md | Update docs to reflect Store/Backend/ObservationStore roles and new layout. |
| dimos/memory2/observationstore/memory.py | Add max_size retention via deque(maxlen=...). |
| dimos/memory2/module.py | Add StreamModule that bridges Module ports to memory2 streams. |
| dimos/memory2/conftest.py | Add CLIP fixture for tool tests and tighten fixture typing via cast(). |
| dimos/memory/embedding.py | Remove now-unneeded cast() thanks to embed() overload typing. |
| dimos/mapping/voxels.py | Refactor voxel mapping into VoxelGrid + VoxelMapTransformer; reintroduce mapper as StreamModule. |
| dimos/mapping/test_voxels.py | Update tests to use VoxelGrid directly and dispose resources explicitly. |
| dimos/mapping/pointclouds/test_occupancy_speed.py | Switch benchmark to use VoxelGrid instead of the old mapper module. |
| dimos/hardware/sensors/lidar/fastlio2/fastlio_blueprints.py | Update blueprint wiring to new VoxelGridMapper config (no publish_interval). |
| dimos/core/stream.py | Add Stream.stop() to stop transports on module shutdown. |
| dimos/core/module.py | Stop transports for all In/Out ports during module close to avoid leaks/cycles. |
| data/.lfs/go2_bigoffice.db.tar.gz | Add LFS pointer for replay DB artifact used by tool tests. |
Comments suppressed due to low confidence (2)
dimos/memory2/transform.py:91
- Batch.init should validate batch_size > 0 (raise ValueError) to avoid surprising behavior (e.g., batch_size=0 causes every element to be flushed immediately, negative values behave similarly).
def __init__(self, fn: Callable[[list[T]], Sequence[R]], batch_size: int = 16) -> None:
self._fn = fn
self._batch_size = batch_size
dimos/memory2/test_visualizer.py:55
- There are several spelling mistakes in these comments (e.g., "dissaper" -> "disappear"). Please fix to keep docs/comments readable.
# these need to be functions that are easily called to render latest query results in different ways
# I can imagine querying for multiple things and I want previous visualization to dissaper, being replaced
# by latest results. we might do this transparently so agent queries are also visible in real time
#
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| stream: Stream[Any] = store.stream(in_name, inp_port.type) | ||
|
|
||
| # we push input into the stream | ||
| inp_port.subscribe(lambda msg: stream.append(msg)) |
There was a problem hiding this comment.
You're not keeping track of the subscription.
There was a problem hiding this comment.
I am very smart and shutdown of module now shuts down transports which unsubscribes these things
I think this manual resource handling is annoying and fragile, stopping an owner object should propagade down the tree
| # we push input into the stream | ||
| inp_port.subscribe(lambda msg: stream.append(msg)) | ||
|
|
||
| live = stream.live() |
There was a problem hiding this comment.
stream.live() creates a new Stream. It also needs to be closed.
There was a problem hiding this comment.
same as above, shutdown of store propagades to it's streams
store = self.register_disposable(NullStore())
…ables.add() ModuleBase now delegates disposable lifecycle entirely to CompositeResource instead of managing its own CompositeDisposable. All 39 module files migrated from self._disposables.add() to self.register_disposable(), which handles lazy initialization. Removed the _add_disposable helper from drone module.
Fix bugs: stray self param in test_build_global_map, duplicate assertion in test_e2e, exclude StreamModule base class from module registry. Test quality: move inline imports to file top, add try/finally cleanup, use context managers for resource lifecycle, fix conditional test logic. Code quality: simplify redundant maxlen conditional, make VoxelGrid config fields private, use Optional typing to remove type ignores, accept Path objects in SqliteStore. Add docs/agents/testing.md with guidelines for writing tests.
Bump CallbackCollector default timeout from 2s to 5s to prevent flaky failures under load. Fix drone test mocks returning plain lambdas instead of disposable objects from subscribe().
Tests that stream.stop() disposes its backend, cascades to components, and that delete_stream() is the proper cleanup API.
register_disposable() expects DisposableBase, but Module port .subscribe() returns a plain callable. Wrap with Disposable() at call sites to fix all 11 mypy type-var errors.
No description provided.