
loky: fix ShutdownExecutorError race in concurrent submissions to reusable executor #632

Open
mvanhorn wants to merge 1 commit into joblib:master from mvanhorn:fix/458-loky-reusable-executor-submit-shutdown-race

@mvanhorn

Summary

The fix has two parts, followed by a regression test (Part 3). Both code changes stay inside loky/reusable_executor.py; no changes to process_executor.py are needed.

Part 1 - hold a stable reference under the singleton lock.

get_reusable_executor (and the underlying _ReusablePoolExecutor.get_reusable_executor classmethod) returns the executor instance after releasing _executor_lock. Callers then invoke .submit() unprotected. We do not want to hold _executor_lock across user submissions (that would serialize all submits across threads), but we can guarantee that the returned executor reference is one that has not yet been shut down by ensuring two invariants:

  • Inside the lock, if the cached executor is going to be shut down because kwargs changed, the new executor is constructed before the lock is released and returned to the caller. (This is already the case via the recursive call; verify nothing escapes with a stale reference.)
  • The recursive call path that builds a new executor must not return the old executor variable still bound in the outer scope. Audit the existing branches to confirm the recursive return value is propagated correctly. The current code already does return cls.get_reusable_executor(...), so this is correct; add a comment marking the invariant.
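The second invariant can be illustrated with a toy singleton resolver. This is a minimal sketch built on the stdlib ThreadPoolExecutor, not loky's code: get_executor, _lock, _cached and _cached_kwargs are hypothetical stand-ins, and the lock is assumed to be re-entrant so that the recursive call can re-acquire it.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Toy stand-ins for the module-level singleton state; names are hypothetical.
_lock = threading.RLock()  # re-entrant so the recursive call can re-acquire it
_cached = None
_cached_kwargs = None


def get_executor(**kwargs):
    """Return a live executor, rotating the singleton when kwargs change."""
    global _cached, _cached_kwargs
    with _lock:
        executor = _cached
        if executor is None:
            _cached_kwargs = dict(kwargs)
            _cached = ThreadPoolExecutor(**kwargs)
            return _cached
        if kwargs != _cached_kwargs:
            executor.shutdown(wait=True)
            _cached = _cached_kwargs = None
            # Invariant: propagate the recursive result. Returning the local
            # `executor` here would hand the caller a shut-down instance.
            return get_executor(**kwargs)
        return executor


first = get_executor(max_workers=1)
second = get_executor(max_workers=2)   # kwargs differ: singleton is rotated
reused = get_executor(max_workers=2)   # kwargs match: same instance is reused
value = second.submit(sum, [1, 2, 3]).result()
second.shutdown(wait=True)
```

The point of the sketch is only the return path: the branch that shuts down the old instance never lets the stale local variable escape.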

Part 2 - make _ReusablePoolExecutor.submit shutdown-resilient.

Even with Part 1, a thread can still be handed an executor that was live at lock-release time and is shut down before the thread uses it: Thread B's shutdown-and-replace can land between Thread A's resolve and Thread A's submit. The robust fix is to detect a shutdown executor inside _ReusablePoolExecutor.submit and retry against the current singleton:

def submit(self, fn, *args, **kwargs):
    with self._submit_resize_lock:
        if self._flags.shutdown:
            # The singleton was rotated out between resolve and submit
            # by a concurrent get_reusable_executor() call with different
            # kwargs. Re-resolve the live singleton and submit there.
            executor, _ = type(self).get_reusable_executor(**_executor_kwargs)
            return executor.submit(fn, *args, **kwargs)
        return super().submit(fn, *args, **kwargs)

Caveats that the sketch above leaves out and the patch must handle:

  • _executor_kwargs is module-global and may itself be racing. Snapshot it under _executor_lock at the top of the retry path.
  • The retry must terminate. Bound it to one retry; if the live executor is also shut down (e.g. interpreter shutdown via _global_shutdown), let the underlying ShutdownExecutorError propagate so behavior at interpreter exit is unchanged.
  • If _executor_kwargs is None (post-shutdown), there is nothing to re-resolve against. In that case, skip the retry and raise the original error.
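The three caveats can be sketched together on a toy model. This is an illustration under stated assumptions, not the patch itself: RotatingExecutor, resolve, shutdown_all, _lock, _live and _live_kwargs are hypothetical names, the stdlib ThreadPoolExecutor stands in for loky's executor, and the shutdown is detected by catching the stdlib RuntimeError rather than by checking a flag as the snippet above does.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_lock = threading.Lock()   # plays the role of _executor_lock
_live = None               # current singleton
_live_kwargs = None        # plays the role of _executor_kwargs


class RotatingExecutor(ThreadPoolExecutor):
    """Toy executor that retries once against the live singleton."""

    def submit(self, fn, *args, **kwargs):
        try:
            return super().submit(fn, *args, **kwargs)
        except RuntimeError:
            # Caveat 1: snapshot the kwargs under the singleton lock.
            with _lock:
                snapshot = _live_kwargs
            # Caveat 3: nothing to re-resolve against after full shutdown.
            if snapshot is None:
                raise
            replacement = resolve(**snapshot)
            if replacement is self:
                raise
            # Caveat 2: bypass this override so a second failure propagates
            # instead of triggering another retry.
            return ThreadPoolExecutor.submit(replacement, fn, *args, **kwargs)


def resolve(**kwargs):
    """Return the live singleton, rotating it when kwargs change."""
    global _live, _live_kwargs
    with _lock:
        if _live is None or kwargs != _live_kwargs:
            if _live is not None:
                _live.shutdown(wait=True)
            _live = RotatingExecutor(**kwargs)
            _live_kwargs = dict(kwargs)
        return _live


def shutdown_all():
    """Shut the singleton down and clear the cached kwargs."""
    global _live, _live_kwargs
    with _lock:
        if _live is not None:
            _live.shutdown(wait=True)
        _live, _live_kwargs = None, None


stale = resolve(max_workers=1)
resolve(max_workers=2)                      # rotates; `stale` is now shut down
retried = stale.submit(pow, 2, 5).result()  # served by the live singleton

shutdown_all()
try:
    stale.submit(pow, 2, 5)
    propagated = False
except RuntimeError:
    propagated = True                       # original error surfaces unchanged
```

Note that the retry runs after the lock used for the snapshot has been released, so the resolver can take that lock again without deadlocking on a non-re-entrant lock.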

Part 3 - regression test.

Add test_reusable_executor_submit_during_shutdown_race to tests/test_reusable_executor.py. Port the reproducer from the issue, parameterize the outer iteration count to keep CI runtime under a few seconds, and assert no future raises ShutdownExecutorError. To make the test deterministic on CI, drive the race directly: use a threading.Barrier to synchronize a "submit from thread" call with a "get_reusable_executor with different env from main thread" call so the shutdown-and-replace lands between resolve and submit. The test fails on master and passes after the fix.
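The barrier technique can be sketched without loky. The example below is a hedged illustration on the stdlib ThreadPoolExecutor: resolve, drive_race and the string tags are hypothetical, and the barrier is used for two rendezvous points (worker has resolved, then main has rotated), which is an assumption about how the ordering would be pinned down rather than a description of the actual test. Because the toy executor has no retry, the stale submit fails every time, which is the behavior the real test is meant to catch on master.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_lock = threading.Lock()
_live = None
_live_tag = None


def resolve(tag):
    """Toy resolver: rotate the singleton whenever the tag changes."""
    global _live, _live_tag
    with _lock:
        if _live is None or tag != _live_tag:
            if _live is not None:
                _live.shutdown(wait=True)
            _live = ThreadPoolExecutor(max_workers=1)
            _live_tag = tag
        return _live


def drive_race():
    """Force the rotation to land between the worker's resolve and submit."""
    barrier = threading.Barrier(2, timeout=10)
    outcome = {}

    def worker():
        executor = resolve("1")
        barrier.wait()   # first rendezvous: the worker has resolved
        barrier.wait()   # second rendezvous: the main thread has rotated
        try:
            outcome["result"] = executor.submit(int, "1").result()
        except RuntimeError as exc:
            outcome["error"] = exc

    thread = threading.Thread(target=worker)
    thread.start()
    barrier.wait()       # wait until the worker holds its reference
    resolve("2")         # different tag: shutdown-and-replace
    barrier.wait()       # release the worker into submit()
    thread.join(timeout=10)
    resolve("2").shutdown(wait=True)
    return outcome


outcome = drive_race()
```

With a shutdown-resilient submit in place, the same driver would be expected to record a result instead of an error.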

PR title: FIX avoid ShutdownExecutorError when reusable executor is rotated concurrently

PR body sketch:

Closes #458.

Concurrent calls to get_reusable_executor with differing kwargs (e.g. different env per thread) can rotate the singleton executor between the time one caller resolves the singleton and the time it calls .submit(), raising ShutdownExecutorError.

This patch makes _ReusablePoolExecutor.submit detect that case and re-resolve against the current singleton once before propagating the error. A targeted regression test drives the race via a threading.Barrier so it is deterministic on CI.

Why this matters

Issue #458, filed by maintainer @ogrisel, reports that submitting to get_reusable_executor concurrently from multiple threads (e.g. via a stdlib ThreadPoolExecutor map) intermittently raises ShutdownExecutorError: cannot schedule new futures after shutdown. The supplied reproducer creates a fresh reusable executor with a different env argument from the main thread, then immediately fans out 100 submissions across 10 threads. Because each thread's env={"a": str(i)} differs from the cached executor kwargs, every other submission's call to get_reusable_executor triggers a shutdown-and-recreate of the singleton executor. The race is between:

  1. Thread A resolving the current singleton via get_reusable_executor, then yielding before calling .submit().
  2. Thread B entering get_reusable_executor under _executor_lock, deciding the kwargs differ, calling executor.shutdown(wait=True), resetting _executor to None, and releasing the lock.
  3. Thread A waking up and calling .submit() on the now shut-down executor, which raises ShutdownExecutorError.
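The three steps can be replayed sequentially in a single thread, which makes the failure mode easy to see. This is a minimal sketch using the stdlib ThreadPoolExecutor as a stand-in: _current and rotate are hypothetical names, and the stdlib raises a plain RuntimeError where loky raises ShutdownExecutorError.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the module-level singleton; not loky's code.
_current = ThreadPoolExecutor(max_workers=1)


def rotate():
    """Mimic step 2: shut down the cached executor and install a new one."""
    global _current
    _current.shutdown(wait=True)
    _current = ThreadPoolExecutor(max_workers=1)


# Step 1: "thread A" resolves the singleton and keeps a reference.
stale = _current
# Step 2: "thread B" rotates the singleton.
rotate()
# Step 3: "thread A" submits to the reference it resolved earlier.
try:
    stale.submit(int, "1")
    raised = False
    message = ""
except RuntimeError as exc:
    raised = True
    message = str(exc)

# The freshly installed singleton still accepts work.
live_result = _current.submit(int, "1").result()
_current.shutdown(wait=True)
```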

The traceback confirms the failing call site is _ReusablePoolExecutor.submit -> ProcessPoolExecutor.submit -> the self._flags.shutdown branch at loky/process_executor.py:1258.

Testing

  • New test test_reusable_executor_submit_during_shutdown_race in tests/test_reusable_executor.py:

    • Uses a threading.Barrier(2) shared by a worker thread (calling executor.submit(...) on a resolved executor) and the main thread (calling get_reusable_executor(env={"a": "2"}), whose kwargs differ from the resolved executor's kwargs, to force shutdown-and-replace).
    • Worker thread resolves executor = get_reusable_executor(env={"a": "1"}), waits at the barrier, then calls executor.submit(lambda: 1) and asserts the result is 1.
    • Main thread resolves a different singleton with env={"a": "2"}, waits at the barrier (which forces the resolve-then-shutdown to happen just before the worker thread's .submit), and asserts no exception.
    • Without the fix, this test reliably reproduces ShutdownExecutorError (verified by reverting just the submit patch and running the test in a loop).
  • Original reproducer from the issue: copy it into the test file and check that it runs cleanly through 10 outer iterations. Gate it behind a slow marker if needed; @pytest.mark.skipif(os.environ.get("CI")) is not appropriate, since maintainers want CI to catch regressions. If runtime is a concern, scale range(10) down to range(2).

  • Existing tests/test_reusable_executor.py suite still passes (pytest tests/test_reusable_executor.py -v). Pay particular attention to test_reusable_executor_thread_safety (added in PR #116) to confirm no regression.

  • pytest tests/ -v -k "not (slow or psutil)" passes on Python 3.11, 3.12, 3.13. (Local matrix; CI covers the rest.)

  • Manual smoke: run the issue's exact reproducer in an ipython session; expect no exception across 10 outer iterations, where pre-fix it triggers within 1-3 iterations on the maintainer's machine.

Fixes #458

AI was used for assistance.



Development

Successfully merging this pull request may close these issues.

Submission to reusable executor is not fully thread-safe
