# eventqueue.py
# A small thread pool: worker threads pull event objects off a shared queue
# and call dispatch() on each one.
import subprocess
from Queue import Queue, Empty
import threading
import thread
from threading import Thread
class IEvent(object):
    """Base class for events that a worker thread can dispatch.

    Subclasses override :meth:`dispatch` to do the actual work and may
    override :meth:`__expr__` to supply a more descriptive label.
    """

    def __init__(self):
        pass

    def dispatch(self):
        """Perform the event's work; the base implementation does nothing."""
        pass

    def __expr__(self):
        """Return a short label for the event (the class name by default)."""
        return type(self).__name__

    def __repr__(self):
        # NOTE(review): ``__expr__`` is not a Python special method and looks
        # like a typo for ``__repr__``. It is kept for existing callers and
        # exposed through the real hook here.
        return self.__expr__()

    def __str__(self):
        # Must return a string: returning None makes str(event) raise
        # TypeError.
        return self.__expr__()
class Worker(Thread):
    """Thread executing tasks from a given tasks queue.

    The thread starts itself on construction and keeps polling ``queue`` for
    events until :meth:`stop` is called.

    Args:
        queue: Queue-like object providing ``get(block, timeout)`` and
            ``task_done()``. Every item must expose a ``dispatch()`` method.
        poll_interval: Seconds to block waiting for an event before the stop
            flag is checked again. Defaults to 2.
    """

    def __init__(self, queue, poll_interval=2):
        Thread.__init__(self)
        # Deliberately not called ``_stop``: some CPython 3 releases define a
        # private ``Thread._stop()`` method that ``join()`` relies on, and
        # shadowing it with a plain value makes ``join()`` fail.
        self._stop_requested = threading.Event()
        self._poll_interval = poll_interval
        self.queue = queue
        self.daemon = False
        self.start()

    def run(self):
        """Dispatch queued events until a stop has been requested."""
        while not self._stop_requested.is_set():
            try:
                event = self.queue.get(True, self._poll_interval)
            except Empty:
                # Nothing was dequeued, so task_done() must not be called.
                continue
            # dispatch() gets its own try block: an ``Empty`` raised by the
            # event itself must not be mistaken for an idle queue, otherwise
            # task_done() is skipped and Queue.join() never returns.
            try:
                event.dispatch()
            except Exception as e:
                # Report the failure and keep the worker alive.
                print(e)
            finally:
                self.queue.task_done()

    def stop(self):
        """Ask the worker to exit after its current get()/dispatch() returns."""
        self._stop_requested.set()
class ThreadPool(object):
    """Pool of threads consuming tasks from a queue.

    Args:
        num_threads: Number of ``Worker`` threads to create. Each worker is
            handed the pool's shared queue.
    """

    def __init__(self, num_threads):
        self.queue = Queue()
        self._allthreads = [Worker(self.queue) for _ in range(num_threads)]

    def addEvent(self, IEvent):
        """Add a task to the queue.

        Args:
            IEvent: The event object to enqueue. The parameter name is kept
                for callers that pass it by keyword.
        """
        self.queue.put(IEvent)

    def stop(self):
        """Wait for all queued tasks to complete, then shut the workers down."""
        self.queue.join()
        # Signal every worker before joining any of them, so that they all
        # wind down concurrently instead of one after another.
        for thd in self._allthreads:
            thd.stop()
        for thd in self._allthreads:
            thd.join()
class DummyEvent(IEvent):
    """Event that stands in for real work by sleeping for a given duration."""

    def __init__(self, seconds):
        # How long dispatch() sleeps, in seconds.
        self._seconds = seconds

    def dispatch(self):
        """Print the current thread's name and the delay, then sleep."""
        import time

        thread_name = threading.currentThread().name
        print(" ".join([thread_name, ": sleep", str(self._seconds)]))
        time.sleep(self._seconds)

    def __expr__(self):
        """Return a label containing the delay formatted with ``%d``."""
        label = "DummyEvent %d" % self._seconds
        return label

    def __str__(self):
        return self.__expr__()
class SubprocesEvent(IEvent):
    """Event that runs ``ls -tl *.py`` through the shell."""

    def dispatch(self):
        """Run the command and print the value subprocess.call() returns."""
        command = "ls -tl *.py"
        thread_name = threading.currentThread().name
        print(" ".join([thread_name, "Running command %s" % command]))
        # The command is passed as a single string with shell=True.
        exit_status = subprocess.call(command, shell=True)
        print(exit_status)

    def __expr__(self):
        """Return the fixed label for this event type."""
        label = "SubprocesEvent"
        return label

    def __str__(self):
        return self.__expr__()
def test1():
    """Run ten DummyEvents with random delays through a five-thread pool."""
    import random

    # Draw all ten delays (each from randrange(1, 10)) before the pool exists.
    sleep_times = []
    for _ in range(10):
        sleep_times.append(random.randrange(1, 10))

    workers = ThreadPool(5)
    for seconds in sleep_times:
        workers.addEvent(DummyEvent(seconds))
    workers.stop()
def test2():
    """Push ten SubprocesEvents through a five-thread pool and wait for them."""
    pool = ThreadPool(5)
    # The events take no arguments, so a plain counter replaces the list of
    # random delays that was previously built only to be iterated.
    for _ in range(10):
        pool.addEvent(SubprocesEvent())
    pool.stop()
def main():
    """Script entry point: run the test2() demo."""
    test2()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()