-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmulti_task.py
60 lines (47 loc) · 1.88 KB
/
multi_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import threading
from multiprocessing import Process, Queue, Barrier
import time
import logging
from typing import List
class MultiTasks(Process):
    """Producer process that feeds several queues at fixed intervals.

    ``run`` starts one sender thread per queue. After all senders and the
    external ``barrier`` are ready, each sender puts the string ``'img'`` on
    its queue, sleeps ``intervals[i]`` seconds, and repeats until
    ``int(lifetime / intervals[i])`` items were sent; it then puts the string
    ``'exit'`` on the queue.
    """

    def __init__(self,
                 queues: List[Queue],
                 barrier: Barrier,
                 lifetime: int,
                 intervals: List[float]):
        """Store the queues and their pacing parameters.

        Args:
            queues: one output queue per task.
            barrier: barrier waited on in ``run`` before any sender is
                released.
            lifetime: duration of the video streams; divided by each interval
                to get the number of items to send.
            intervals: per-task delay between two items, passed to
                ``time.sleep``; must have the same length as ``queues``.

        Raises:
            ValueError: if ``queues`` and ``intervals`` differ in length.
        """
        # Validate with a real exception: ``assert`` is stripped under -O.
        if len(queues) != len(intervals):
            raise ValueError(
                "queues and intervals must have the same length "
                "(got {} and {})".format(len(queues), len(intervals)))
        super().__init__()
        self._lifetime = lifetime  # denote the duration of video streams
        self._barrier = barrier
        self._task_num = len(queues)
        self._queues = queues
        self._intervals = intervals

    def send(self,
             barrier: threading.Barrier,
             task_id: int):
        """Feed the queue of task ``task_id`` (thread target used by ``run``).

        Args:
            barrier: start barrier shared by all senders and ``run``.
            task_id: index into the queues and intervals.
        """
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            # ``run`` aborts the barrier when start-up failed; send nothing.
            logging.warning('Start barrier broken, task %d sends nothing',
                            task_id)
            return

        interval = self._intervals[task_id]
        if interval == 0.0:
            # NOTE(review): no 'exit' marker is queued on this path; confirm
            # that consumers of a zero-interval task do not wait for one.
            logging.warning('No data processed for task {}'.format(task_id))
            return

        out_queue = self._queues[task_id]
        target_num = int(self._lifetime / interval)
        for _ in range(target_num):
            out_queue.put('img')
            time.sleep(interval)
        out_queue.put('exit')
        print("#### Task_{} finished and image number: {} ####".format(task_id, target_num))

    def run(self):
        """Start one sender per queue, release them together, and join them.

        Raises:
            Whatever start-up raised (e.g. ``threading.BrokenBarrierError``
            from the external barrier); senders already started are released
            and joined first so that none is left blocked.
        """
        # +1 party: this method itself releases the senders.
        thread_barrier = threading.Barrier(self._task_num + 1)
        senders = []
        try:
            for task_id in range(self._task_num):
                sender = threading.Thread(target=self.send,
                                          args=(thread_barrier, task_id),
                                          name="Queue_%d" % (task_id))
                sender.start()
                senders.append(sender)
            self._barrier.wait()
            thread_barrier.wait()
        except BaseException:
            # Without this, the non-daemon senders would stay parked on
            # thread_barrier forever and keep the process alive.
            thread_barrier.abort()
            for sender in senders:
                sender.join()
            raise

        for sender in senders:
            sender.join()
        print("All image queue threads complete successfully !!!")