Threading Utils (and fix for Native Module flakey test)#1663
Threading Utils (and fix for Native Module flakey test)#1663jeff-hykin wants to merge 22 commits intodevfrom
Conversation
Greptile SummaryThis PR replaces ad-hoc threading and event-loop management spread across Confidence Score: 4/5Core threading logic is solid and well-tested; one new integration test has a definite bug that will fail on CI, plus two minor defensive-programming gaps — none affect the production code path The fundamental fix (RLock, auto-cleanup disposables) is correct and thoroughly tested. The three issues found are: one failing test (P1, easy one-liner fix), one misleading type annotation (P2), and one unhelpful error message path for a rare deserialization edge case (P2). None of these affect the production start()/stop() lifecycle or the actual threading correctness. Prior review concerns are either resolved or explicitly addressed by the developer. Score 4 rather than 5 because the new integration test will fail as written. dimos/agents/mcp/test_mcp_server.py (test_mcp_server_lifecycle will crash before the retry loop), dimos/utils/thread_utils.py (AsyncModuleThread.loop type annotation) Important Files Changed
Sequence DiagramsequenceDiagram
participant Caller
participant ModuleBase
participant AsyncModuleThread
participant ModuleProcess
participant CompositeDisposable
Caller->>ModuleBase: __init__()
ModuleBase->>CompositeDisposable: create _disposables
ModuleBase->>AsyncModuleThread: AsyncModuleThread(module=self)
AsyncModuleThread->>CompositeDisposable: add(Disposable(self.stop))
note over AsyncModuleThread: _loop = None
Caller->>ModuleBase: start()
ModuleBase->>ModuleBase: mod_state → "started"
ModuleBase->>AsyncModuleThread: start()
AsyncModuleThread->>AsyncModuleThread: create event loop + daemon thread
note over AsyncModuleThread: _loop = running loop
Caller->>ModuleBase: (NativeModule) start()
ModuleBase->>ModuleProcess: ModuleProcess(module=self, ...)
ModuleProcess->>CompositeDisposable: add(Disposable(self.stop))
ModuleProcess->>ModuleProcess: start() → Popen + watchdog ModuleThread
Caller->>ModuleBase: stop()
ModuleBase->>ModuleBase: mod_state → "stopped"
ModuleBase->>CompositeDisposable: dispose()
CompositeDisposable->>AsyncModuleThread: stop() → loop.stop() + join
CompositeDisposable->>ModuleProcess: stop() → SIGTERM/SIGKILL + join
ModuleBase->>ModuleBase: rpc.stop(), _tf.stop()
|
| assert done.wait(timeout=10), "Deadlock with slow ModuleThread.stop()" | ||
|
|
||
|
|
||
| from dimos.utils.typing_utils import ExceptionGroup |
There was a problem hiding this comment.
ExceptionGroup imported at bottom of file, used earlier
ExceptionGroup is imported on line 888 but first used on line 750 inside TestSafeThreadMap methods. This works at runtime because the full module is loaded before any test runs, but it's confusing to readers: the symbol appears to be undefined at its use sites, and any linter or static analysis tool will flag these as NameErrors. The import should be moved to the top-level imports block alongside the other third-party imports.
| from dimos.utils.typing_utils import ExceptionGroup | |
| from dimos.utils.typing_utils import ExceptionGroup |
(Move this to the top of the file alongside the other dimos.utils imports, and remove line 888.)
| """ | ||
|
|
||
| @staticmethod | ||
| def _make_fake_stop(mod: FakeModule, done: threading.Event) -> Callable: |
There was a problem hiding this comment.
Missing
Callable import used in return-type annotation
Callable is referenced as a return-type annotation in _make_fake_stop but is never imported in this file. With from __future__ import annotations in effect, the annotation is stored as a string at definition time and won't raise a NameError at runtime. However, any call to typing.get_type_hints(_make_fake_stop) — including some test introspection tools — will fail with NameError: name 'Callable' is not defined.
Add to the imports at the top of the file:
from collections.abc import Callable| self._watchdog = ModuleThread( | ||
| module=self._module, | ||
| target=self._watch, | ||
| name=f"proc-{self._process.pid}-watchdog", | ||
| ) |
There was a problem hiding this comment.
Each
ModuleProcess.start() call adds a new ModuleThread disposable
Every time start() is called (line 388), a new ModuleThread is constructed for the watchdog. ModuleThread.__init__ immediately registers a Disposable(self.stop) in module._disposables (line 155). CompositeDisposable simply appends, so restarting the process accumulates stale disposables for watchdog threads that have already exited.
For the single-use lifecycle this is fine. But if start() is ever called more than once (e.g. after a failed first attempt, or the deferred-start path), the module's disposable list grows unboundedly, and on teardown each old watchdog's stop() is called even though it already finished, which — while idempotent — is surprising and hard to debug.
Consider either:
- Explicitly removing the old watchdog disposable before creating a new one, or
- Documenting clearly that
start()is a one-shot operation and raising an error on re-entry.
There was a problem hiding this comment.
super().start() will throw if its called more than once. We can/should assume start isn't being called multiple times AFAIK.
setstate getstate are different though, start could be called after setstate I believe
| with self.mod_state as state: | ||
| if state == "stopped": | ||
| raise RuntimeError(f"{type(self).__name__} cannot be restarted after stop") | ||
| self.mod_state.set("started") |
There was a problem hiding this comment.
I know lots of modules don't call super().start() but they also wouldn't be using mod_state cause its a new thing.
Different/off-topic discussion, but I think core2 should have ModuleBase as class decorator instead of an inherited class (we can basically wrap methods instead of saying "please remember to call super").
There was a problem hiding this comment.
That's one of the reasons I don't like inheritance. But can you explain what you mean by ModuleBase being a decorator? At first glace that seems more complicated.
| loop = getattr(self, "_loop", None) | ||
| # dispose of things BEFORE making aspects like rpc and _tf invalid | ||
| if hasattr(self, "_disposables"): | ||
| self._disposables.dispose() # stops _async_thread via disposable |
There was a problem hiding this comment.
I think its important to move disposables up before the rpc stop and the tf stop
| if self._uvicorn_server: | ||
| self._uvicorn_server.should_exit = True | ||
| loop = self._loop | ||
| if loop is not None and self._serve_future is not None: |
There was a problem hiding this comment.
the loop is always there until super().stop() is called
| server = uvicorn.Server(config) | ||
| self._uvicorn_server = server | ||
| loop = self._loop | ||
| assert loop is not None |
There was a problem hiding this comment.
loop always there until stop is called
| return s.getsockname()[1] | ||
|
|
||
|
|
||
| def test_mcp_server_lifecycle() -> None: |
dimos/core/test_core.py
Outdated
| assert hasattr(class_rpcs["start"], "__rpc__"), "start should have __rpc__ attribute" | ||
|
|
||
| nav._close_module() | ||
| nav._stop() |
There was a problem hiding this comment.
I'm trying to consolidate our naming to be "stop" instead of half "stop" half "close"
| # ThreadSafeVal: a lock-protected value with context-manager support | ||
|
|
||
|
|
||
| class ThreadSafeVal(Generic[T]): |
There was a problem hiding this comment.
this is my favorite util. I hate having _thing and _thing_lock and _thing2 and _thing2_lock, but I also hate seeing _thing being used in a method and thinking "hmm ... does _thing have a lock thats not being used?". This prevents ambiguity about what vals need locks and what vals don't
| self._thread.start() | ||
|
|
||
| def stop(self) -> None: | ||
| """Signal the thread to stop and join it. |
There was a problem hiding this comment.
this is probably the part that needs the most review
| # safe_thread_map: parallel map that collects all results before raising | ||
|
|
||
|
|
||
| def safe_thread_map( |
There was a problem hiding this comment.
Not used in this PR, but is used by the docker branch so getting it in here a bit early cause this is the util file it belongs in
|
|
||
| if sys.version_info < (3, 11): | ||
|
|
||
| class ExceptionGroup(Exception): # type: ignore[no-redef] # noqa: N818 |
There was a problem hiding this comment.
I didn't want to repeat all this cludge so I put it here. Let me know if there's a better spot
| if self._thread.is_alive() and self._thread is not threading.current_thread(): | ||
| self._thread.join(timeout=self._close_timeout) | ||
|
|
||
| def join(self, timeout: float | None = None) -> None: |
There was a problem hiding this comment.
I don't think you need join since you're already join()-ing in stop.
dimos/utils/thread_utils.py
Outdated
| self._stopped = False | ||
| self._stop_lock = threading.Lock() |
There was a problem hiding this comment.
Why do you need _stopped and _stop_lock? You have _stop_event.
dimos/utils/thread_utils.py
Outdated
|
|
||
| def start(self) -> None: | ||
| """Start the underlying thread.""" | ||
| self._stop_event.clear() |
There was a problem hiding this comment.
You don't need this. It's already off. If you want ModuleThread to be restartable, then you need to use another thread since threads aren't restartable.
dimos/utils/thread_utils.py
Outdated
| if start: | ||
| self.start() |
There was a problem hiding this comment.
Noooooo, don't autostart in the constructor. 😭
There was a problem hiding this comment.
😈 no boilerplate
But fr, how do you feel about ModuleThread().start()
| self._worker = ModuleThread( | ||
| module=self, | ||
| target=self._run_loop, | ||
| name="my-worker", |
There was a problem hiding this comment.
It would be nice if ModuleThread used self.module.__class__.__name__ as the prefix so we can just leave name blank most of the time and it still produces a useful name for debugging.
dimos/utils/thread_utils.py
Outdated
| return f"ThreadSafeVal({self._value!r})" | ||
|
|
||
|
|
||
| # ModuleThread: a thread that auto-registers with a module's disposables |
There was a problem hiding this comment.
Why add this if there's a docstring below?
There was a problem hiding this comment.
cause AI loves redundancy
(I'll remove it, thanks for bringing attention)
dimos/core/module.py
Outdated
| def _close_module(self) -> None: | ||
| with self._module_closed_lock: | ||
| if self._module_closed: | ||
| def _stop(self) -> None: |
There was a problem hiding this comment.
_close_module is a remnant from the the Module class hierarchy was more complicated. Some classes were skipping Module.__init__ and didn't initialize self._disposables for example. That's why I'm using hasattr(self, "_disposables") or hasattr(self, "_tf"). We didn't even have stop then.
I think it's not needed at all anymore. This could be deleted if you want and moved into def stop.
There was a problem hiding this comment.
happily! I though it was a rpc vs non-rpc thing
| self._worker = ModuleThread( | ||
| module=self, | ||
| target=self._run_loop, | ||
| name="my-worker", |
There was a problem hiding this comment.
| name="my-worker", | |
| name=self.module.__class__.__name__+"_my_worker", |
- Add mod.stop() to test_process_crash_triggers_stop so watchdog, LCM, and event-loop threads are properly joined from the test thread - Filter third-party daemon threads with generic names (Thread-\d+) in conftest monitor_threads to ignore torch/HF background threads that have no cleanup API
Convert test_process_crash_triggers_stop to use a fixture that calls mod.stop() in teardown. The watchdog thread calls self.stop() but can't join itself, so an explicit stop() from the test thread is needed to properly clean up all threads. Drop the broad conftest regex filter for generic daemon thread names per review feedback.
mod.stop() is a no-op when the watchdog already called it, so capture thread IDs before the test and join new ones in teardown.
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
- Merge _stop() into stop() in ModuleBase (removes unnecessary indirection) - Update all callers of _stop() to use stop() directly - Add thread_start() convenience function that creates + starts a ModuleThread
AsyncModuleThread no longer spawns the event loop thread in __init__. The loop is created on the first call to start(), which ModuleBase.start() now calls. This means module construction no longer has side effects — no threads are spawned until the module is explicitly started.
1b4450c to
4240573
Compare
|
|
||
| @property | ||
| def loop(self) -> asyncio.AbstractEventLoop: |
There was a problem hiding this comment.
loop property typed as non-optional but can return None
self._loop is initialised to None in __init__ and is only set to a real AbstractEventLoop inside start(). The property is typed as asyncio.AbstractEventLoop (non-optional), but any caller that accesses .loop before start() has been called will receive None and get a runtime AttributeError rather than a clear error about the missing initialisation.
Consider raising explicitly:
| @property | |
| def loop(self) -> asyncio.AbstractEventLoop: | |
| @property | |
| def loop(self) -> asyncio.AbstractEventLoop: | |
| """The managed event loop.""" | |
| if self._loop is None: | |
| raise RuntimeError( | |
| f"{self._module_name} async thread has not been started; call start() first" | |
| ) | |
| return self._loop |
This would have immediately surfaced the test_mcp_server_lifecycle issue above instead of propagating a confusing AttributeError from deep inside asyncio.
| assert response["error"]["code"] == -32601 | ||
|
|
||
|
|
||
| def _free_port() -> int: |
There was a problem hiding this comment.
This already exists as _find_free_port in the codebase. I've actually converted it to a fixture (find_free_port) in my PR but unlikely to be merged soon.
| for _ in range(40): | ||
| try: | ||
| resp = requests.post( | ||
| url, | ||
| json={"jsonrpc": "2.0", "method": "initialize", "id": 1}, | ||
| timeout=0.5, | ||
| ) | ||
| if resp.status_code == 200: | ||
| break | ||
| except requests.ConnectionError: | ||
| time.sleep(0.1) |
There was a problem hiding this comment.
Stash already has 3 instances of this exact sequance (as functions called wait_for_mcp). Please don't add a 4th that is inlined.
| assert data["result"]["serverInfo"]["name"] == "dimensional" | ||
|
|
||
| # Stop and verify it shuts down | ||
| server.stop() |
There was a problem hiding this comment.
The server should be stopped even if the test fails. The best way to do it is to use a fixture for the mcp server.
| loop_thread = getattr(self, "_loop_thread", None) | ||
| loop = getattr(self, "_loop", None) | ||
| # dispose of things BEFORE making aspects like rpc and _tf invalid | ||
| if hasattr(self, "_disposables"): |
There was a problem hiding this comment.
You'll have lots of conflicts here with what Ivan is doing for disposables in his PR: #1682 .
| self.rpc.stop() # type: ignore[attr-defined] | ||
| self.rpc = None # type: ignore[assignment] |
There was a problem hiding this comment.
Please avoid type: ignore
| _bound_rpc_calls: dict[str, RpcCall] = {} | ||
| _module_closed: bool = False | ||
| _module_closed_lock: threading.Lock | ||
| mod_state: ThreadSafeVal[ModState] |
There was a problem hiding this comment.
I don't like mod for module. It's not much of an abbreviation (it makes more sense for modification).
| @@ -0,0 +1,559 @@ | |||
| # Copyright 2025-2026 Dimensional Inc. | |||
There was a problem hiding this comment.
I think these are separate enough that they could be their own files. It's often said that "utils" is a dumping ground for any odd code, but this could be put into a dimos/core/threading/ directory.
| if self._owns_loop and self._loop is not None and self._loop.is_running(): | ||
| self._loop.call_soon_threadsafe(self._loop.stop) | ||
|
|
||
| if self._thread is not None and self._thread.is_alive(): |
There was a problem hiding this comment.
It's good to also check that this call isn't from the current thread.
| close_timeout: float = 2.0, | ||
| ) -> None: | ||
| self._close_timeout = close_timeout | ||
| self._stopped = ThreadSafeVal(False) |
There was a problem hiding this comment.
This is what Event is for.
| self._loop: asyncio.AbstractEventLoop | None = None | ||
| self._module_name = type(module).__name__ | ||
|
|
||
| module._disposables.add(Disposable(self.stop)) |
There was a problem hiding this comment.
ModuleBase has an AsyncModuleThread. It's rather odd for AsyncModuleThread to tell its owner what to do.
AsyncModuleThread shoud not take ModuleBase in its __init__. ModuleBase should register the shutdown of AsyncModuleThread
| @property | ||
| def loop(self) -> asyncio.AbstractEventLoop: | ||
| """The managed event loop.""" | ||
| return self._loop |
There was a problem hiding this comment.
I'm surprised mypy doesn't complain that you're returning None when only asyncio.AbstractEventLoop is allowed. Greptile is right about the comment above.
| self._process: subprocess.Popen[bytes] | None = None | ||
| self._watchdog: ModuleThread | None = None | ||
| self._module = module | ||
| self._stopped = ThreadSafeVal(False) |
| self.last_stdout: collections.deque[str] = collections.deque(maxlen=log_tail_lines) | ||
| self.last_stderr: collections.deque[str] = collections.deque(maxlen=log_tail_lines) | ||
|
|
||
| module._disposables.add(Disposable(self.stop)) |
There was a problem hiding this comment.
Module should add ModuleProcess.stop to its disposables. ModuleProcess should not help itself to Module's private data like _disposables.
| return on_errors(zipped, successes, errors) # type: ignore[return-value, no-any-return] | ||
| raise ExceptionGroup("safe_thread_map failed", errors) | ||
|
|
||
| return [outcomes[i] for i in range(len(items))] # type: ignore[misc] |
There was a problem hiding this comment.
Please don't ignore.
Problem
Flakey test, and a history of Flakey tests surrounding threads in modules.
Solution
Smart thread tooling with auto-cleanup to reduce the risk of bad cleanup and reduce bloat.
Breaking Changes
None, did more testing than usual because this touches core.
How to Test
Contributor License Agreement