-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmatched_map.py
466 lines (402 loc) · 19.7 KB
/
matched_map.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
# MIT License
# Copyright (c) 2023 Luca Lobefaro, Meher V. R. Malladi, Olga Vysotska, Tiziano Guadagnino, Cyrill Stachniss
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from tqdm import tqdm
import open3d as o3d
from typing import List, Tuple
import numpy as np
from pathlib import Path
from utils.loading_tools import load_point_cloud
import os
from scipy.spatial import KDTree
import copy
class MatchedMap:
def __init__(self, ref_folder: Path, query_folder: Path) -> None:
# Load the reference point cloud
assert "voxelized_map.ply" in os.listdir(
str(ref_folder / "mapping_out")
), "Reference folder should contain the file 'mapping_out/voxelized_map.ply'"
self._ref_pcd = load_point_cloud(ref_folder / "mapping_out/voxelized_map.ply")
# Load the query point cloud
pcd_filename = f"voxelized_map.ply"
assert pcd_filename in os.listdir(
str(query_folder / "mapping_out")
), "Query folder should contain the file 'mapping_out/voxelized_map.ply'"
self._query_pcd = load_point_cloud(query_folder / "mapping_out" / pcd_filename)
# Pre-compute needed information
self._ref_pcd.estimate_normals()
self._query_pcd.estimate_normals()
self._ref_pcd_points = np.asarray(self._ref_pcd.points)
self._query_pcd_points = np.asarray(self._query_pcd.points)
self._ref_pcd_kdtree = KDTree(self._ref_pcd_points)
self._query_pcd_kdtree = KDTree(self._query_pcd_points)
self._ref_pcd_size = len(self._ref_pcd.points)
self._query_pcd_size = len(self._query_pcd.points)
# Initialize all the book-keeping for reference points
self._ref_point_transformations = []
self._ref_point_transformations_dists = []
for _ in range(self._ref_pcd_size):
self._ref_point_transformations.append(None)
self._ref_point_transformations_dists.append(np.inf)
# Initialize all the book-keeping for the query points
self._query_point_transformations = []
self._query_point_transformations_dists = []
self._point_matches_query2ref = []
for _ in range(self._query_pcd_size):
self._query_point_transformations.append(None)
self._query_point_transformations_dists.append(np.inf)
self._point_matches_query2ref.append(None)
def size_matches(self) -> int:
return sum(el is not None for el in self._point_matches_query2ref)
def get_ref_pcd(self) -> o3d.geometry.PointCloud:
return self._ref_pcd
def get_query_pcd(self) -> o3d.geometry.PointCloud:
return self._query_pcd
def get_ref_pcd_kdtree(self) -> KDTree:
return self._ref_pcd_kdtree
def get_query_pcd_kdtree(self) -> KDTree:
return self._query_pcd_kdtree
def add_matches(self, matches: List[Tuple[int, int]]) -> None:
# For every new match
for match in matches:
# Add the new association if we do not have already a
# point associated to this query
if self._point_matches_query2ref[match[0]] is None:
self._point_matches_query2ref[match[0]] = match[1]
def get_matches(self) -> List[Tuple[int, int]]:
return self._point_matches
def visualize_ref_map(self) -> None:
o3d.visualization.draw_geometries([self._ref_pcd])
def visualize_query_map(self) -> None:
o3d.visualization.draw_geometries([self._query_pcd])
def visualize(self, with_associations: bool = False) -> None:
colored_query_pcd = copy.deepcopy(self._query_pcd)
colored_query_pcd.paint_uniform_color([1, 0.706, 0])
if with_associations:
o3d.visualization.draw_geometries(
[self._ref_pcd, colored_query_pcd, self._get_matches_lines()]
)
else:
o3d.visualization.draw_geometries([self._ref_pcd, colored_query_pcd])
def save_matches(self, filename: Path) -> None:
with open(filename, "w") as file:
for query_id, ref_id in enumerate(self._point_matches_query2ref):
if ref_id is not None:
file.write(f"{query_id} {ref_id}\n")
def save_points_transformations(
self, ref_filename: Path, query_filename: Path
) -> None:
with open(ref_filename, "w") as file:
for trans in self._ref_point_transformations:
if trans is None:
file.write("None\n")
else:
file.write(f"{trans[0]} {trans[1]} {trans[2]}\n")
with open(query_filename, "w") as file:
for trans in self._query_point_transformations:
if trans is None:
file.write("None\n")
else:
file.write(f"{trans[0]} {trans[1]} {trans[2]}\n")
def load_matches(self, filename: Path) -> None:
with open(filename, "r") as file:
for line in file:
query_pt_idx, ref_pt_idx = line.split()
self._point_matches_query2ref[int(query_pt_idx)] = int(ref_pt_idx)
def load_points_transformations(
self, ref_filename: Path, query_filename: Path
) -> None:
with open(ref_filename, "r") as file:
for point_idx, line in enumerate(file):
if line == "None\n":
self._ref_point_transformations[point_idx] = None
else:
x, y, z = line.split()
self._ref_point_transformations[point_idx] = [
float(x),
float(y),
float(z),
]
assert len(self._ref_point_transformations) == self._ref_pcd_size
with open(query_filename, "r") as file:
for point_idx, line in enumerate(file):
if line == "None\n":
self._query_point_transformations[point_idx] = None
else:
x, y, z = line.split()
self._query_point_transformations[point_idx] = [
float(x),
float(y),
float(z),
]
assert len(self._query_point_transformations) == self._query_pcd_size
# This function computes the transformation of each reference point for which
# we have a sparse association (an association from lobefaro2023iros)
# Each of this point is called "fixed point"
def compute_points_transformations_from_matches(self) -> None:
for query_pt_idx, ref_pt_idx in enumerate(self._point_matches_query2ref):
if ref_pt_idx is not None:
self._ref_point_transformations[ref_pt_idx] = (
self._query_pcd_points[query_pt_idx]
- self._ref_pcd_points[ref_pt_idx]
)
self._ref_point_transformations_dists[ref_pt_idx] = 0
self._query_point_transformations[query_pt_idx] = (
self._ref_pcd_points[ref_pt_idx]
- self._query_pcd_points[query_pt_idx]
)
self._query_point_transformations_dists[query_pt_idx] = 0
def _find_nearest_points(
self, point: np.ndarray, map_kd_tree: KDTree, n_points: int, nn_threshold: float
) -> Tuple[List[int], List[float]]:
dists, ids = map_kd_tree.query(
point.reshape(-1, 3),
k=n_points,
distance_upper_bound=nn_threshold,
workers=-1,
)
# If we are searching only for one point deal with th results in a different way
if n_points == 1:
if dists[0] != np.inf:
return [ids[0]], [dists[0]]
else:
return [], []
# If, instead, we are searching for more points, bulid the resulting array
nearest_ids = []
nearest_dists = []
for dist, idx in zip(dists[0], ids[0]):
if dist != np.inf:
nearest_ids.append(idx)
nearest_dists.append(dist)
return nearest_ids, nearest_dists
def propagate_T(
self, nearest_point_same_map_th, nearest_point_other_map_th
) -> None:
# Initialization
ref_points = np.asarray(self._ref_pcd.points)
query_points = np.asarray(self._query_pcd.points)
# Propagate transformation moving ref points
for query_pt_idx, ref_pt_idx in tqdm(
enumerate(self._point_matches_query2ref),
total=len(self._point_matches_query2ref),
desc="Propagating from reference map to query map",
):
# We do not have a transformation for this point
if ref_pt_idx is None:
continue
# Take the transformation associated to this match
current_t = self._ref_point_transformations[ref_pt_idx]
# Find the nearest points in the reference for the current match
nearest_pts_ids, _ = self._find_nearest_points(
ref_points[ref_pt_idx],
self._ref_pcd_kdtree,
16,
nearest_point_same_map_th,
)
for nn_pt_id in nearest_pts_ids:
# If this point has already a "consolidated" match, skip it
if self._ref_point_transformations_dists[nn_pt_id] == 0.0:
continue
# Move the point according to the current t
moved_pt = ref_points[nn_pt_id] + current_t
# Search the correspondence in the query pcd after moving the point
(
nearest_query_pts_ids,
nearest_query_pts_dists,
) = self._find_nearest_points(
moved_pt, self._query_pcd_kdtree, 1, nearest_point_other_map_th
)
# If we do not find one, skip this point
if len(nearest_query_pts_ids) < 1:
continue
# Take the absolute value of the distance
nearest_query_pts_dists[0] = abs(nearest_query_pts_dists[0])
# If we do not have a transformation for the current point,
# or we have a better transformation than before, save it
if (
self._ref_point_transformations[nn_pt_id] is None
or nearest_query_pts_dists[0]
< self._ref_point_transformations_dists[nn_pt_id]
):
self._ref_point_transformations[nn_pt_id] = (
query_points[nearest_query_pts_ids[0]] - ref_points[nn_pt_id]
)
self._ref_point_transformations_dists[nn_pt_id] = (
nearest_query_pts_dists[0]
)
self._query_point_transformations[nearest_query_pts_ids[0]] = (
ref_points[nn_pt_id] - query_points[nearest_query_pts_ids[0]]
)
self._query_point_transformations_dists[
nearest_query_pts_ids[0]
] = nearest_query_pts_dists[0]
self._point_matches_query2ref[nearest_query_pts_ids[0]] = nn_pt_id
# Propagate transformations moving query points
for query_pt_idx, ref_pt_idx in tqdm(
enumerate(self._point_matches_query2ref),
total=len(self._point_matches_query2ref),
desc="Propagating from query map to reference map",
):
# We do not have a transformation for this point
if ref_pt_idx is None:
continue
# Take the transformation associated to this match
current_t = self._query_point_transformations[query_pt_idx]
# Find the nearest points in the query for the current match
nearest_pts_ids, _ = self._find_nearest_points(
query_points[query_pt_idx],
self._query_pcd_kdtree,
16,
nearest_point_same_map_th,
)
for nn_pt_id in nearest_pts_ids:
# If this point has already a "consolidated" match, skip it
if self._query_point_transformations_dists[nn_pt_id] == 0.0:
continue
# Move the opint according to the current t
moved_pt = query_points[nn_pt_id] + current_t
# Search the correspondence in the ref pcd after moving the point
nearest_ref_pts_ids, nearest_ref_pts_dists = self._find_nearest_points(
moved_pt, self._ref_pcd_kdtree, 1, nearest_point_other_map_th
)
# If we do not find one, skip this point
if len(nearest_ref_pts_ids) < 1:
continue
# Take the absolute value of the distance
nearest_ref_pts_dists[0] = abs(nearest_ref_pts_dists[0])
# If we do not have a transformation for the current point,
# or we have a better transformation than before, save it
if (
self._query_point_transformations[nn_pt_id] is None
or nearest_ref_pts_dists[0]
< self._query_point_transformations_dists[nn_pt_id]
):
self._ref_point_transformations[nearest_ref_pts_ids[0]] = (
query_points[nn_pt_id] - ref_points[nearest_ref_pts_ids[0]]
)
self._ref_point_transformations_dists[nearest_ref_pts_ids[0]] = (
nearest_ref_pts_dists[0]
)
self._query_point_transformations[nn_pt_id] = (
ref_points[nearest_ref_pts_ids[0]] - query_points[nn_pt_id]
)
self._query_point_transformations_dists[nn_pt_id] = (
nearest_ref_pts_dists[0]
)
self._point_matches_query2ref[nn_pt_id] = nearest_ref_pts_ids[0]
# Consolidate found transformations by setting distances to 0
self._ref_point_transformations_dists = [
0 if el is not None else None
for el in self._ref_point_transformations_dists
]
# Create a new point cloud containing all the points from both ref and query
# and give the interpolated position of those points that have a transformation
# associated
def get_interpolated_pcd(self, t: float) -> o3d.geometry.PointCloud:
# Initialization
assert t >= 0 and t <= 1
# Take the points from the reference pcd
interpolated_points = []
interpolated_colors = []
# ref_pcd_points = np.asarray(self._ref_pcd.points)
# ref_pcd_colors = np.asarray(self._ref_pcd.colors)
# for idx, transf in enumerate(self._ref_point_transformations):
# if transf is not None:
# interpolated_colors.append(ref_pcd_colors[idx])
# interpolated_points.append(
# ref_pcd_points[idx] + [t * el for el in transf]
# )
# # else:
# # interpolated_colors.append(ref_pcd_colors[idx])
# # interpolated_points.append(ref_pcd_points[idx])
# Take the points from the query pcd
ref_pcd_colors = np.asarray(self._ref_pcd.colors)
query_pcd_points = np.asarray(self._query_pcd.points)
query_pcd_colors = np.asarray(self._query_pcd.colors)
for idx, transf in enumerate(self._query_point_transformations):
if transf is not None:
interpolated_colors.append(
[
(t * query_color) + ((1 - t) * ref_color)
for query_color, ref_color in zip(
query_pcd_colors[idx],
ref_pcd_colors[self._point_matches_query2ref[idx]],
)
]
)
interpolated_points.append(
query_pcd_points[idx] + [(1 - t) * el for el in transf]
)
# else:
# interpolated_colors.append(query_pcd_colors[idx])
# interpolated_points.append(query_pcd_points[idx])
interpolated_pcd = o3d.geometry.PointCloud()
interpolated_pcd.points.extend(np.asarray(interpolated_points))
interpolated_pcd.colors.extend(np.asarray(interpolated_colors))
return interpolated_pcd
def get_interpolated_ref(self) -> o3d.geometry.PointCloud:
interpolated_points = []
interpolated_colors = []
ref_pcd_points = np.asarray(self._ref_pcd.points)
ref_pcd_colors = np.asarray(self._ref_pcd.colors)
query_pcd_colors = np.asarray(self._query_pcd.colors)
for idx, transf in enumerate(self._ref_point_transformations):
if transf is not None:
interpolated_colors.append(
# TODO: color is no interpolated here
ref_pcd_colors[idx]
)
interpolated_points.append(ref_pcd_points[idx] + [el for el in transf])
else:
interpolated_colors.append(ref_pcd_colors[idx])
interpolated_points.append(ref_pcd_points[idx])
interpolated_ref = o3d.geometry.PointCloud()
interpolated_ref.points.extend(np.asarray(interpolated_points))
interpolated_ref.colors.extend(np.asarray(interpolated_colors))
return interpolated_ref
def _get_matches_lines(
self,
) -> o3d.geometry.LineSet:
# Get the pcd points
query_pcd_points = np.asarray(self._query_pcd.points)
reference_pcd_points = np.asarray(self._ref_pcd.points)
# Define an open3D line set
line_set = o3d.geometry.LineSet()
# Take all the matched points and, for each of them, create a line
points = []
lines = []
current_point_idx = 0
for query_id, ref_id in enumerate(self._point_matches_query2ref):
# Ignore if this is not an actual association
if ref_id is None:
continue
# Add the query point of the current match
points.append(query_pcd_points[query_id])
current_point_idx += 1
# Add the reference point of the current match
points.append(reference_pcd_points[ref_id])
current_point_idx += 1
# Add a line connecting the two points
lines.append([current_point_idx - 2, current_point_idx - 1])
# Color the lines of blues
colors = [[1, 0, 0] for i in range(len(lines))]
# Fill the open3d line set
line_set.points = o3d.utility.Vector3dVector(points)
line_set.lines = o3d.utility.Vector2iVector(lines)
line_set.colors = o3d.utility.Vector3dVector(colors)
# Return it
return line_set