Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dimos/agents_deprecated/memory/image_embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, model_name: str = "clip", dimensions: int = 512) -> None:
def _initialize_model(self): # type: ignore[no-untyped-def]
"""Initialize the specified embedding model."""
try:
import onnxruntime as ort # type: ignore[import-untyped]
import onnxruntime as ort # type: ignore[import-untyped,import-not-found]
import torch # noqa: F401
from transformers import ( # type: ignore[import-untyped]
AutoFeatureExtractor,
Expand Down
41 changes: 36 additions & 5 deletions dimos/core/native_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class MyCppModule(NativeModule):

from dimos.core.core import rpc
from dimos.core.module import Module, ModuleConfig
from dimos.utils.change_detect import PathEntry, did_change
from dimos.utils.logging_config import setup_logger

if sys.version_info < (3, 13):
Expand All @@ -80,9 +81,10 @@ class NativeModuleConfig(ModuleConfig):
extra_env: dict[str, str] = Field(default_factory=dict)
shutdown_timeout: float = 10.0
log_format: LogFormat = LogFormat.TEXT
rebuild_on_change: list[PathEntry] | None = None

# Override in subclasses to exclude fields from CLI arg generation
cli_exclude: frozenset[str] = frozenset()
cli_exclude: frozenset[str] = frozenset({"rebuild_on_change"})

def to_cli_args(self) -> list[str]:
"""Auto-convert subclass config fields to CLI args.
Expand Down Expand Up @@ -186,8 +188,11 @@ def stop(self) -> None:
if self._watchdog is not None and self._watchdog is not threading.current_thread():
self._watchdog.join(timeout=2)
self._watchdog = None
self._process = None
# Clean up the asyncio loop thread (from ModuleBase) BEFORE
# clearing _process — tests use _process=None as their exit
# signal, and the loop thread must be joined first.
super().stop()
self._process = None

def _watch_process(self) -> None:
"""Block until the native process exits; trigger stop() if it crashed."""
Expand Down Expand Up @@ -243,18 +248,39 @@ def _resolve_paths(self) -> None:
if not Path(self.config.executable).is_absolute() and self.config.cwd is not None:
self.config.executable = str(Path(self.config.cwd) / self.config.executable)

def _build_cache_name(self) -> str:
"""Return a stable, unique cache name for this module's build state."""
source_file = Path(inspect.getfile(type(self))).resolve()
return f"native_{source_file}:{type(self).__qualname__}"

def _maybe_build(self) -> None:
"""Run ``build_command`` if the executable does not exist."""
"""Run ``build_command`` if the executable does not exist or sources changed."""
exe = Path(self.config.executable)
if exe.exists():

# Check if rebuild needed due to source changes
needs_rebuild = False
if self.config.rebuild_on_change and exe.exists():
if did_change(
self._build_cache_name(), self.config.rebuild_on_change, cwd=self.config.cwd
):
logger.info("Source files changed, triggering rebuild", executable=str(exe))
needs_rebuild = True
Comment on lines +262 to +267
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 Cache written before build completes — failed rebuild silently blocks all future rebuilds

did_change() always writes the new hash to the cache file as a side effect before returning. So when it returns True here (change detected, rebuild needed), the cache is already updated to the new hash. If the build subsequently fails (e.g. RuntimeError is raised at line 288), the exception propagates up and the cache is left pointing at the new file state.

On the very next call to _maybe_build():

  1. exe.exists() → True (old binary is still there)
  2. did_change(...) → False (cache already reflects the new file state)
  3. needs_rebuild = False → the method returns early

The module silently continues running the stale executable and will never retry the build, even if you fix the build error and call start() again.

The root fix is to not update the cache during the "check" phase, and only commit the hash after a successful build. One approach is to split did_change into a read-only has_changed check and a separate update_cache write, calling them independently:

# Check-only (read the cache, do not write it)
if self.config.rebuild_on_change and exe.exists():
    cache_name = f"native_{type(self).__name__}_build"
    if has_changed(cache_name, self.config.rebuild_on_change, cwd=self.config.cwd):
        logger.info("Source files changed, triggering rebuild", executable=str(exe))
        needs_rebuild = True

and after a successful build:

if self.config.rebuild_on_change:
    update_cache(cache_name, self.config.rebuild_on_change, cwd=self.config.cwd)


if exe.exists() and not needs_rebuild:
return

if self.config.build_command is None:
raise FileNotFoundError(
f"Executable not found: {exe}. "
"Set build_command in config to auto-build, or build it manually."
)

# Don't unlink the exe before rebuilding — the build command is
# responsible for replacing it. For nix builds the exe lives inside
# a read-only store; `nix build -o` atomically swaps the output
# symlink without touching store contents.
logger.info(
"Executable not found, running build",
"Rebuilding" if needs_rebuild else "Executable not found, building",
executable=str(exe),
build_command=self.config.build_command,
)
Expand Down Expand Up @@ -282,6 +308,11 @@ def _maybe_build(self) -> None:
f"Build command succeeded but executable still not found: {exe}"
)

# Seed the cache after a successful build so the next check has a baseline
# (needed for the initial build when the pre-build change check was skipped)
if self.config.rebuild_on_change:
did_change(self._build_cache_name(), self.config.rebuild_on_change, cwd=self.config.cwd)

def _collect_topics(self) -> dict[str, str]:
"""Extract LCM topic strings from blueprint-assigned stream transports."""
topics: dict[str, str] = {}
Expand Down
15 changes: 9 additions & 6 deletions dimos/core/test_native_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,16 @@ def test_process_crash_triggers_stop() -> None:
assert mod._process is not None
pid = mod._process.pid

# Wait for the process to die and the watchdog to call stop()
for _ in range(30):
time.sleep(0.1)
if mod._process is None:
break
try:
# Wait for the process to die and the watchdog to call stop()
for _ in range(30):
time.sleep(0.1)
if mod._process is None:
break

assert mod._process is None, f"Watchdog did not clean up after process {pid} died"
assert mod._process is None, f"Watchdog did not clean up after process {pid} died"
finally:
mod.stop()


@pytest.mark.slow
Expand Down
140 changes: 140 additions & 0 deletions dimos/core/test_native_rebuild.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for NativeModule rebuild-on-change integration."""

from __future__ import annotations

from pathlib import Path
import stat

import pytest

from dimos.core.native_module import NativeModule, NativeModuleConfig
from dimos.utils.change_detect import PathEntry


@pytest.fixture(autouse=True)
def _use_tmp_cache(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Point dimos.utils.change_detect at a per-test cache directory.

    Applied automatically (autouse) so no test in this module can read or
    pollute the real change-detection cache on the host machine.
    """
    cache_root = tmp_path / "cache"

    def _fake_cache_dir() -> Path:
        return cache_root

    monkeypatch.setattr("dimos.utils.change_detect._get_cache_dir", _fake_cache_dir)


@pytest.fixture()
def build_env(tmp_path: Path) -> dict[str, Path]:
    """Set up a temp directory with a source file, executable path, and marker path.

    Returns a mapping with:
        src          -- directory containing a single ``main.c`` source file
        exe          -- path where the fake build produces the executable
        marker       -- file touched by the build script on every run, so tests
                        can tell whether the build command actually executed
        build_script -- shell script that "builds" ``exe`` and touches ``marker``
    """
    src = tmp_path / "src"
    src.mkdir()
    (src / "main.c").write_text("int main() { return 0; }")

    exe = tmp_path / "mybin"
    marker = tmp_path / "build_ran.marker"

    # Build script: create the executable and a marker file.
    # Paths are double-quoted so the script still works if tmp_path
    # contains spaces or other shell-special characters.
    build_script = tmp_path / "build.sh"
    build_script.write_text(
        f'#!/bin/sh\ntouch "{exe}"\nchmod +x "{exe}"\ntouch "{marker}"\n'
    )
    build_script.chmod(build_script.stat().st_mode | stat.S_IEXEC)

    return {"src": src, "exe": exe, "marker": marker, "build_script": build_script}


class _RebuildConfig(NativeModuleConfig):
    # Test-only config: the executable path is filled in per-test (see
    # _make_module), so the default is an empty string.
    executable: str = ""
    # Paths watched for source changes; None disables change detection.
    # Redeclared here so the test config spells out the field it exercises.
    rebuild_on_change: list[PathEntry] | None = None


class _RebuildModule(NativeModule[_RebuildConfig]):
    # Minimal concrete NativeModule so tests can call _maybe_build() directly.
    default_config = _RebuildConfig


def _make_module(build_env: dict[str, Path]) -> _RebuildModule:
    """Create a _RebuildModule pointing at the temp build env."""
    src_dir = build_env["src"]
    return _RebuildModule(
        executable=str(build_env["exe"]),
        build_command=f"sh {build_env['build_script']}",
        rebuild_on_change=[str(src_dir)],
        cwd=str(src_dir),
    )


def test_rebuild_on_change_triggers_build(build_env: dict[str, Path]) -> None:
    """When source files change, the build_command should re-run."""
    exe, marker = build_env["exe"], build_env["marker"]
    module = _make_module(build_env)
    try:
        # Initial build: no executable yet, so the build command must run.
        module._maybe_build()
        assert exe.exists()
        assert marker.exists()
        marker.unlink()

        # Unchanged sources: the build command must be skipped.
        module._maybe_build()
        assert not marker.exists()

        # Rewriting a watched source file must trigger a rebuild.
        (build_env["src"] / "main.c").write_text("int main() { return 1; }")
        module._maybe_build()
        assert marker.exists(), "Build should have re-run after source change"
    finally:
        module.stop()


def test_no_change_skips_rebuild(build_env: dict[str, Path]) -> None:
    """When sources haven't changed, build_command must not run again."""
    module = _make_module(build_env)
    marker = build_env["marker"]
    try:
        # First call builds from scratch (executable is missing).
        module._maybe_build()
        assert marker.exists()
        marker.unlink()

        # Nothing changed since that build, so the second call is a no-op.
        module._maybe_build()
        assert not marker.exists(), "Build should have been skipped (no source changes)"
    finally:
        module.stop()


def test_rebuild_on_change_none_skips_check(build_env: dict[str, Path]) -> None:
    """When rebuild_on_change is None, no change detection happens at all."""
    exe, marker = build_env["exe"], build_env["marker"]

    module = _RebuildModule(
        executable=str(exe),
        build_command=f"sh {build_env['build_script']}",
        rebuild_on_change=None,
        cwd=str(build_env["src"]),
    )
    try:
        # First call: executable is absent, so the build runs once.
        module._maybe_build()
        assert exe.exists()
        assert marker.exists()
        marker.unlink()

        # Sources change, but with rebuild_on_change=None nothing watches them.
        (build_env["src"] / "main.c").write_text("int main() { return 1; }")
        module._maybe_build()
        assert not marker.exists(), "Should not rebuild when rebuild_on_change is None"
    finally:
        module.stop()
19 changes: 10 additions & 9 deletions dimos/manipulation/planning/utils/mesh_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import tempfile
from typing import TYPE_CHECKING

from dimos.utils.change_detect import did_change
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
Expand Down Expand Up @@ -76,14 +77,15 @@ def prepare_urdf_for_drake(
package_paths = package_paths or {}
xacro_args = xacro_args or {}

# Generate cache key
# Generate cache key from configuration (not file content — did_change handles that)
cache_key = _generate_cache_key(urdf_path, package_paths, xacro_args, convert_meshes)
cache_path = _CACHE_DIR / cache_key / urdf_path.stem
cache_path.mkdir(parents=True, exist_ok=True)
cached_urdf = cache_path / f"{urdf_path.stem}.urdf"

# Check cache
if cached_urdf.exists():
# Check cache: reuse only if the output exists AND the source file hasn't changed
source_changed = did_change(f"urdf_{cache_key}", [str(urdf_path)])
if cached_urdf.exists() and not source_changed:
logger.debug(f"Using cached URDF: {cached_urdf}")
return str(cached_urdf)

Expand Down Expand Up @@ -118,16 +120,15 @@ def _generate_cache_key(
) -> str:
"""Generate a cache key for the URDF configuration.

Includes a version number to invalidate cache when processing logic changes.
Encodes the configuration inputs (not file content — ``did_change`` handles
content-based invalidation separately). Includes a version number to
invalidate the cache when processing logic changes.
"""
# Include file modification time
mtime = urdf_path.stat().st_mtime if urdf_path.exists() else 0

# Version number to invalidate cache when processing logic changes
# Increment this when adding new processing steps (e.g., stripping transmission blocks)
processing_version = "v2"
processing_version = "v3"

key_data = f"{processing_version}:{urdf_path}:{mtime}:{sorted(package_paths.items())}:{sorted(xacro_args.items())}:{convert_meshes}"
key_data = f"{processing_version}:{urdf_path}:{sorted(package_paths.items())}:{sorted(xacro_args.items())}:{convert_meshes}"
return hashlib.md5(key_data.encode()).hexdigest()[:16]


Expand Down
10 changes: 7 additions & 3 deletions dimos/protocol/pubsub/impl/test_lcmpubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,30 @@
)
from dimos.utils.testing.collector import CallbackCollector

# Isolated multicast group so stale messages from other tests
# (which use the default 239.255.76.67:7667) don't leak in.
_ISOLATED_LCM_URL = "udpm://239.255.76.98:7698?ttl=0"


@pytest.fixture
def lcm_pub_sub_base() -> Generator[LCMPubSubBase, None, None]:
lcm = LCMPubSubBase()
lcm = LCMPubSubBase(url=_ISOLATED_LCM_URL)
lcm.start()
yield lcm
lcm.stop()


@pytest.fixture
def pickle_lcm() -> Generator[PickleLCM, None, None]:
lcm = PickleLCM()
lcm = PickleLCM(url=_ISOLATED_LCM_URL)
lcm.start()
yield lcm
lcm.stop()


@pytest.fixture
def lcm() -> Generator[LCM, None, None]:
lcm = LCM()
lcm = LCM(url=_ISOLATED_LCM_URL)
lcm.start()
yield lcm
lcm.stop()
Expand Down
12 changes: 8 additions & 4 deletions dimos/protocol/pubsub/test_pattern_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@ class Case(Generic[TopicT, MsgT]):
regex_patterns: list[tuple[TopicT, set[int]]] = field(default_factory=list)


# Use an isolated multicast group to avoid cross-test LCM contamination.
_ISOLATED_LCM_URL = "udpm://239.255.76.99:7699?ttl=0"


@contextmanager
def lcm_typed_context() -> Generator[tuple[LCM, LCM], None, None]:
pub = LCM()
sub = LCM()
pub = LCM(url=_ISOLATED_LCM_URL)
sub = LCM(url=_ISOLATED_LCM_URL)
pub.start()
sub.start()
try:
Expand All @@ -67,8 +71,8 @@ def lcm_typed_context() -> Generator[tuple[LCM, LCM], None, None]:

@contextmanager
def lcm_bytes_context() -> Generator[tuple[LCMPubSubBase, LCMPubSubBase], None, None]:
pub = LCMPubSubBase()
sub = LCMPubSubBase()
pub = LCMPubSubBase(url=_ISOLATED_LCM_URL)
sub = LCMPubSubBase(url=_ISOLATED_LCM_URL)
pub.start()
sub.start()
try:
Expand Down
2 changes: 1 addition & 1 deletion dimos/simulation/mujoco/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import mujoco
import numpy as np
import onnxruntime as ort # type: ignore[import-untyped]
import onnxruntime as ort # type: ignore[import-untyped,import-not-found]

from dimos.simulation.mujoco.input_controller import InputController
from dimos.utils.logging_config import setup_logger
Expand Down
Loading
Loading