-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
279 lines (235 loc) · 10.5 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# =============================================================================
"""
Code Information:
Date: 04/05/2019
Programmer: John A. Betancourt G.
Mail: [email protected]
Web: www.linkedin.com/in/jhon-alberto-betancourt-gonzalez-345557129
Description: Project 4 - Udacity - self driving cars Nanodegree
(Deep Learning) Driving Behavioral Cloning
Tested on:
python 2.7 (3.X should work)
OpenCV 3.0.0 (3.X or 4.X should work)
UBUNTU 16.04
"""
# =============================================================================
# LIBRARIES AND DEPENDENCIES - LIBRARIES AND DEPENDENCIES - LIBRARIES AND DEPEN
# =============================================================================
#importing useful packages
import numpy as np
import glob
import cv2
import csv
import os
import matplotlib.pyplot as plt
import subprocess
# =============================================================================
def getClipboardData():
    """ Reads the current X clipboard contents via the `xclip` utility
    Args:
    Returns:
        data: `bytes` raw clipboard contents (empty if nothing is stored)
    """
    p = subprocess.Popen(['xclip','-selection', 'clipboard', '-o'],
        stdout=subprocess.PIPE)
    # communicate() drains stdout to EOF and then waits for the process;
    # the original wait()-before-read can deadlock if the pipe buffer fills
    data, _ = p.communicate()
    return data
def setClipboardData(data):
    """ Writes `data` to the X clipboard via the `xclip` utility
    Args:
        data: `bytes` content to place on the clipboard
    Returns:
    """
    p = subprocess.Popen(['xclip','-selection','clipboard'],
        stdin=subprocess.PIPE)
    # communicate() writes stdin, closes it (so xclip sees EOF) and waits
    # for exit in one call — safer than manual write/close/wait
    p.communicate(data)
def print_list_text(img_src, str_list, origin=(0, 0), color=(0, 255, 255),
    thickness=2, fontScale=0.45, y_space=20):
    """ prints text list in cool way
    Args:
        img_src: `cv2.math` input image to draw text
        str_list: `list` list with text for each row
        origin: `tuple` (X, Y) coordinates to start drawings text vertically
        color: `tuple` (R, G, B) color values of text to print
        thickness: `int` thickness of text to print
        fontScale: `float` font scale of text to print
        y_space: `int` [pix] vertical space between lines
    Returns:
        img_src: `cv2.math` input image with text drawn
    """
    x0, y0 = origin
    for row, line in enumerate(str_list):
        pos = (x0, y0 + y_space * row)
        # Black halo first (thicker stroke), then the colored text on top,
        # so the text stays readable on any background
        for stroke_color, stroke_width in (
                ((0, 0, 0), thickness + 3), (color, thickness)):
            cv2.putText(img=img_src, text=line, org=pos,
                fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=fontScale,
                color=stroke_color, thickness=stroke_width,
                lineType=cv2.LINE_AA)
    return img_src
def load_dataset(data_paths, csv_name="driving_log.csv"):
    """ load dataset from path
    Args:
        data_paths: `list` of string with paths to datasets
        csv_name: `string` name of csv files
    Returns:
        data: `list` of dictionaries with datasets information
            (keys: img_c, img_l, img_r, steering)
    """
    data = []
    for data_path in data_paths:
        csv_path = os.path.join(data_path, csv_name)
        n_samples = 0 # samples read from this csv file
        with open(csv_path) as csvfile:
            reader = csv.reader(csvfile)
            for idx, line in enumerate(reader):
                # Skip the first row (csv header)
                if idx:
                    data.append({
                        "img_c":line[0],
                        "img_l":line[1],
                        "img_r":line[2],
                        "steering": float(line[3]),
                        })
                    n_samples += 1
        # Counting explicitly (instead of reusing the loop's `idx`) avoids a
        # NameError when a csv file is completely empty
        print("From .... {}: {} samples".format(data_path[-30:], n_samples))
    return data
def create_video(cam_label, dst_path, src_path, file_name, fps=15.,
    video_size=(320, 160)):
    """ Create video from images list
    Args:
        cam_label: `string` camera key to record ("img_c", "img_l" or "img_r")
        dst_path: `string` path to save videos
        src_path: `string` path where data (driving_log.csv) is stored
        file_name: `string` video file name to save
        fps: `float` desired video frame rate
        video_size: `tuple` desired video size (width, height)
    Returns:
    """
    data = load_dataset([src_path], csv_name="driving_log.csv")
    # Define the codec and format and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*"mp4v") # Codec H264, format MP4
    video_name = os.path.join(dst_path, file_name) # File name and format
    video_out = cv2.VideoWriter(video_name, fourcc, fps, video_size) # Video recorder variable
    # Write frame to video
    for idx in range(len(data)):
        # NOTE(review): frames are joined against dst_path, but the csv was
        # loaded from src_path — this looks like it should be src_path unless
        # the csv stores absolute image paths; confirm against the datasets
        img = cv2.imread(os.path.join(dst_path, 'IMG', data[idx][cam_label]))
        video_out.write(cv2.resize(img, video_size))
    # Release video variable memory
    video_out.release()
def reproduce_dataset(fps=15., loop=True, sc_fc=2., up_limit=0,
    down_limit=0, dataset_path="", CORRECTION=0.2):
    """ show dataset as an interactive video ('A' pauses/resumes, 'Q' quits)
    Args:
        fps: `float` desired video frame rate
        loop: `boolean` enable/disable looping
        sc_fc: `float` scaling factor to show video
        up_limit: `int` superior limit to ignore image
        down_limit: `int` inferior limit to ignore image
        dataset_path: `string` path where dataset `data` is stored
        CORRECTION: `float` angle correction factor
    Returns:
    """
    data = load_dataset([dataset_path], csv_name="driving_log.csv")
    cam_labels = ("img_l", "img_c", "img_r")
    reproduce = True
    # Outer loop restarts playback from index 0 when looping is enabled
    while True:
        idx = 0
        while idx in range(len(data)):
            # When paused (reproduce == False) the previously composed `img`
            # is re-shown below without rebuilding the frame
            if reproduce:
                imgs = []
                for label in cam_labels:
                    img = cv2.imread(data[idx][label])
                    if up_limit or down_limit:
                        # Draw superior limit
                        img2 = cv2.rectangle(img.copy(),(0,0),(img.shape[1], up_limit),(0,0,0), -1)
                        img = cv2.line(img,(0,up_limit),(img.shape[1],up_limit),(0,0,255),2)
                        # Draw inferior limit
                        img2 = cv2.rectangle(img2,(0,int(img.shape[0]-down_limit)),(img.shape[1], img.shape[0]),(0,0,0), -1)
                        img = cv2.line(img,(0, int(img.shape[0]-down_limit)),(img.shape[1],int(img.shape[0]-down_limit)),(0,0,255),2)
                        # Overlay image (darken the ignored bands)
                        img = cv2.addWeighted(src2 = img2, src1 = img,
                            alpha = 0.2, beta = 1, gamma = 0)
                    img = cv2.resize(img,
                        (int(img.shape[1]*sc_fc), int(img.shape[0]*sc_fc)))
                    # Side cameras get the steering correction applied, like
                    # the training-data augmentation does
                    angle = data[idx]['steering']
                    if label == cam_labels[0]: angle += CORRECTION
                    elif label == cam_labels[2]: angle -= CORRECTION
                    angle = round(angle, 2)
                    print_list_text(
                        img, ("Angle: {}[rad]".format(angle), "cam: {}".format(label)),
                        origin=(10, img.shape[0] - 35), color=(255, 0, 255),
                        thickness=1, fontScale=0.50, y_space=20)
                    imgs.append(img)
                # Concatenate all images (left | center | right)
                img = np.concatenate(imgs, axis = 1)
            str_list = ("A: stop/reproduce", "Q: Quit", "{}%".format(round(idx*100./len(data), 2)))
            print_list_text(
                img, str_list, origin=(10, 15), color=(0, 255, 255),
                thickness=1, fontScale=0.50, y_space=20)
            cv2.imshow("data_visualization", img)
            # Increment index; -1 is a sentinel: it fails the inner
            # `idx in range(...)` test, which exits to the outer loop and
            # restarts playback at 0 when `loop` is enabled
            idx =-1 if loop and idx == len(data) -1 else (idx+1 if reproduce else idx)
            user_inpu = cv2.waitKey(int(1000/fps)) & 0xFF
            if user_inpu == ord('q') or user_inpu == ord('Q'): break
            if user_inpu == ord('a') or user_inpu == ord('A'): reproduce = not reproduce
            # NOTE(review): duplicate of the 'q' check two lines up — dead code
            if user_inpu == ord('q') or user_inpu == ord('Q'): break
        # Exits only when the end was reached without the loop sentinel
        # (or 'q' broke out on the last frame)
        if idx >= len(data) -1: break
    # Destroy GUI/User interface window
    cv2.destroyAllWindows()
def plot_data_distribution(y_data, scfc=30, graph_name="", save_name=None):
    """ plots the dataset steering angle distribution
    Args:
        y_data: `list` of steering angles (expected in [-1, 1])
        scfc: `int` graph's bars scaling factor (bins per unit of angle)
        graph_name: `string` graphs name
        save_name: `string` absolute path to save graph (None: don't save)
    Returns:
    """
    plt.figure(figsize=(15,5))
    # Bin edges span [-1, 1] inclusive: range(-scfc, scfc + 1) adds the
    # top edge at 1.0 that the original range(-scfc, scfc) dropped, which
    # silently excluded angles in the last bin from the histogram.
    # float() keeps the division true under the python 2.7 the header claims.
    bins = [edge / float(scfc) for edge in range(-scfc, scfc + 1)]
    plt.hist(y_data, density=False, bins=bins, facecolor='g', alpha=0.75)
    plt.xlabel('Steering Angles')
    plt.ylabel('Distributions')
    plt.title(graph_name)
    plt.grid(True)
    if save_name is not None:
        plt.savefig(save_name)
    plt.show()
def plot_training_history(history_object, fig_size=(20,30), save_name=None):
    """ plots the training loss
    Args:
        history_object: `dic` with training history loss for validation and training
        fig_size: `tuple` graph's size
        save_name: `string` absolute path to save graph (None: don't save)
    Returns:
    """
    hist = history_object.history
    plt.figure(figsize=fig_size)
    # One curve per epoch series: training loss, then validation loss
    for series in ('loss', 'val_loss'):
        plt.plot(hist[series])
    plt.title('model mean squared error loss')
    plt.ylabel('mean squared error loss')
    plt.xlabel('epoch')
    plt.legend(['training set', 'validation set'], loc='upper right')
    plt.grid(True)
    if save_name is not None:
        plt.savefig(save_name)
    plt.show()
def create_model_result_video(src_path, dst_path="", save_name="model_results_cam_c.mp4",
    fps=30., video_size=(360, 160)):
    """ creates models results video from behavioral driving images
    Args:
        src_path: `string` absolute path where models results images are stored
        dst_path: `string` destination path to save video
        save_name: `string` video file name
        fps: `float` video frame rate
        video_size: `tuple` desired video size (width, height)
    Returns:
    """
    # Define the codec and format and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*"mp4v") # Codec H264, format MP4
    video_name = os.path.join(dst_path, save_name) # File name and format
    video_out = cv2.VideoWriter(video_name, fourcc, fps, video_size) # Video recorder variable
    # NOTE: sorted() is lexicographic — frame file names must be zero-padded
    # for the video to play in chronological order
    for filename in sorted(os.listdir(src_path)):
        # Read image and write it to video
        img = cv2.imread(os.path.join(src_path, filename))
        if img is None:
            # cv2.imread returns None for non-image files (e.g. hidden or
            # metadata files in the folder); skip instead of crashing resize
            continue
        video_out.write(cv2.resize(img, video_size))
    # Release video variable memory
    video_out.release()
# =============================================================================