diff --git a/loky/reusable_executor.py b/loky/reusable_executor.py index faf604c2..f3eec588 100644 --- a/loky/reusable_executor.py +++ b/loky/reusable_executor.py @@ -211,7 +211,9 @@ def get_reusable_executor( ) executor.shutdown(wait=True, kill_workers=kill_workers) _executor = executor = _executor_kwargs = None - # Recursive call to build a new instance + # Build and return the replacement while still holding + # the singleton lock so this branch never returns the + # stale executor that was just shut down. return cls.get_reusable_executor( max_workers=max_workers, **kwargs ) @@ -227,6 +229,20 @@ def get_reusable_executor( def submit(self, fn, *args, **kwargs): with self._submit_resize_lock: + if self._flags.broken is None and self._flags.shutdown: + executor = _executor + executor_kwargs = _executor_kwargs + if ( + executor is not None + and executor is not self + and executor_kwargs is not None + ): + # A concurrent call to get_reusable_executor rotated the + # singleton after this executor was resolved but before + # submit was called. Retry exactly once on the replacement. + return super(_ReusablePoolExecutor, executor).submit( + fn, *args, **kwargs + ) return super().submit(fn, *args, **kwargs) def _resize(self, max_workers): diff --git a/tests/test_reusable_executor.py b/tests/test_reusable_executor.py index 59fb9540..89195597 100644 --- a/tests/test_reusable_executor.py +++ b/tests/test_reusable_executor.py @@ -3,6 +3,7 @@ import sys import gc import ctypes +from concurrent.futures import ThreadPoolExecutor from tempfile import NamedTemporaryFile import pytest import warnings @@ -924,6 +925,55 @@ def helper_func( t.join() assert output_collector == ["ok"] * len(threads) + def test_reusable_executor_submit_during_shutdown_race(self): + resolved = threading.Event() + barrier = threading.Barrier(2) + results = [] + errors = [] + + def submit_on_resolved_executor(): + try: + executor = get_reusable_executor( + max_workers=1, env={"a": "1"} + ) + resolved.set() + barrier.wait(timeout=10) + results.append( + executor.submit(id_sleep, 1, 0).result(timeout=10) + ) + except Exception as e: + errors.append(e) + + thread = threading.Thread(target=submit_on_resolved_executor) + thread.start() + + assert resolved.wait(timeout=10) + get_reusable_executor(max_workers=1, env={"a": "2"}) + barrier.wait(timeout=10) + thread.join(timeout=10) + + assert not thread.is_alive() + assert not any(isinstance(e, ShutdownExecutorError) for e in errors) + if errors: + raise errors[0] + assert results == [1] + + @pytest.mark.parametrize("iteration", range(2)) + def test_reusable_executor_submit_with_concurrent_env_changes( + self, iteration + ): + def submit_with_env(i): + executor = get_reusable_executor( + max_workers=1, env={"a": f"{iteration}-{i}"} + ) + return executor.submit(id_sleep, i, 0).result(timeout=10) + + n_submissions = 20 + with ThreadPoolExecutor(max_workers=10) as executor: + results = list(executor.map(submit_with_env, range(n_submissions))) + + assert results == list(range(n_submissions)) + def test_reusable_executor_reuse_true(self): executor = get_reusable_executor(max_workers=3, timeout=42) executor.submit(id, 42).result()