Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion loky/reusable_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ def get_reusable_executor(
)
executor.shutdown(wait=True, kill_workers=kill_workers)
_executor = executor = _executor_kwargs = None
# Recursive call to build a new instance
# Build and return the replacement while still holding
# the singleton lock so this branch never returns the
# stale executor that was just shut down.
return cls.get_reusable_executor(
max_workers=max_workers, **kwargs
)
Expand All @@ -227,6 +229,20 @@ def get_reusable_executor(

def submit(self, fn, *args, **kwargs):
with self._submit_resize_lock:
if self._flags.broken is None and self._flags.shutdown:
executor = _executor
executor_kwargs = _executor_kwargs
if (
executor is not None
and executor is not self
and executor_kwargs is not None
):
# A concurrent call to get_reusable_executor rotated the
# singleton after this executor was resolved but before
# submit was called. Retry exactly once on the replacement.
return super(_ReusablePoolExecutor, executor).submit(
fn, *args, **kwargs
)
return super().submit(fn, *args, **kwargs)

def _resize(self, max_workers):
Expand Down
50 changes: 50 additions & 0 deletions tests/test_reusable_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
import gc
import ctypes
from concurrent.futures import ThreadPoolExecutor
from tempfile import NamedTemporaryFile
import pytest
import warnings
Expand Down Expand Up @@ -924,6 +925,55 @@ def helper_func(
t.join()
assert output_collector == ["ok"] * len(threads)

def test_reusable_executor_submit_during_shutdown_race(self):
resolved = threading.Event()
barrier = threading.Barrier(2)
results = []
errors = []

def submit_on_resolved_executor():
try:
executor = get_reusable_executor(
max_workers=1, env={"a": "1"}
)
resolved.set()
barrier.wait(timeout=10)
results.append(
executor.submit(id_sleep, 1, 0).result(timeout=10)
)
except Exception as e:
errors.append(e)

thread = threading.Thread(target=submit_on_resolved_executor)
thread.start()

assert resolved.wait(timeout=10)
get_reusable_executor(max_workers=1, env={"a": "2"})
barrier.wait(timeout=10)
thread.join(timeout=10)

assert not thread.is_alive()
assert not any(isinstance(e, ShutdownExecutorError) for e in errors)
if errors:
raise errors[0]
assert results == [1]

@pytest.mark.parametrize("iteration", range(2))
def test_reusable_executor_submit_with_concurrent_env_changes(
self, iteration
):
def submit_with_env(i):
executor = get_reusable_executor(
max_workers=1, env={"a": f"{iteration}-{i}"}
)
return executor.submit(id_sleep, i, 0).result(timeout=10)

n_submissions = 20
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(submit_with_env, range(n_submissions)))

assert results == list(range(n_submissions))

def test_reusable_executor_reuse_true(self):
executor = get_reusable_executor(max_workers=3, timeout=42)
executor.submit(id, 42).result()
Expand Down