-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_nn_only.py
123 lines (94 loc) · 3.77 KB
/
test_nn_only.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python3
import argparse
from pathlib import Path
import sys
import blobconverter
import cv2
import depthai as dai
import numpy as np
import time
import os
import zipfile
# ---------------------------------------------------------------------------
# Command-line interface: benchmark configuration.
# Either --model_name (+ optional --zoo_type) or an explicit --blob path
# selects the network; --input_shape describes the random input tensor.
# ---------------------------------------------------------------------------
parser = argparse.ArgumentParser()
_cli_options = [
    (('-s', '--shaves'), dict(type=int, default=6, help="Number of shaves to use for blob")),
    (('-nn', '--model_name'), dict(type=str, default=None, help="Name of the model in the zoo")),
    (('-zoo', '--zoo_type'), dict(type=str, default=None, help="Zoo type")),
    (('-shape', '--input_shape'), dict(type=int, nargs='+', help="List of ints", required=True)),
    (('-fp16', '--fp16'), dict(action="store_true", help="Input must be FP16")),
    (('-c', '--cache'), dict(action="store_true", help="Use cache with blobconverter.")),
    (('-b', '--blob'), dict(type=str, help="If specified, blob will be used.")),
]
for _flags, _kwargs in _cli_options:
    parser.add_argument(*_flags, **_kwargs)
args = parser.parse_args()
print("Parsed arguments")
print(args)
# ---------------------------------------------------------------------------
# Pipeline definition: XLinkIn -> NeuralNetwork -> XLinkOut.
# ---------------------------------------------------------------------------
pipeline = dai.Pipeline()

# Resolve the model blob: an explicit path wins; otherwise download from the
# model zoo via blobconverter (compiled for the requested shave count).
if args.blob is not None:
    model_path = args.blob
else:
    model_path = blobconverter.from_zoo(
        name=args.model_name,
        zoo_type=args.zoo_type,
        shaves=args.shaves,
        use_cache=args.cache,
        version=blobconverter.Versions.v2021_4,
    )

# Neural-network node fed from the host over XLink.
print("Creating Neural Network...")
detection_nn = pipeline.createNeuralNetwork()
detection_nn.setBlobPath(str(model_path))
detection_nn.setNumInferenceThreads(2)
detection_nn.input.setBlocking(True)

nn_in = pipeline.createXLinkIn()
nn_in.setMaxDataSize(6291456)  # allow large input tensors (6 MiB)
nn_in.setStreamName("in_nn")
nn_in.out.link(detection_nn.input)

# Output stream carrying inference results back to the host.
xout_nn = pipeline.createXLinkOut()
xout_nn.setStreamName("nn")
detection_nn.out.link(xout_nn.input)
# Pipeline defined, now the device is assigned and pipeline is started
with dai.Device(pipeline) as device:
    # Blocking host-side queues: back-pressure instead of dropped messages.
    detection_in = device.getInputQueue("in_nn", maxSize=25, blocking=True)
    q_nn = device.getOutputQueue(name="nn", maxSize=5, blocking=True)

    # One FPS sample per repetition; summarized at the end.
    fps_storage = []
    for repetition in range(5):
        print(f"Repetition: {repetition}")
        # Feed 25 messages up front so the device never starves during timing.
        for _ in range(25):
            frame = np.random.randint(256, size=args.input_shape, dtype=np.uint8)
            nn_data = dai.NNData()
            # Super-resolution models take two inputs: the low-res frame ("0")
            # and a bilinear 4x upscale of it ("1").
            # BUG FIX: args.model_name is None when --blob is used, and
            # `"..." in None` raises TypeError — guard before the substring test.
            if args.model_name is not None and "super-resolution" in args.model_name:
                nn_data.setLayer("0", frame)
                # assumes input_shape is CHW — TODO confirm for each zoo model
                frame = np.transpose(frame, (1, 2, 0))  # CHW -> HWC for OpenCV
                # BUG FIX: cv2.resize's third positional parameter is `dst`,
                # not interpolation — the flag must be passed by keyword.
                frame = cv2.resize(
                    frame,
                    (args.input_shape[2] * 4, args.input_shape[1] * 4),
                    interpolation=cv2.INTER_LINEAR,
                )
                frame = np.transpose(frame, (2, 0, 1))  # HWC -> CHW
                nn_data.setLayer("1", frame)
            else:
                nn_data.setLayer("input", frame)
            detection_in.send(nn_data)

        # Warm-up: discard the first 5 results.
        for _ in range(5):
            q_nn.get().getFirstLayerFp16()
        # Time the remaining 20 inferences and record throughput.
        start = time.time()
        for _ in range(20):
            q_nn.get().getFirstLayerFp16()
        diff = time.time() - start
        fps_storage.append(20 / diff)

    print(np.mean(fps_storage), np.std(fps_storage), fps_storage)