-
Notifications
You must be signed in to change notification settings - Fork 6
/
TwoDimTTC.py
198 lines (174 loc) · 10.5 KB
/
TwoDimTTC.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
########################################################################################################
#
# Use function TTC(samples, 'dataframe') or TTC(samples, 'values') to compute two-dimensional Time-To-Collision.
#
# The first input is a pandas dataframe of vehicle pair samples, which should include the following columns.
# -----------------------------------------------------------------------------------
# x_i : x coordinate of the ego vehicle (usually assumed to be centroid) |
# y_i : y coordinate of the ego vehicle (usually assumed to be centroid) |
# vx_i : x coordinate of the velocity of the ego vehicle |
# vy_i : y coordinate of the velocity of the ego vehicle |
# hx_i : x coordinate of the heading direction of the ego vehicle |
# hy_i : y coordinate of the heading direction of the ego vehicle |
# length_i : length of the ego vehicle |
# width_i : width of the ego vehicle |
# x_j : x coordinate of another vehicle (usually assumed to be centroid) |
# y_j : y coordinate of another vehicle (usually assumed to be centroid) |
# vx_j : x coordinate of the velocity of another vehicle |
# vy_j : y coordinate of the velocity of another vehicle |
# hx_j : x coordinate of the heading direction of another vehicle |
# hy_j : y coordinate of the heading direction of another vehicle |
# length_j : length of another vehicle |
# width_j : width of another vehicle |
#------------------------------------------------------------------------------------
# The second input allows outputing a dataframe with inputed samples plus a new column named 'TTC', or mere TTC values.
#
# If TTC==np.inf, the ego vehicle and another vehicle will never collide if they keep current speed.
# A negative TTC means the bounding boxes of the ego vehicle and another vehicle are overlapping.
# This is due to approximating the space occupied by a vehicle with a rectangular.
# In other words, TTC<0 in this computation means the collision between the two vehicles almost (or although seldom, already) occurred.
#
# *** Note that mere TTC computation can give an extreme small positive value even when the vehivles are overlapping a bit.
# In order to improve the accuracy, please use function CurrentD(samples, 'dataframe') or CurrentD(samples, 'values') to further
# exclude overlapping vehicles.
#
########################## Copyright (c) 2022 Yiru Jiao <[email protected]> ###########################
# Import
import numpy as np
import warnings
# Functions
def line(point0, point1):
x0, y0 = point0
x1, y1 = point1
a = y0 - y1
b = x1 - x0
c = x0*y1 - x1*y0
return a, b, c
def intersect(line0, line1):
a0, b0, c0 = line0
a1, b1, c1 = line1
D = a0*b1 - a1*b0 # D==0 then two lines overlap
D[D==0] = np.nan
x = (b0*c1 - b1*c0)/D
y = (a1*c0 - a0*c1)/D
# x[np.isnan(x)] = np.inf
# y[np.isnan(y)] = np.inf
return np.array([x, y])
def ison(line_start, line_end, point):
crossproduct = (point[1]-line_start[1])*(line_end[0]-line_start[0]) - (point[0]-line_start[0])*(line_end[1]-line_start[1])
dotproduct = (point[0]-line_start[0])*(line_end[0]-line_start[0]) + (point[1]-line_start[1])*(line_end[1]-line_start[1])
squaredlength = (line_end[0]-line_start[0])**2 + (line_end[1]-line_start[1])**2
return (np.absolute(crossproduct)<=1e-5)&(dotproduct>=0)&(dotproduct<=squaredlength)
def dist_p2l(point, line_start, line_end):
return np.absolute((line_end[0]-line_start[0])*(line_start[1]-point[1])-(line_start[0]-point[0])*(line_end[1]-line_start[1]))/np.sqrt((line_end[0]-line_start[0])**2+(line_end[1]-line_start[1])**2)
def getpoints(samples):
## vehicle i
heading_i = samples[['hx_i','hy_i']].values
perp_heading_i = np.array([-heading_i[:,1], heading_i[:,0]]).T
heading_scale_i = np.tile(np.sqrt(heading_i[:,0]**2+heading_i[:,1]**2), (2,1)).T
length_i = np.tile(samples.length_i.values, (2,1)).T
width_i = np.tile(samples.width_i.values, (2,1)).T
point_up = samples[['x_i','y_i']].values + heading_i/heading_scale_i*length_i/2
point_down = samples[['x_i','y_i']].values - heading_i/heading_scale_i*length_i/2
point_i1 = (point_up + perp_heading_i/heading_scale_i*width_i/2).T
point_i2 = (point_up - perp_heading_i/heading_scale_i*width_i/2).T
point_i3 = (point_down + perp_heading_i/heading_scale_i*width_i/2).T
point_i4 = (point_down - perp_heading_i/heading_scale_i*width_i/2).T
## vehicle j
heading_j = samples[['hx_j','hy_j']].values
perp_heading_j = np.array([-heading_j[:,1], heading_j[:,0]]).T
heading_scale_j= np.tile(np.sqrt(heading_j[:,0]**2+heading_j[:,1]**2), (2,1)).T
length_j = np.tile(samples.length_j.values, (2,1)).T
width_j = np.tile(samples.width_j.values, (2,1)).T
point_up = samples[['x_j','y_j']].values + heading_j/heading_scale_j*length_j/2
point_down = samples[['x_j','y_j']].values - heading_j/heading_scale_j*length_j/2
point_j1 = (point_up + perp_heading_j/heading_scale_j*width_j/2).T
point_j2 = (point_up - perp_heading_j/heading_scale_j*width_j/2).T
point_j3 = (point_down + perp_heading_j/heading_scale_j*width_j/2).T
point_j4 = (point_down - perp_heading_j/heading_scale_j*width_j/2).T
return (point_i1, point_i2, point_i3, point_i4, point_j1, point_j2, point_j3, point_j4)
def CurrentD(samples, toreturn='dataframe'):
if toreturn!='dataframe' and toreturn!='values':
warnings.warn('Incorrect target to return. Please specify \'dataframe\' or \'values\'.')
else:
point_i1, point_i2, point_i3, point_i4, point_j1, point_j2, point_j3, point_j4 = getpoints(samples)
dist_mat = []
count_i = 0
for point_i_start, point_i_end in zip([point_i1, point_i4, point_i3, point_i2],[point_i2, point_i3, point_i1, point_i4]):
count_j = 0
for point_j_start, point_j_end in zip([point_j1, point_j4, point_j3, point_j2],[point_j2, point_j3, point_j1, point_j4]):
if count_i<2 and count_j<2 :
# Distance from point to point
dist_mat.append(np.sqrt((point_i_start[0]-point_j_start[0])**2+(point_i_start[1]-point_j_start[1])**2))
dist_mat.append(np.sqrt((point_i_start[0]-point_j_end[0])**2+(point_i_start[1]-point_j_end[1])**2))
dist_mat.append(np.sqrt((point_i_end[0]-point_j_start[0])**2+(point_i_end[1]-point_j_start[1])**2))
dist_mat.append(np.sqrt((point_i_end[0]-point_j_end[0])**2+(point_i_end[1]-point_j_end[1])**2))
# Distance from point to edge
ist = intersect(line(point_i_start, point_i_start+np.array([-(point_j_start-point_j_end)[1],(point_j_start-point_j_end)[0]])), line(point_j_start, point_j_end))
ist[:,~ison(point_j_start, point_j_end, ist)] = np.nan
dist_mat.append(np.sqrt((ist[0]-point_i_start[0])**2+(ist[1]-point_i_start[1])**2))
# Overlapped bounding boxes
ist = intersect(line(point_i_start, point_i_end), line(point_j_start, point_j_end))
dist = np.ones(len(samples))*np.nan
dist[ison(point_i_start, point_i_end, ist)&ison(point_j_start, point_j_end, ist)] = 0
dist[np.isnan(ist[0])&(ison(point_i_start, point_i_end, point_j_start)|ison(point_i_start, point_i_end, point_j_end))] = 0
dist_mat.append(dist)
count_j += 1
count_i += 1
cdist = np.nanmin(np.array(dist_mat), axis=0)
if toreturn=='dataframe':
samples['CurrentD'] = cdist
return samples
elif toreturn=='values':
return cdist
def TTC_ij(samples):
point_i1, point_i2, point_i3, point_i4, point_j1, point_j2, point_j3, point_j4 = getpoints(samples)
direct_v = (samples[['vx_i','vy_i']].values - samples[['vx_j','vy_j']].values).T
dist_mat = []
leaving_mat = []
for point_line_start in [point_i1,point_i2,point_i3,point_i4]:
for edge_start, edge_end in zip([point_j1, point_j3, point_j1, point_j2],[point_j2, point_j4, point_j3, point_j4]):
point_line_end = point_line_start+direct_v
### intersection point
ist = intersect(line(point_line_start, point_line_end), line(edge_start, edge_end))
ist[:,~ison(edge_start, edge_end, ist)] = np.nan
### distance from point to intersection point
dist_ist = np.sqrt((ist[0]-point_line_start[0])**2+(ist[1]-point_line_start[1])**2)
dist_ist[np.isnan(dist_ist)] = np.inf
dist_mat.append(dist_ist)
leaving = direct_v[0]*(ist[0]-point_line_start[0]) + direct_v[1]*(ist[1]-point_line_start[1])
leaving[leaving>=0] = 10
leaving[leaving<0] = 1
leaving_mat.append(leaving)
dist2overlap = np.array(dist_mat).min(axis=0)
TTC = dist2overlap/np.sqrt((samples.vx_i-samples.vx_j)**2+(samples.vy_i-samples.vy_j)**2)
leaving = np.nansum(np.array(leaving_mat),axis=0)
TTC[leaving<10] = np.inf
TTC[(leaving>10)&(leaving%10!=0)] = -1
return TTC
# Computation
def TTC(samples, toreturn='dataframe'):
if toreturn!='dataframe' and toreturn!='values':
warnings.warn('Incorrect target to return. Please specify \'dataframe\' or \'values\'.')
else:
ttc_ij = TTC_ij(samples)
keys = [var+'_i' for var in ['x','y','vx','vy','hx','hy','length','width']]
values = [var+'_j' for var in ['x','y','vx','vy','hx','hy','length','width']]
keys.extend(values)
values.extend(keys)
rename_dict = {keys[i]: values[i] for i in range(len(keys))}
ttc_ji = TTC_ij(samples.rename(columns=rename_dict))
if toreturn=='dataframe':
samples['TTC'] = np.minimum(ttc_ij, ttc_ji)
return samples
elif toreturn=='values':
return np.minimum(ttc_ij, ttc_ji)
# Efficiency evaluation
def efficiency(samples, iterations):
import time
ts = []
for _ in range(iterations):
t = time.time()
_ = TTC(samples, 'values')
ts.append(time.time()-t)
return sum(ts)/iterations