-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathruntime.py
76 lines (62 loc) · 2.11 KB
/
runtime.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import torch
import torch.nn as nn
import multiprocessing as mp
import time
import json
from models.common import DetectMultiBackend
def time_sync():
if torch.cuda.is_available():
torch.cuda.synchronize()
return time.time()
class TaskEngine(mp.Process):
def __init__(self,
name: str,
barrier: mp.Barrier,
result: mp.Queue,
img_queue: mp.Queue,
batch: int,
model: str,
ddl: float,
):
super().__init__(name=name)
self._name = name
self._barrier = barrier
self._tracking = result
self._img_queue = img_queue
self._model = DetectMultiBackend(weights=model, fp16=True)
self._bsize = batch
self._ddl = ddl # ms
def warmup(self):
img = torch.rand(1, 3, 640, 640, dtype=torch.float16).cuda()
self._model.to('cuda')
for _ in range(50):
out = self._model(img)
torch.cuda.synchronize()
del img, out
@torch.no_grad()
def run(self):
self.warmup()
self._barrier.wait()
violated = 0
task_num = 0
p_start = time.time()
while True:
signal = self._img_queue.get()
if signal == 'exit': break
start = time.time()
ims = torch.rand((self._bsize, 3, 640, 640), dtype=torch.float16).cuda()
y = self._model(ims)
torch.cuda.synchronize()
# len(y) == 2 and len(y[0]) == 1, len(y[1]) == 3
del ims
y[0] = y[0].cpu()
for res in y[1]:
res = res.cpu()
end = time.time()
task_num += self._bsize
if (end - start) * 1000 > self._ddl:
violated += self._bsize
# print(f"{self._name} completes inference one time ...")
p_end = time.time()
self._tracking.put(json.dumps((p_start, p_end, violated, task_num)))
print("{} has finished successfully !!!".format(self._name))