From cab481b616790e2d19c2bad1a0fc01e47a337cd0 Mon Sep 17 00:00:00 2001
From: Federico Magistri
Date: Mon, 3 Jun 2024 15:20:50 +0200
Subject: [PATCH] first commit

---
 chamfer_distance.py |  30 ++++++++++
 dataloader.py       | 132 ++++++++++++++++++++++++++++++++++++++++++++
 metric.py           |  65 ++++++++++++++++++++++
 precision_recall.py |  85 ++++++++++++++++++++++++++++
 4 files changed, 312 insertions(+)
 create mode 100644 chamfer_distance.py
 create mode 100644 dataloader.py
 create mode 100644 metric.py
 create mode 100644 precision_recall.py

diff --git a/chamfer_distance.py b/chamfer_distance.py
new file mode 100644
index 0000000..12ab2e9
--- /dev/null
+++ b/chamfer_distance.py
@@ -0,0 +1,30 @@
+import numpy as np
+
+from metric import Metrics3D
+
+
+class ChamferDistance(Metrics3D):
+
+    def __init__(self):
+        self.cd_array = []
+
+    def update(self, gt, pt):
+        if self.prediction_is_empty(pt):
+            self.cd_array.append(1000)  # just a high value
+            return
+
+        gt_pcd = self.convert_to_pcd(gt)
+        pt_pcd = self.convert_to_pcd(pt)
+        pt_pcd.paint_uniform_color([1,0,0])
+        gt_pcd.paint_uniform_color([0,0,1])
+        dist_pt_2_gt = np.asarray(pt_pcd.compute_point_cloud_distance(gt_pcd))
+        dist_gt_2_pt = np.asarray(gt_pcd.compute_point_cloud_distance(pt_pcd))
+        d = (np.mean(dist_gt_2_pt) + np.mean(dist_pt_2_gt)) / 2
+        self.cd_array.append(d)
+
+    def reset(self):
+        self.cd_array = []
+
+    def compute(self):
+        cd = sum(self.cd_array) / len(self.cd_array)
+        return cd
diff --git a/dataloader.py b/dataloader.py
new file mode 100644
index 0000000..bb14ccd
--- /dev/null
+++ b/dataloader.py
@@ -0,0 +1,132 @@
+import os
+import json
+import open3d as o3d
+import numpy as np
+import cv2
+
+
+class ShapeCompletionDataset():
+
+    def __init__(self,
+                 data_source=None,
+                 split='train',
+                 return_pcd = True,
+                 return_rgbd = True,
+                 ):
+
+        assert return_pcd or return_rgbd, "return_pcd and return_rgbd are set to False. Set at least one to True"
+
+        self.data_source = data_source
+        self.split = split
+        self.return_pcd = return_pcd
+        self.return_rgbd = return_rgbd
+
+        self.fruit_list = self.get_file_paths()
+
+    def get_file_paths(self):
+        fruit_list = {}
+        for fid in os.listdir(os.path.join(self.data_source, self.split)):
+            fruit_list[fid] = {
+                'path': os.path.join(self.data_source, self.split, fid),
+            }
+        return fruit_list
+
+    def get_gt(self, fid):
+        return o3d.io.read_point_cloud(os.path.join(self.fruit_list[fid]['path'],'gt/pcd/fruit.ply'))
+
+    def get_rgbd(self, fid):
+        fid_root = self.fruit_list[fid]['path']
+
+        intrinsic_path = os.path.join(fid_root,'input/intrinsic.json')
+        intrinsic = self.load_K(intrinsic_path)
+
+        rgbd_data = {
+            'intrinsic':intrinsic,
+            'pcd': o3d.geometry.PointCloud(),
+            'frames':{}
+        }
+
+        frames = os.listdir(os.path.join(fid_root,'input/masks/'))
+        for frameid in frames:
+
+            pose_path = os.path.join(fid_root,'input/poses/',frameid.replace('png','txt'))
+            pose = np.loadtxt(pose_path)
+
+            rgb_path = os.path.join(fid_root,'input/color/',frameid)
+            rgb = cv2.cvtColor(cv2.imread(rgb_path), cv2.COLOR_BGR2RGB)
+
+            depth_path = os.path.join(fid_root,'input/depth/',frameid.replace('png','npy'))
+            depth = np.load(depth_path)
+
+            mask_path = os.path.join(fid_root,'input/masks/',frameid)
+            mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
+
+            frame_key = frameid.replace('png','')
+
+            if self.return_pcd:
+                frame_pcd = self.rgbd_to_pcd(rgb, depth, mask, pose, intrinsic)
+                rgbd_data['pcd'] += frame_pcd
+
+            rgbd_data['frames'][frame_key] = {
+                'rgb': rgb,
+                'depth': depth,
+                'mask': mask,
+                'pose': pose
+            }
+
+
+        return rgbd_data
+
+    @staticmethod
+    def load_K(path):
+        with open(path,'r') as f:
+            data = json.load(f)['intrinsic_matrix']
+        k = np.reshape(data, (3, 3), order='F')
+        return k
+
+    @staticmethod
+    def rgbd_to_pcd(rgb, depth, mask, pose, K):
+
+        rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(o3d.geometry.Image(rgb),
+                                                                  o3d.geometry.Image(depth*mask),
+                                                                  depth_scale = 1,
+                                                                  depth_trunc=1.0,
+                                                                  convert_rgb_to_intensity=False)
+
+        intrinsic = o3d.camera.PinholeCameraIntrinsic()
+        intrinsic.set_intrinsics(height=rgb.shape[0],
+                                 width=rgb.shape[1],
+                                 fx=K[0,0],
+                                 fy=K[1,1],
+                                 cx=K[0,2],
+                                 cy=K[1,2],
+                                 )
+
+        extrinsic = np.linalg.inv(pose)
+
+        frame_pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, intrinsic, extrinsic)
+        return frame_pcd
+
+
+    def __len__(self):
+        return len(self.fruit_list)
+
+    def __getitem__(self, idx):
+
+        keys = list(self.fruit_list.keys())
+        fid = keys[idx]
+
+        gt_pcd = self.get_gt(fid)
+        input_data = self.get_rgbd(fid)
+
+        item = {
+            'groundtruth_pcd': gt_pcd
+        }
+        if self.return_pcd:
+            item['rgbd_pcd'] = input_data['pcd']
+        if self.return_rgbd:
+            item['rgbd_intrinsic'] = input_data['intrinsic']
+            item['rgbd_frames'] = input_data['frames']
+
+        return item
+        
\ No newline at end of file
diff --git a/metric.py b/metric.py
new file mode 100644
index 0000000..da80fb2
--- /dev/null
+++ b/metric.py
@@ -0,0 +1,65 @@
+import open3d as o3d
+# import torch
+import numpy as np
+
+MESHTYPE = 6
+TETRATYPE = 10
+PCDTYPE = 1
+
+
+class Metrics3D():
+
+    def prediction_is_empty(self, geom):
+
+        if isinstance(geom, o3d.geometry.Geometry):
+            geom_type = geom.get_geometry_type().value
+            if geom_type == MESHTYPE:
+                geom.remove_duplicated_vertices()
+                geom.remove_duplicated_triangles()
+                geom.remove_degenerate_triangles()
+                empty_v = self.is_empty(len(geom.vertices))
+                empty_t = self.is_empty(len(geom.triangles))
+                empty = empty_t or empty_v
+            elif geom_type == TETRATYPE:
+                geom.remove_duplicated_vertices()
+                geom.remove_duplicated_tetras()
+                geom.remove_degenerate_tetras()
+                empty_v = self.is_empty(len(geom.vertices))
+                empty_t = self.is_empty(len(geom.tetras))
+                empty = empty_t or empty_v
+            elif geom_type == PCDTYPE:
+                empty = self.is_empty(len(geom.points))
+            else:
+                assert False, '{} geometry not supported'.format(geom.get_geometry_type())
+        elif isinstance(geom, np.ndarray):
+            empty = self.is_empty(len(geom[:, :3]))
+        else:
+            assert False, '{} type not supported'.format(type(geom))
+
+        return empty
+
+    @staticmethod
+    def convert_to_pcd(geom):
+
+        if isinstance(geom, o3d.geometry.Geometry):
+            geom_type = geom.get_geometry_type().value
+            if geom_type == MESHTYPE or geom_type == TETRATYPE:
+                geom_pcd = geom.sample_points_uniformly(1000000)
+            elif geom_type == PCDTYPE:
+                geom_pcd = geom
+            else:
+                assert False, '{} geometry not supported'.format(geom.get_geometry_type())
+        elif isinstance(geom, np.ndarray):
+            geom_pcd = o3d.geometry.PointCloud()
+            geom_pcd.points = o3d.utility.Vector3dVector(geom[:, :3])
+        else:
+            assert False, '{} type not supported'.format(type(geom))
+
+        return geom_pcd
+
+    @staticmethod
+    def is_empty(length):
+        empty = True
+        if length:
+            empty = False
+        return empty
\ No newline at end of file
diff --git a/precision_recall.py b/precision_recall.py
new file mode 100644
index 0000000..34b2bf7
--- /dev/null
+++ b/precision_recall.py
@@ -0,0 +1,85 @@
+import numpy as np
+import scipy.integrate
+
+from metric import Metrics3D
+
+
+class PrecisionRecall(Metrics3D):
+
+    def __init__(self, min_t, max_t, num):
+        self.thresholds = np.linspace(min_t, max_t, num)
+        self.pr_dict = {t: [] for t in self.thresholds}
+        self.re_dict = {t: [] for t in self.thresholds}
+        self.f1_dict = {t: [] for t in self.thresholds}
+
+    def update(self, gt, pt):
+        if self.prediction_is_empty(pt):
+            for t in self.thresholds:
+                self.pr_dict[t].append(0)
+                self.re_dict[t].append(0)
+                self.f1_dict[t].append(0)
+            return
+
+        gt_pcd = self.convert_to_pcd(gt)
+        pt_pcd = self.convert_to_pcd(pt)
+
+        # precision: predicted --> ground truth
+        dist_pt_2_gt = np.asarray(pt_pcd.compute_point_cloud_distance(gt_pcd))
+
+        # recall: ground truth --> predicted
+        dist_gt_2_pt = np.asarray(gt_pcd.compute_point_cloud_distance(pt_pcd))
+
+        for t in self.thresholds:
+            p = np.where(dist_pt_2_gt < t)[0]
+            p = 100 / len(dist_pt_2_gt) * len(p)
+            self.pr_dict[t].append(p)
+
+            r = np.where(dist_gt_2_pt < t)[0]
+            r = 100 / len(dist_gt_2_pt) * len(r)
+            self.re_dict[t].append(r)
+
+            # fscore
+            if p == 0 or r == 0:
+                f = 0
+            else:
+                f = 2 * p * r / (p + r)
+            self.f1_dict[t].append(f)
+
+    def reset(self):
+        self.pr_dict = {t: [] for t in self.thresholds}
+        self.re_dict = {t: [] for t in self.thresholds}
+        self.f1_dict = {t: [] for t in self.thresholds}
+
+    def compute_at_threshold(self, threshold):
+        t = self.find_nearest_threshold(threshold)
+        pr = sum(self.pr_dict[t]) / len(self.pr_dict[t])
+        re = sum(self.re_dict[t]) / len(self.re_dict[t])
+        f1 = sum(self.f1_dict[t]) / len(self.f1_dict[t])
+        return pr, re, f1, t
+
+    def compute_auc(self):
+        dx = self.thresholds[1] - self.thresholds[0]
+        perfect_predictor = scipy.integrate.simpson(np.ones_like(self.thresholds), dx=dx)
+
+        pr, re, f1 = self.compute_at_all_thresholds()
+
+        pr_area = scipy.integrate.simpson(pr, dx=dx)
+        norm_pr_area = pr_area / perfect_predictor
+
+        re_area = scipy.integrate.simpson(re, dx=dx)
+        norm_re_area = re_area / perfect_predictor
+
+        f1_area = scipy.integrate.simpson(f1, dx=dx)
+        norm_f1_area = f1_area / perfect_predictor
+
+        return norm_pr_area, norm_re_area, norm_f1_area
+
+    def compute_at_all_thresholds(self):
+        pr = [sum(self.pr_dict[t]) / len(self.pr_dict[t]) for t in self.thresholds]
+        re = [sum(self.re_dict[t]) / len(self.re_dict[t]) for t in self.thresholds]
+        f1 = [sum(self.f1_dict[t]) / len(self.f1_dict[t]) for t in self.thresholds]
+        return pr, re, f1
+
+    def find_nearest_threshold(self, value):
+        idx = (np.abs(self.thresholds - value)).argmin()
+        return self.thresholds[idx]
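
For reference, a minimal evaluation loop wiring these classes together could look like the sketch below. Everything in it that is not defined by the patch is an assumption: the dataset root is a placeholder path, the threshold range and the 0.005 evaluation threshold are example values, and the merged back-projected input cloud is used as a stand-in prediction in place of the output of an actual shape completion method.

    from chamfer_distance import ChamferDistance
    from dataloader import ShapeCompletionDataset
    from precision_recall import PrecisionRecall

    # Placeholder dataset root; the expected layout is <root>/<split>/<fruit_id>/gt/... and .../input/...
    dataset = ShapeCompletionDataset(data_source='/path/to/dataset', split='train',
                                     return_pcd=True, return_rgbd=False)

    cd_metric = ChamferDistance()
    pr_metric = PrecisionRecall(min_t=0.001, max_t=0.01, num=100)  # example threshold range

    for idx in range(len(dataset)):
        item = dataset[idx]
        gt_pcd = item['groundtruth_pcd']
        # Stand-in prediction: the back-projected input cloud. Replace with the
        # output of the shape completion method under evaluation.
        pred_pcd = item['rgbd_pcd']

        cd_metric.update(gt_pcd, pred_pcd)
        pr_metric.update(gt_pcd, pred_pcd)

    print('chamfer distance:', cd_metric.compute())
    p, r, f1, t = pr_metric.compute_at_threshold(0.005)  # example threshold
    print('precision / recall / F1 at t = {:.4f}:'.format(t), p, r, f1)
    print('AUC (precision, recall, F1):', pr_metric.compute_auc())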