-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Federico Magistri
committed
Jun 3, 2024
0 parents
commit cab481b
Showing
4 changed files
with
312 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import numpy as np | ||
|
||
from metric import Metrics3D | ||
|
||
|
||
class ChamferDistance(Metrics3D): | ||
|
||
def __init__(self): | ||
self.cd_array = [] | ||
|
||
def update(self, gt, pt): | ||
if self.prediction_is_empty(pt): | ||
self.cd_array.append(1000) # just a high value | ||
return | ||
|
||
gt_pcd = self.convert_to_pcd(gt) | ||
pt_pcd = self.convert_to_pcd(pt) | ||
pt_pcd.paint_uniform_color([1,0,0]) | ||
gt_pcd.paint_uniform_color([0,0,1]) | ||
dist_pt_2_gt = np.asarray(pt_pcd.compute_point_cloud_distance(gt_pcd)) | ||
dist_gt_2_pt = np.asarray(gt_pcd.compute_point_cloud_distance(pt_pcd)) | ||
d = (np.mean(dist_gt_2_pt) + np.mean(dist_pt_2_gt)) / 2 | ||
self.cd_array.append(d) | ||
|
||
def reset(self): | ||
self.cd_array = [] | ||
|
||
def compute(self): | ||
cd = sum(self.cd_array) / len(self.cd_array) | ||
return cd |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
import os | ||
import json | ||
import open3d as o3d | ||
import numpy as np | ||
import cv2 | ||
|
||
|
||
class ShapeCompletionDataset(): | ||
|
||
def __init__(self, | ||
data_source=None, | ||
split='train', | ||
return_pcd = True, | ||
return_rgbd = True, | ||
): | ||
|
||
assert return_pcd or return_rgbd, "return_pcd and return_rgbd are set to False. Set at least one to True" | ||
|
||
self.data_source = data_source | ||
self.split = split | ||
self.return_pcd = return_pcd | ||
self.return_rgbd = return_rgbd | ||
|
||
self.fruit_list = self.get_file_paths() | ||
|
||
def get_file_paths(self): | ||
fruit_list = {} | ||
for fid in os.listdir(os.path.join(self.data_source, self.split)): | ||
fruit_list[fid] = { | ||
'path': os.path.join(self.data_source, self.split, fid), | ||
} | ||
return fruit_list | ||
|
||
def get_gt(self, fid): | ||
return o3d.io.read_point_cloud(os.path.join(self.fruit_list[fid]['path'],'gt/pcd/fruit.ply')) | ||
|
||
def get_rgbd(self, fid): | ||
fid_root = self.fruit_list[fid]['path'] | ||
|
||
intrinsic_path = os.path.join(fid_root,'input/intrinsic.json') | ||
intrinsic = self.load_K(intrinsic_path) | ||
|
||
rgbd_data = { | ||
'intrinsic':intrinsic, | ||
'pcd': o3d.geometry.PointCloud(), | ||
'frames':{} | ||
} | ||
|
||
frames = os.listdir(os.path.join(fid_root,'input/masks/')) | ||
for frameid in frames: | ||
|
||
pose_path = os.path.join(fid_root,'input/poses/',frameid.replace('png','txt')) | ||
pose = np.loadtxt(pose_path) | ||
|
||
rgb_path = os.path.join(fid_root,'input/color/',frameid) | ||
rgb = cv2.cvtColor(cv2.imread(rgb_path), cv2.COLOR_BGR2RGB) | ||
|
||
depth_path = os.path.join(fid_root,'input/depth/',frameid.replace('png','npy')) | ||
depth = np.load(depth_path) | ||
|
||
mask_path = os.path.join(fid_root,'input/masks/',frameid) | ||
mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE) | ||
|
||
frame_key = frameid.replace('png','') | ||
|
||
if self.return_pcd: | ||
frame_pcd = self.rgbd_to_pcd(rgb, depth, mask, pose, intrinsic) | ||
rgbd_data['pcd'] += frame_pcd | ||
|
||
rgbd_data['frames'][frame_key] = { | ||
'rgb': rgb, | ||
'depth': depth, | ||
'mask': mask, | ||
'pose': pose | ||
} | ||
|
||
|
||
return rgbd_data | ||
|
||
@staticmethod | ||
def load_K(path): | ||
with open(path,'r') as f: | ||
data = json.load(f)['intrinsic_matrix'] | ||
k = np.reshape(data, (3, 3), order='F') | ||
return k | ||
|
||
@staticmethod | ||
def rgbd_to_pcd(rgb, depth, mask, pose, K): | ||
|
||
rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(o3d.geometry.Image(rgb), | ||
o3d.geometry.Image(depth*mask), | ||
depth_scale = 1, | ||
depth_trunc=1.0, | ||
convert_rgb_to_intensity=False) | ||
|
||
intrinsic = o3d.camera.PinholeCameraIntrinsic() | ||
intrinsic.set_intrinsics(height=rgb.shape[0], | ||
width=rgb.shape[1], | ||
fx=K[0,0], | ||
fy=K[1,1], | ||
cx=K[0,2], | ||
cy=K[1,2], | ||
) | ||
|
||
extrinsic = np.linalg.inv(pose) | ||
|
||
frame_pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, intrinsic, extrinsic) | ||
return frame_pcd | ||
|
||
|
||
def __len__(self): | ||
return len(self.fruit_list) | ||
|
||
def __getitem__(self, idx): | ||
|
||
keys = list(self.fruit_list.keys()) | ||
fid = keys[idx] | ||
|
||
gt_pcd = self.get_gt(fid) | ||
input_data = self.get_rgbd(fid) | ||
|
||
item = { | ||
'groundtruth_pcd': gt_pcd | ||
} | ||
if self.return_pcd: | ||
item['rgbd_pcd'] = input_data['pcd'] | ||
if self.return_rgbd: | ||
item['rgbd_intrinsic'] = input_data['intrinsic'] | ||
item['rgbd_frames'] = input_data['frames'] | ||
|
||
return item | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import open3d as o3d | ||
# import torch | ||
import numpy as np | ||
|
||
MESHTYPE = 6 | ||
TETRATYPE = 10 | ||
PCDTYPE = 1 | ||
|
||
|
||
class Metrics3D(): | ||
|
||
def prediction_is_empty(self, geom): | ||
|
||
if isinstance(geom, o3d.geometry.Geometry): | ||
geom_type = geom.get_geometry_type().value | ||
if geom_type == MESHTYPE: | ||
geom.remove_duplicated_vertices() | ||
geom.remove_duplicated_triangles() | ||
geom.remove_degenerate_triangles() | ||
empty_v = self.is_empty(len(geom.vertices)) | ||
empty_t = self.is_empty(len(geom.triangles)) | ||
empty = empty_t or empty_v | ||
elif geom_type == TETRATYPE: | ||
geom.remove_duplicated_vertices() | ||
geom.remove_duplicated_tetras() | ||
geom.remove_degenerate_tetras() | ||
empty_v = self.is_empty(len(geom.vertices)) | ||
empty_t = self.is_empty(len(geom.tetras)) | ||
empty = empty_t or empty_v | ||
elif geom_type == PCDTYPE: | ||
empty = self.is_empty(len(geom.points)) | ||
else: | ||
assert False, '{} geometry not supported'.format(geom.get_geometry_type()) | ||
elif isinstance(geom, np.ndarray): | ||
empty = self.is_empty(len(geom[:, :3])) | ||
else: | ||
assert False, '{} type not supported'.format(type(geom)) | ||
|
||
return empty | ||
|
||
@staticmethod | ||
def convert_to_pcd(geom): | ||
|
||
if isinstance(geom, o3d.geometry.Geometry): | ||
geom_type = geom.get_geometry_type().value | ||
if geom_type == MESHTYPE or geom_type == TETRATYPE: | ||
geom_pcd = geom.sample_points_uniformly(1000000) | ||
elif geom_type == PCDTYPE: | ||
geom_pcd = geom | ||
else: | ||
assert False, '{} geometry not supported'.format(geom.get_geometry_type()) | ||
elif isinstance(geom, np.ndarray): | ||
geom_pcd = o3d.geometry.PointCloud() | ||
geom_pcd.points = o3d.utility.Vector3dVector(geom[:, :3]) | ||
else: | ||
assert False, '{} type not supported'.format(type(geom)) | ||
|
||
return geom_pcd | ||
|
||
@staticmethod | ||
def is_empty(length): | ||
empty = True | ||
if length: | ||
empty = False | ||
return empty |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
import numpy as np | ||
import scipy | ||
|
||
from metric import Metrics3D | ||
|
||
|
||
class PrecisionRecall(Metrics3D): | ||
|
||
def __init__(self, min_t, max_t, num): | ||
self.thresholds = np.linspace(min_t, max_t, num) | ||
self.pr_dict = {t: [] for t in self.thresholds} | ||
self.re_dict = {t: [] for t in self.thresholds} | ||
self.f1_dict = {t: [] for t in self.thresholds} | ||
|
||
def update(self, gt, pt): | ||
if self.prediction_is_empty(pt): | ||
for t in self.thresholds: | ||
self.pr_dict[t].append(0) | ||
self.re_dict[t].append(0) | ||
self.f1_dict[t].append(0) | ||
return | ||
|
||
gt_pcd = self.convert_to_pcd(gt) | ||
pt_pcd = self.convert_to_pcd(pt) | ||
|
||
# precision: predicted --> ground truth | ||
dist_pt_2_gt = np.asarray(pt_pcd.compute_point_cloud_distance(gt_pcd)) | ||
|
||
# recall: ground truth --> predicted | ||
dist_gt_2_pt = np.asarray(gt_pcd.compute_point_cloud_distance(pt_pcd)) | ||
|
||
for t in self.thresholds: | ||
p = np.where(dist_pt_2_gt < t)[0] | ||
p = 100 / len(dist_pt_2_gt) * len(p) | ||
self.pr_dict[t].append(p) | ||
|
||
r = np.where(dist_gt_2_pt < t)[0] | ||
r = 100 / len(dist_gt_2_pt) * len(r) | ||
self.re_dict[t].append(r) | ||
|
||
# fscore | ||
if p == 0 or r == 0: | ||
f = 0 | ||
else: | ||
f = 2 * p * r / (p + r) | ||
self.f1_dict[t].append(f) | ||
|
||
def reset(self): | ||
self.pr_dict = {t: [] for t in self.thresholds} | ||
self.re_dict = {t: [] for t in self.thresholds} | ||
self.f1_dict = {t: [] for t in self.thresholds} | ||
|
||
def compute_at_threshold(self, threshold): | ||
t = self.find_nearest_threshold(threshold) | ||
pr = sum(self.pr_dict[t]) / len(self.pr_dict[t]) | ||
re = sum(self.re_dict[t]) / len(self.re_dict[t]) | ||
f1 = sum(self.f1_dict[t]) / len(self.f1_dict[t]) | ||
return pr, re, f1, t | ||
|
||
def compute_auc(self): | ||
dx = self.thresholds[1] - self.thresholds[0] | ||
perfect_predictor = scipy.integrate.simpson(np.ones_like(self.thresholds), dx=dx) | ||
|
||
pr, re, f1 = self.compute_at_all_thresholds() | ||
|
||
pr_area = scipy.integrate.simpson(pr, dx=dx) | ||
norm_pr_area = pr_area / perfect_predictor | ||
|
||
re_area = scipy.integrate.simpson(re, dx=dx) | ||
norm_re_area = re_area / perfect_predictor | ||
|
||
f1_area = scipy.integrate.simpson(f1, dx=dx) | ||
norm_f1_area = f1_area / perfect_predictor | ||
|
||
return norm_pr_area, norm_re_area, norm_f1_area | ||
|
||
def compute_at_all_thresholds(self): | ||
pr = [sum(self.pr_dict[t]) / len(self.pr_dict[t]) for t in self.thresholds] | ||
re = [sum(self.re_dict[t]) / len(self.re_dict[t]) for t in self.thresholds] | ||
f1 = [sum(self.f1_dict[t]) / len(self.f1_dict[t]) for t in self.thresholds] | ||
return pr, re, f1 | ||
|
||
def find_nearest_threshold(self, value): | ||
idx = (np.abs(self.thresholds - value)).argmin() | ||
return self.thresholds[idx] |