-
Notifications
You must be signed in to change notification settings - Fork 0
/
integration_v7_single.py
365 lines (294 loc) · 11.5 KB
/
integration_v7_single.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
# -*- codeing = utf-8 -*-
# @Time : 2024-05-22 17:15
# @Author : 张庭恺
# @File : integration.py
# @Software : PyCharm
import copy
import numpy as np
from arguments import Config, get_args
# from PCReg.build_reg import Registration
# from Pointnet_Pointnet2.build_pointnet import PointNet
from my_utils import *
# server.py
from concurrent import futures
import grpc
import service_pb2
import service_pb2_grpc
import json
def parse_str(s:str):
# '5.,55.77,77.87;5.,88.37,126.72;5.,79.48,138.69'
s_lst = s.split(';')
all_points = []
for s_ in s_lst:
xyz_lst = s_.split(',')
xyz = [float(xyz_) for xyz_ in xyz_lst]
all_points.append(xyz)
edge = np.array(all_points)
return edge
# class AntomationGrpcTwoServiceServicer(antomation_two_pb2_grpc.AntomationGrpcTwoServiceServicer):
class AntomationGrpcTwoServiceServicer(service_pb2_grpc.Autov3Servicer):
def BindPlane(self, request, context):
# response = antomation_two_pb2.AntomationGrpcTwoResponse()
response = service_pb2.Response()
args = get_args()
cfg = Config()
cfg.plane = args.plane
integration = Integration(cfg)
# TODO 第一个参数 实测列表-每个元素对应一个配准、分割后的点云文件名(url)
# 1、读取配准、分割后的数据
postreg_source_files = request.filenames # type: str
postreg_source_files = postreg_source_files.split('#')
# postreg_source : list = field(default=[],metadata = {'help':'300个分割后的实测点云表面'})
# target : list = field(default=[],metadata = {'help':'3个分割后的标准点云表面'})
postreg_source = []
for file_name in postreg_source_files:
cur_pcd = read_ply(file_name)
cur_pcd = np.asarray(cur_pcd.points)
postreg_source.append(cur_pcd)
# TODO 第二个参数 选取的两个配合面的边界值
# 2、获取选取的两个配合面
boundarys = request.boundarys # type: str
if '#' in boundarys:
plane1, plane2 = boundarys.split('#')
else:
plane1 = parse_str(boundarys)
# plane1 = list(map(lambda x: float(x), plane1))
# plane1 = np.array(plane1)[None, :] # shape[1,3]
#
# plane2 = list(map(lambda x: float(x), plane2))
# plane2 = np.array(plane2)[None, :] # shape[1,3]
# 3、计算传过来选定的两个配合面各自的特征
# TODO 添加半径特征
selected_center1 = compute_centroid(plane1)[None, :] # shape[1,3]
selected_radius1 = compute_radius(plane1, selected_center1) # shape[1]
selected_radius1 = np.array([selected_radius1])[None, :]
selected_feature1 = np.concatenate([selected_center1, selected_radius1], axis=1) # shape[1,4]
# selected_center2 = compute_centroid(plane2)[None, :] # shape[1,3]
# selected_radius2 = compute_radius(plane2, selected_center2) # shape[1]
# selected_radius2 = np.array([selected_radius2])[None, :]
# selected_feature2 = np.concatenate([selected_center2, selected_radius2], axis=1) # shape[1,4]
# 4、计算所有配准后分割出来的实测点云表面的特征
centers_ply = integration.comput_center(postreg_source) # shape[n,3]
centers_ply_lst = np.split(centers_ply, centers_ply.shape[0], axis=0) # list[3]
radius_ply_lst = []
for pcd, center in zip(postreg_source, centers_ply_lst):
radius = compute_radius(pcd, center)
radius_ply_lst.append(radius)
radius_ply_lst = np.array(radius_ply_lst)[:, None] # shape[n,1]
all_feature = np.concatenate([centers_ply, radius_ply_lst], axis=1) # shape[n,4]
print('_' * 20, '特征提取完成\t\t\t', '_' * 20)
# 匹配到的文件路径
# TODO 这里实现多线程或者多进程处理提高并行计算效率,使用任务分组计算
# 5、匹配第一个配合面对应的点云
matched1_files = feature_match_cos_multi(all_feature, postreg_source_files, selected_feature1, topk=1)
matched1_idxs = [postreg_source_files.index(file) for file in matched1_files]
# 6、匹配第二个配合面对应的点云
# matched2_files = feature_match_cos_multi(all_feature, postreg_source_files, selected_feature2, topk=1)
# matched2_idxs = [postreg_source_files.index(file) for file in matched2_files]
print('_' * 20, '匹配完成\t\t\t', '_' * 20)
print('匹配到的第一个配合面:' , matched1_files)
# print('匹配到的第二个配合面:' , matched2_files)
# 7、计算第一个配合面的特征值
res_dict = {}
for matched_idx in matched1_idxs:
matched_filename = postreg_source_files[matched_idx]
print(postreg_source_files[matched_idx])
match_pcd = postreg_source[matched_idx]
# match_cad_show = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(match_pcd))
# show_files.append(match_cad_show)
# o3d.visualization.draw_geometries([selected_cad_plane_show, match_cad_show])
plane = fit_plane(match_pcd)
matched_planarity = calculate_planarity(match_pcd, plane)
matched_centroid = compute_centroid(match_pcd)
matched_radius = compute_radius(match_pcd, matched_centroid)
matched_height = compute_height(match_pcd)
res_dict[matched_filename] = {
'planarity': str(matched_planarity),
'centroid': str(matched_centroid),
'radius': str(matched_radius),
}
print('matched_planarity:', matched_planarity)
print('matched_centroid:', matched_centroid)
print('matched_radius:', matched_radius)
# print('matched_height:', matched_height)
# 7、计算第二个配合面的特征值
# for matched_idx in matched2_idxs:
#
# matched_filename = postreg_source_files[matched_idx]
# print(matched_filename)
# match_pcd = postreg_source[matched_idx]
# # match_cad_show = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(match_pcd))
# # show_files.append(match_cad_show)
# # o3d.visualization.draw_geometries([selected_cad_plane_show, match_cad_show])
#
# plane = fit_plane(match_pcd)
# matched_planarity = calculate_planarity(match_pcd, plane)
#
# matched_centroid = compute_centroid(match_pcd)
# matched_radius = compute_radius(match_pcd, matched_centroid)
# matched_height = compute_height(match_pcd)
# res_dict[matched_filename] = {
# 'planarity': str(matched_planarity),
# 'centroid': str(matched_centroid),
# 'radius': str(matched_radius),
# }
#
# print('matched_planarity:', matched_planarity)
# print('matched_centroid:', matched_centroid)
#
# print('matched_radius:', matched_radius)
# # print('matched_height:', matched_height)
#
# # 8、计算两个配合面的关系特征
# for idx1 , idx2 in zip(matched1_idxs, matched2_idxs):
# match_pcd1 = postreg_source[idx1]
# match_pcd2 = postreg_source[idx2]
#
# res_eccentricity = error_eccentricity(match_pcd1, match_pcd2)
# res_parallelism = parallelism(match_pcd1, match_pcd2)
#
# res_dict['relationship_feature'] = {
# 'eccentricity': str(res_eccentricity),
# 'parallelism': str(res_parallelism),
# }
#
# print('两配合面的偏心度为:', res_eccentricity)
# print('两配合面的平度为:', res_parallelism)
json_res = json.dumps(res_dict)
# res = res_dict.values()
# res = list(map(lambda x: str(x),res))
# res = '\n'.join(res)
response.plane1_feature = json_res
return response
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# antomation_two_pb2_grpc.add_AntomationGrpcTwoServiceServicer_to_server(AntomationGrpcTwoServiceServicer(), server)
service_pb2_grpc.add_Autov3Servicer_to_server(AntomationGrpcTwoServiceServicer(), server)
server.add_insecure_port('[::]:50059')
server.start()
print("Server started, listening on port 50059.")
server.wait_for_termination()
PointCloud = o3d.geometry.PointCloud
ndarray = np.ndarray
class Integration:
'''
集成所有部分的类
需要做的事情
1.构建配准模型
2.构建pointnet++模型
3.包含CAD离散方法
4.包含特征匹配方法
5.分割方法
'''
segmented_ply: List = []
segmented_cad: List = []
postseg_dir: str = None
def __init__(self, cfg: Config):
self.cfg = cfg
# self.reg_model = Registration(cfg) if cfg.use_reg else None
# self.pointnet_model = PointNet(cfg) if cfg.use_pointnet else None
self.postreg_ply = None
pass
def reg_ply_cad_icp(self, source: PointCloud, target: PointCloud, voxelsize: float = 5) -> Tuple[
PointCloud, ndarray]:
'''
使用ICP方法配准两个模型
Parameters
----------
source :待配准的点云
target :目标点云
voxelsize :下采样的时候体素的大小
Returns
-------
'''
# 首先进行下采样、法向量估计、快速点特征直方图(FPFH)
ply_down, ply_fpfh = preprocess_point_cloud(source, voxelsize)
cad_down, cad_fpfh = preprocess_point_cloud(target, voxelsize)
# 进行ICP配准
# 使用的方法是RANSAC 随机一致性采样法
result_ransac = execute_global_registration(ply_down, cad_down, ply_fpfh, cad_fpfh, voxel_size=voxelsize)
# 精配准
result_icp = refine_registration(ply_down, cad_down, voxelsize, result_ransac)
# 配准后的点云
# 这个方法是在原地修改
post_reg_ply = copy.deepcopy(source)
post_reg_ply = post_reg_ply.transform(result_icp.transformation)
return post_reg_ply, result_icp.transformation
def transformation(self, point_cloud: PointCloud, transformation: ndarray) -> PointCloud:
point_cloud_copy = copy.deepcopy(point_cloud)
point_cloud_copy.transform(transformation)
return point_cloud_copy
def comput_center(self, points_list: List[np.ndarray]) -> np.ndarray:
'''
计算点云的质心
Parameters
----------
points_list:List [ndarray:[n_points , 3]]
Returns
-------
ndarray:[n,3]
'''
centroid_list = []
for cur_points in points_list:
centroid = compute_centroid(cur_points)
centroid_list.append(centroid)
centroid_list = np.vstack(centroid_list)
return centroid_list
def segment_postreg_ply(self, ply_file: Optional[str] = None, out_dir: Optional[str] = None) -> List:
# TODO 先写死,后续调用c++的程序进行分割
segmented_ply_dir = './data/segmentply'
segmented_ply_dir_list = os.listdir(segmented_ply_dir)
self.postseg_ply_dir = out_dir
for ply in segmented_ply_dir_list:
cur_ply_path = os.path.join(segmented_ply_dir, ply)
cur_pcd = read_ply(cur_ply_path)
points = np.asarray(cur_pcd.points)
self.segmented_ply.append(points)
# shape不一样
return self.segmented_ply
def pc_normalize(self, pc: ndarray) -> ndarray:
centroid = np.mean(pc, axis=0)
pc = pc - centroid
m = np.max(np.sqrt(np.sum(pc ** 2, axis=1)))
pc = pc / m
return pc
def segment_cad(self, cad_file: Optional[str] = None, out_dir: Optional[str] = None) -> List:
# TODO 先写死,后续调用c++的程序进行分割
segmented_cad_dir = './data/segmentply'
segmented_cad_dir_list = os.listdir(segmented_cad_dir)
self.postseg_cad_dir = out_dir
for cad in segmented_cad_dir_list:
cur_cad_path = os.path.join(segmented_cad_dir, cad)
cur_pcd = read_ply(cur_cad_path)
points = np.asarray(cur_pcd.points)
self.segmented_cad.append(points)
# shape不一样
return self.segmented_cad
pass
def select_plane(self, plane_list: List[ndarray], idx) -> np.ndarray:
# TODO 先写死,后续会添加参数作为判断条件
'''
Parameters
----------
plane_list
idx
边界表示:str 可能多个面,不同面之间用分号表示
Returns
-------
'''
idx = idx
pcd = plane_list[idx]
return pcd
pass
def faiss_match(self, query: np.ndarray, query_dir: str, Index: np.ndarray):
# TODO 特征检索匹配
matched_file = feature_match_single(query, query_dir, Index)
self.matched_file = matched_file
pass
def visualize(self, pcd1: ndarray, pcd2: ndarray, *args):
# TODO 可视化
visualize_point_clouds(pcd1, pcd2, *args)
pass
if __name__ == '__main__':
# 初始化
serve()