-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbounded_executor.py
More file actions
30 lines (26 loc) · 1.09 KB
/
bounded_executor.py
File metadata and controls
30 lines (26 loc) · 1.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore
class BoundedExecutor:
    """BoundedExecutor behaves as a ThreadPoolExecutor which will block on
    calls to submit() once the limit given as "bound" work items are queued for
    execution.

    Back-pressure is implemented with a semaphore holding
    ``bound + max_workers`` permits: one permit is taken per submitted task
    and handed back when that task's future completes, so at most
    ``max_workers`` tasks can be running while ``bound`` more wait in the
    queue.

    Instances can be used as context managers; leaving the ``with`` block
    calls ``shutdown(wait=True)``.

    :param bound: Integer - the maximum number of items in the work queue
    :param max_workers: Integer - the size of the thread pool
    """

    def __init__(self, bound, max_workers):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # One permit per task that is either running or waiting in the queue.
        self.semaphore = BoundedSemaphore(bound + max_workers)

    def __enter__(self):
        """Return this executor so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Shut the pool down, waiting for pending work; never suppress errors."""
        self.shutdown(wait=True)
        return False

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)``, blocking while the pool is full.

        See concurrent.futures.Executor#submit

        :param fn: the callable to run in the thread pool
        :param args: positional arguments forwarded to ``fn``
        :param kwargs: keyword arguments forwarded to ``fn``
        :return: the Future returned by the wrapped executor
        :raises: whatever the wrapped executor's ``submit`` raises; the
            permit taken for the task is released before re-raising
        """
        self.semaphore.acquire()
        try:
            future = self.executor.submit(fn, *args, **kwargs)
        except BaseException:
            # The task was never queued, so nothing will ever complete and
            # release this permit for us -- give it back before propagating.
            self.semaphore.release()
            raise
        # Return the permit once the task has finished, failed or been
        # cancelled, which unblocks one waiting submit() call.
        future.add_done_callback(lambda _done: self.semaphore.release())
        return future

    def shutdown(self, wait=True):
        """Shut down the wrapped thread pool.

        See concurrent.futures.Executor#shutdown

        :param wait: forwarded unchanged to the wrapped executor's
            ``shutdown``
        """
        self.executor.shutdown(wait)