-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paththreadpool.py
142 lines (121 loc) · 3.79 KB
/
threadpool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
A simple thread pool.
@author: Junaid P V
@license: GPLv3
"""
from threading import Thread, RLock, Lock
from time import sleep
from functools import wraps
def synchronous( tlockname ):
"""
A decorator to place an instance based lock around a method
from: http://code.activestate.com/recipes/577105-synchronization-decorator-for-class-methods/
"""
def _synched(func):
@wraps(func)
def _synchronizer(self,*args, **kwargs):
tlock = self.__getattribute__( tlockname)
tlock.acquire()
try:
return func(self, *args, **kwargs)
finally:
tlock.release()
return _synchronizer
return _synched
class ThreadPoolThread(Thread):
def __init__(self, pool):
Thread.__init__(self)
self.__pool = pool
self.start()
def run(self):
try:
while True:
task = self.__pool.pop_task()
if task == None:
break
if task[1] != None and task[2] != None:
task[0](*task[1], **task[2])
elif task[1] != None:
task[0](*task[1])
else:
task[0]()
finally:
# Always inform about thread finish
self.__pool.thread_finished()
class ThreadPool(object):
def __init__(self, thread_count):
self.__tasks = []
self.task_lock = Lock()
self.__thread_count = thread_count
self.__threads = []
self.threads_lock = Lock()
self.__finished_threads_count = 0
self.finished_threads_count_lock = Lock()
@synchronous('task_lock')
def add_task(self, callable_, args=None, kwds=None):
self.__tasks.append((callable_, args, kwds))
@synchronous('task_lock')
def pop_task(self):
if len(self.__tasks) > 0:
return self.__tasks.pop(0)
else:
return None
def start_workers(self):
self.__finished_threads_count = 0
self.__threads = []
for i in range(self.__thread_count):
worker = ThreadPoolThread(self)
self.__threads.append(worker)
def wait(self):
"""
Wait for every worker threads to finish
"""
while True:
finished_threads_count = self.get_finished_threads_count()
if finished_threads_count == self.__thread_count:
break
sleep(1)
@synchronous('finished_threads_count_lock')
def thread_finished(self):
self.__finished_threads_count += 1
@synchronous('finished_threads_count_lock')
def get_finished_threads_count(self):
return self.__finished_threads_count
"""
Example
"""
if __name__ == '__main__':
from threading import current_thread
class SampleTask:
def __init__(self, name):
self.name = name
def call_me(self, count):
print "Thread: ", current_thread().getName()
for i in range(count):
print self.name, ': counting ', i
sleep(3)
pool = ThreadPool(2)
a = SampleTask("A")
b = SampleTask("B")
c = SampleTask("C")
d = SampleTask("D")
e = SampleTask("E")
pool.add_task(a.call_me, (5,))
pool.add_task(b.call_me, (7,))
pool.add_task(c.call_me, (6,))
pool.add_task(d.call_me, (4,))
pool.add_task(e.call_me, (5,))
pool.start_workers()
pool.wait()
print "Sleeping 5 seconds before next run..."
sleep(5)
f = SampleTask("F")
g = SampleTask("G")
h = SampleTask("H")
pool.add_task(f.call_me, (5,))
pool.add_task(g.call_me, (4,))
pool.add_task(h.call_me, (3,))
pool.start_workers()
pool.wait()