-
Notifications
You must be signed in to change notification settings - Fork 0
/
workerpool.py
executable file
·124 lines (91 loc) · 2.81 KB
/
workerpool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/python
from multiprocessing import Pool
import os
import sys
import time
import signal
import json
class WorkerPool():
max_workers = None
pool = None
func = None
tasks = None
sumitted = None
def __init__(self,func,n=20, maxtasksperchild=5):
self.func = func
self.max_workers = n
self.tasks = list()
self.pool = Pool(processes = self.max_workers, maxtasksperchild = maxtasksperchild)
self.submitted = dict()
self.created = time.time()
def age(self):
return time.time() - self.created
def add(self,task):
res = self.pool.apply_async(self.func, task)
# print "wp task: {}".format(task)
self.submitted[id(res)] = { 'iname': str(task), 'submitted': int(time.time()) }
self.tasks.append(res)
def nready(self):
nready = 0
for t in self.tasks:
if t.ready():
nready+=1
return nready
def dump(self):
nready = self.nready()
print("tasks: {}/{} ready".format(nready, len(self.tasks)))
for t in self.tasks:
s = self.submitted[id(t)]
print(".. {} ({} ago)".format(s['iname'], int(time.time() - s['submitted'])))
def size(self):
return len(self.tasks)
def close(self):
print("close started...")
self.pool.close()
#self.pool.join()
self.pool.terminate()
print("closed")
def results(self):
out = list()
ready = [ t for t in self.tasks if t.ready() ]
notready = [ t for t in self.tasks if not t in ready ]
self.tasks = notready
for r in ready:
del self.submitted[id(r)]
result = r.get()
out.append(result)
return out
if __name__ == '__main__':
class task():
time = None
def __init__(self, n,time=None):
self.n = n
if time is None:
self.time = n
else:
self.time = time
def __repr__(self):
return "task {} ({} sec)".format(self.n, self.time)
def myfunc(a):
signal.signal(signal.SIGINT, signal.SIG_DFL)
time.sleep(a.time)
return a.n*a.n
def sighandler(signum, frame):
global stop
print("caught signal",signum)
stop = True
stop = False
signal.signal(signal.SIGINT, sighandler)
wp = worker_pool(myfunc)
for i in xrange(1000):
wp.add(task(i))
while True:
wp.dump()
print("size:", wp.size())
for r in wp.getlist():
print("R:",r)
time.sleep(5)
if stop:
print("stopping!")
wp.close()
sys.exit(0)