Skip to content

Fix four correctness bugs (exception hang, reentrancy, termination race, busy-poll)#7

Open
Roach wants to merge 1 commit into
michalmonday:masterfrom
Roach:fix/p0-correctness
Open

Fix four correctness bugs (exception hang, reentrancy, termination race, busy-poll)#7
Roach wants to merge 1 commit into
michalmonday:masterfrom
Roach:fix/p0-correctness

Conversation

@Roach
Copy link
Copy Markdown
Contributor

@Roach Roach commented May 15, 2026

I was reading through again and spotted four correctness bugs that all live in the same producer/consumer path, so figured I'd bundle the fixes into one PR rather than spam you with four.

Each one is independently real; together they cover most of the silent-hang / wrong-result scenarios I could reproduce.


1. Exceptions in the user function hang the generator forever

on_completed in process_chunk calls future.result() with no try/except. If the user's function raises, the exception re-raises inside the add_done_callback thread, result_queue.put(...) never runs, and the consumer waits forever for the missing index.

Fix: catch it, push a _TaskError(exc) sentinel through the queue, re-raise it in the caller when that slot would have been yielded.

2. fast_map isn't reentrant

enqueueing_finished = False
enqueued_items_count = 0

These are module globals — written by the enqueuer thread, read by the consumer. Two concurrent fast_map calls (which the README explicitly says people do) clobber each other. Second call resets the counter to 0 while the first is still consuming → early return or hang.

Moved both into per-call state closed over by enqueuer. No more shared mutable module state.

3. Flag/count race at termination

Termination check is expected_index == enqueued_items_count and enqueueing_finished. The enqueuer previously incremented the counter per-item and set the flag at the end with no ordering between the two writes (and no atomic read on the consumer side). So a consumer that observes finished=True could still be reading a stale count.

Fix: write the final count before setting the Event. Anyone who sees the flag is now guaranteed to see the final total too.

4. Busy-poll + unreliable Queue.empty() in the consumer

while True:
    alive_procs = [p for p in procs if p.is_alive()]
    if result_queue.empty() and len(alive_procs) == 0:
        return
    while not result_queue.empty():
        ...

Two issues here:

  • multiprocessing.Queue.empty() is documented as unreliable, so a result produced between empty() and is_alive() can get dropped on the floor and never yielded
  • when no result is ready but workers are alive, this loop spins at 100% CPU and steals a core from the user's tasks

Replaced with result_queue.get(timeout=0.1) plus an Event-based termination check. Also added a defensive bailout if every worker has died without producing the expected index, so a worker crash can't deadlock the generator.


Tests

Added test/test_p0_regressions.py — one case per fix plus empty-input and single-task edges:

test_user_exception_is_reraised_not_hung           PASSED
test_concurrent_calls_do_not_clobber_each_other    PASSED
test_basic_ordering_and_completion                 PASSED
test_single_task                                   PASSED
test_empty_input_terminates                        PASSED

The concurrency and empty-input cases both hang on master (verified locally), the others fail with the exception or get wrong ordering under load.


Scope

I kept this strictly correctness — didn't want to balloon the diff. Side note: I've got separate notes on a handful of other things I'd be happy to send follow-up PRs for if you're interested:

  • atexit.register leaks one entry per call
  • macOS / Windows spawn portability — the existing examples/*.py and script-style tests don't run on macOS as-is (missing if __name__ == "__main__": guard)
  • on_error callback for fast_map_async, context-manager / explicit close, safer threads_limit default
  • packaging cleanup — setup.cfg + pyproject.toml duplication, the committed fast_map.py.bak

Let me know if you have questions or want me to split this up differently! :man-bowing:

1. User-function exceptions hung the generator forever
   `on_completed` called `future.result()` unguarded; if the user
   function raised, the callback aborted before putting anything on
   the result queue, and the consumer waited indefinitely for the
   missing index. Wrap in try/except, push a `_TaskError` sentinel,
   re-raise in the caller.

2. Module-level globals made fast_map non-reentrant
   `enqueueing_finished` and `enqueued_items_count` were module
   globals, so two concurrent calls (the README even encourages this)
   clobbered each other's state. Move both into per-call locals
   closed over by the enqueuer.

3. Flag/count race at termination
   The consumer's exit check read `enqueueing_finished` and
   `enqueued_items_count` in an order that could observe a stale
   count alongside finished=True. Write the final count BEFORE
   setting the Event so any observer that sees the flag also sees
   the final total.

4. Busy-poll + unreliable Queue.empty() in consumer loop
   The main loop polled `result_queue.empty()` and `is_alive()`
   without blocking, burning a full CPU between results and racing
   with documented-unreliable `Queue.empty()`. Replace with
   `result_queue.get(timeout=...)` and an Event-based termination
   check; bail out if all workers die without producing the
   expected index.

Also adds a regression test module covering each fix plus
empty-input and single-task edge cases.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant