diff --git a/docs/_static/img/EgocentricalAligner_2.webm b/docs/_static/img/EgocentricalAligner_2.webm
new file mode 100644
index 000000000..9caf589ca
Binary files /dev/null and b/docs/_static/img/EgocentricalAligner_2.webm differ
diff --git a/docs/tables/egocentrically_align_pose_numba.csv b/docs/tables/egocentrically_align_pose_numba.csv
new file mode 100644
index 000000000..df244643c
--- /dev/null
+++ b/docs/tables/egocentrically_align_pose_numba.csv
@@ -0,0 +1,10 @@
+FRAMES (MILLIONS),NUMBA TIME (S),NUMBA TIME (STDEV),NUMPY TIME (S),NUMPY TIME (STDEV)
+1,,,10.138,0.4589
+2,,,16.894,0.264
+4,,,33.813,0.3712255
+8,,,73.43435,0.526412
+16,,,134.0284325,0.858443488
+32,,,270.4346202,1.3789
+64,,,540.896359,1.781485522
+7 BODY-PARTS PER FRAME,,,,
+3 ITERATIONS,,,,
diff --git a/setup.py b/setup.py
index 7e54c8c24..d2a4a5ec9 100644
--- a/setup.py
+++ b/setup.py
@@ -29,7 +29,7 @@
 # Setup configuration
 setuptools.setup(
     name="Simba-UW-tf-dev",
-    version="2.3.8",
+    version="2.3.9",
     author="Simon Nilsson, Jia Jie Choong, Sophia Hwang",
     author_email="sronilsson@gmail.com",
     description="Toolkit for computer classification and analysis of behaviors in experimental animals",
diff --git a/simba/data_processors/egocentric_aligner.py b/simba/data_processors/egocentric_aligner.py
index 5a9adc5c6..3313c454e 100644
--- a/simba/data_processors/egocentric_aligner.py
+++ b/simba/data_processors/egocentric_aligner.py
@@ -7,19 +7,19 @@
 import numpy as np
 import pandas as pd
 
-from simba.mixins.config_reader import ConfigReader
-from simba.utils.checks import (
-    check_all_file_names_are_represented_in_video_log, check_if_dir_exists,
-    check_int, check_valid_dataframe)
-from simba.utils.enums import Formats
+from simba.utils.checks import (check_all_file_names_are_represented_in_video_log,
+                                check_if_dir_exists, check_int, check_valid_dataframe,
+                                check_str, check_valid_tuple, check_valid_boolean, check_instance, check_if_valid_rgb_tuple)
+from simba.utils.enums import Formats, Options
 from simba.utils.printing import SimbaTimer
+from simba.utils.data import egocentrically_align_pose
 from simba.utils.read_write import (concatenate_videos_in_folder,
                                     find_core_cnt,
                                     find_files_of_filetypes_in_directory,
                                     find_video_of_file, get_fn_ext,
                                     get_video_meta_data, read_df,
                                     read_frm_of_video, remove_a_folder,
-                                    write_df)
+                                    write_df, read_video_info_csv, bgr_to_rgb_tuple)
 from simba.utils.warnings import FrameRangeWarning
 
@@ -30,7 +30,8 @@ def _egocentric_aligner(frm_range: np.ndarray,
                         centers: List[Tuple[int, int]],
                         rotation_vectors: np.ndarray,
                         target: Tuple[int, int],
-                        color: Tuple[int, int, int] = (255, 255, 255)):
+                        fill_clr: Tuple[int, int, int] = (255, 255, 255),
+                        verbose: bool = False):
 
     video_meta = get_video_meta_data(video_path=video_path)
     cap = cv2.VideoCapture(video_path)
@@ -43,19 +44,19 @@
         img = read_frm_of_video(video_path=cap, frame_index=frm_id)
         R, center = rotation_vectors[frm_id], centers[frm_id]
         M_rotate = np.hstack([R, np.array([[-center[0] * R[0, 0] - center[1] * R[0, 1] + center[0]], [-center[0] * R[1, 0] - center[1] * R[1, 1] + center[1]]])])
-        rotated_frame = cv2.warpAffine(img, M_rotate, (video_meta['width'], video_meta['height']), borderValue=color)
+        rotated_frame = cv2.warpAffine(img, M_rotate, (video_meta['width'], video_meta['height']), borderValue=fill_clr)
         translation_x = target[0] - center[0]
         translation_y = target[1] - center[1]
         M_translate = np.float32([[1, 0, translation_x], [0, 1, translation_y]])
-        final_frame = cv2.warpAffine(rotated_frame, M_translate, (video_meta['width'], video_meta['height']), borderValue=color)
+        final_frame = cv2.warpAffine(rotated_frame, M_translate, (video_meta['width'], video_meta['height']), borderValue=fill_clr)
         writer.write(final_frame)
-        print(f'Creating frame {frm_id} ({video_name}, CPU core: {batch+1}).')
+        if verbose:
+            print(f'Creating frame {frm_id} ({video_name}, CPU core: {batch+1}).')
 
     cap.release()
     writer.release()
     return batch+1
 
-class EgocentricalAligner(ConfigReader):
+class EgocentricalAligner():
     """
     Aligns and rotates movement data and associated video frames based on specified anchor points to produce an egocentric view of the subject. The class aligns frames around a selected anchor point, optionally rotating the subject to a consistent direction and saving the output video.
 
@@ -65,6 +66,11 @@ class EgocentricalAligner(ConfigReader):
        :autoplay:
        :loop:
 
+    .. video:: _static/img/EgocentricalAligner_2.webm
+       :width: 800
+       :autoplay:
+       :loop:
+
@@ -72,136 +78,144 @@
-    :param Union[str, os.PathLike] config_path: Path to the configuration file.
-    :param Union[str, os.PathLike] save_dir: Directory where the processed output will be saved.
-    :param Optional[Union[str, os.PathLike]] data_dir: Directory containing CSV files with movement data.
-    :param Optional[str] anchor_1: Primary anchor point (e.g., 'tail_base') around which the alignment centers.
-    :param Optional[str] anchor_2: Secondary anchor point (e.g., 'nose') defining the alignment direction.
+    :param Union[str, os.PathLike] data_dir: Directory containing CSV files with movement data.
+    :param Union[str, os.PathLike] save_dir: Directory where the processed output will be saved.
+    :param str anchor_1: Primary anchor point (e.g., 'tail_base') around which the alignment centers.
+    :param str anchor_2: Secondary anchor point (e.g., 'nose') defining the alignment direction.
     :param int direction: Target angle, in degrees, for alignment; e.g., `0` aligns along the x-axis.
-    :param Optional[Tuple[int, int]] anchor_location: Pixel location in the output where `anchor_1` should appear; default is `(250, 250)`.
-    :param Optional[bool] rotate_video: Whether to rotate the video to align with the specified direction.
-    :param Optional[int] cores: Number of CPU cores to use for video rotation; `-1` uses all available cores.
+    :param Tuple[int, int] anchor_location: Pixel location in the output where `anchor_1` should appear; default is `(250, 250)`.
+    :param int core_cnt: Number of CPU cores to use for video rotation; `-1` uses all available cores.
+    :param bool rotate_video: Whether to rotate the video to align with the specified direction.
+    :param Tuple[int, int, int] fill_clr: If rotating the videos, the color used to fill the pixels introduced around the rotated frame.
+    :param bool verbose: If True, prints alignment and rotation progress to stdout.
+    :param Optional[Union[str, os.PathLike]] videos_dir: Directory holding the videos matching the data files; required when `rotate_video` is True.
+    :param Optional[Union[str, os.PathLike, pd.DataFrame]] video_info: Path to the SimBA project `video_info.csv`, or that file read into a DataFrame; required when `rotate_video` is True.
    :example:
-    >>> aligner = EgocentricalAligner(config_path=r"C:\troubleshooting\mitra\project_folder\project_config.ini", rotate_video=True, anchor_1='tail_base', anchor_2='nose', data_dir=r'C:\troubleshooting\mitra\project_folder\csv\outlier_corrected_movement_location\test', save_dir=r'C:\out_dir')
-    >>> aligner.run()
+    >>> aligner = EgocentricalAligner(rotate_video=True, anchor_1='tail_base', anchor_2='nose', data_dir=r"C:/Users/sroni/OneDrive/Desktop/rotate_ex/data", videos_dir=r'C:\Users\sroni\OneDrive\Desktop\rotate_ex\videos', save_dir=r"C:\troubleshooting\mitra\project_folder\videos\additional/examples/rotated", video_info=r"C:\troubleshooting\mitra\project_folder\logs\video_info.csv", direction=0, anchor_location=(250, 250), fill_clr=(0, 0, 0))
+    >>> aligner.run()
     """
 
     def __init__(self,
-                 config_path: Union[str, os.PathLike],
+                 data_dir: Union[str, os.PathLike],
                  save_dir: Union[str, os.PathLike],
-                 data_dir: Optional[Union[str, os.PathLike]] = None,
-                 videos_dir: Optional[Union[str, os.PathLike]] = None,
-                 anchor_1: Optional[str] = 'tail_base',
-                 anchor_2: Optional[str] = 'nose',
+                 anchor_1: str = 'tail_base',
+                 anchor_2: str = 'nose',
                  direction: int = 0,
-                 anchor_location: Optional[Tuple[int, int]] = (250, 250),
-                 rotate_video: Optional[bool] = False,
-                 cores: Optional[int] = -1):
-
-        ConfigReader.__init__(self, config_path=config_path, read_video_info=True, create_logger=False)
-        if data_dir is None:
-            self.data_paths = find_files_of_filetypes_in_directory(directory=self.outlier_corrected_dir, extensions=['.csv'])
-        else:
-            self.data_paths = find_files_of_filetypes_in_directory(directory=data_dir, extensions=['.csv'])
-
-        check_all_file_names_are_represented_in_video_log(video_info_df=self.video_info_df, data_paths=self.data_paths)
+                 anchor_location: Tuple[int, int] = (250, 250),
+                 core_cnt: int = -1,
+                 rotate_video: bool = False,
+                 fill_clr: Tuple[int, int, int] = (250, 250, 255),
+                 verbose: bool = True,
+                 videos_dir: Optional[Union[str, os.PathLike]] = None,
+                 video_info: Optional[Union[str, os.PathLike, pd.DataFrame]] = None):
+
+        self.data_paths = find_files_of_filetypes_in_directory(directory=data_dir, extensions=['.csv'])
+        check_if_dir_exists(in_dir=save_dir, source=f'{self.__class__.__name__} save_dir')
+        check_str(name=f'{self.__class__.__name__} anchor_1', value=anchor_1)
+        check_str(name=f'{self.__class__.__name__} anchor_2', value=anchor_2)
+        check_int(name=f'{self.__class__.__name__} core_cnt', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0], unaccepted_vals=[0])
+        self.core_cnt = find_core_cnt()[0] if core_cnt == -1 else core_cnt
+        check_int(name=f'{self.__class__.__name__} direction', value=direction, min_value=0, max_value=360)
+        check_valid_tuple(x=anchor_location, source=f'{self.__class__.__name__} anchor_location', accepted_lengths=(2,), valid_dtypes=(int,))
+        for i in anchor_location: check_int(name=f'{self.__class__.__name__} anchor_location', value=i, min_value=1)
+        check_valid_boolean(value=[rotate_video, verbose], source=f'{self.__class__.__name__} rotate_video verbose')
+        if rotate_video:
+            check_if_valid_rgb_tuple(data=fill_clr)
+            fill_clr = bgr_to_rgb_tuple(value=fill_clr)
+            check_if_dir_exists(in_dir=videos_dir, source=f'{self.__class__.__name__} videos_dir')
+            check_instance(source=f'{self.__class__.__name__} video_info', accepted_types=(str, pd.DataFrame), instance=video_info)
+            if isinstance(video_info, str): video_info = read_video_info_csv(file_path=video_info)
+            else: check_valid_dataframe(df=video_info, source=f'{self.__class__.__name__} video_info', required_fields=Formats.EXPECTED_VIDEO_INFO_COLS.value)
+            self.video_paths = find_files_of_filetypes_in_directory(directory=videos_dir, extensions=Options.ALL_VIDEO_FORMAT_OPTIONS.value)
+            for file_path in self.data_paths:
+                find_video_of_file(video_dir=videos_dir, filename=get_fn_ext(file_path)[1], raise_error=True)
+            check_all_file_names_are_represented_in_video_log(video_info_df=video_info, data_paths=self.data_paths)
         self.anchor_1_cols = [f'{anchor_1}_x'.lower(), f'{anchor_1}_y'.lower()]
         self.anchor_2_cols = [f'{anchor_2}_x'.lower(), f'{anchor_2}_y'.lower()]
-        self.rotate_video, self.save_dir = rotate_video, save_dir
-        self.anchor_1, self.anchor_2 = anchor_1, anchor_2
-        self.target_angle = np.deg2rad(direction)
-        self.anchor_location = anchor_location
-        check_int(name='cores', value=cores, min_value=-1, max_value=find_core_cnt()[0])
-        if cores == -1:
-            self.cores = find_core_cnt()[0]
-        else:
-            self.cores = cores
-        if videos_dir is not None:
-            check_if_dir_exists(in_dir=videos_dir)
-            self.video_dir = videos_dir
+        self.anchor_1, self.anchor_2, self.videos_dir = anchor_1, anchor_2, videos_dir
+        self.rotate_video, self.save_dir, self.verbose = rotate_video, save_dir, verbose
+        self.anchor_location, self.direction, self.fill_clr = np.array(anchor_location), direction, fill_clr
 
     def run(self):
         for file_cnt, file_path in enumerate(self.data_paths):
+            video_timer = SimbaTimer(start=True)
             _, self.video_name, _ = get_fn_ext(filepath=file_path)
-            save_path = os.path.join(self.save_dir, f'{self.video_name}.{self.file_type}')
-            if not os.path.isfile(save_path):
-                df = read_df(file_path=file_path, file_type=self.file_type)
-                original_cols, self.file_path = list(df.columns), file_path
-                df.columns = [x.lower() for x in list(df.columns)]
-                check_valid_dataframe(df=df, source=self.__class__.__name__, valid_dtypes=Formats.NUMERIC_DTYPES.value, required_fields=self.anchor_1_cols + self.anchor_2_cols)
-                self.body_parts_lst = [x.lower() for x in self.body_parts_lst]
-                bp_cols = [x for x in df.columns if not x.endswith('_p')]
-                anchor_1_idx = self.body_parts_lst.index(self.anchor_1)
-                anchor_2_idx = self.body_parts_lst.index(self.anchor_2)
-                data_arr = df[bp_cols].values.reshape(len(df), len(self.body_parts_lst), 2).astype(np.int32)
-                results_arr = np.zeros_like(data_arr)
-                self.rotation_angles, self.rotation_vectors, self.centers, self.deltas = [], [], [], []
-                for frame_index in range(data_arr.shape[0]):
-                    frame_points = data_arr[frame_index]
-                    frame_anchor_1 = frame_points[anchor_1_idx]
-                    self.centers.append(tuple(frame_anchor_1))
-                    frame_anchor_2 = frame_points[anchor_2_idx]
-                    delta_x, delta_y = frame_anchor_2[0] - frame_anchor_1[0], frame_anchor_2[1] - frame_anchor_1[1]
-                    self.deltas.append((delta_x, delta_x))
-                    current_angle = np.arctan2(delta_y, delta_x)
-                    rotate_angle = self.target_angle - current_angle
-                    self.rotation_angles.append(rotate_angle)
-                    cos_theta, sin_theta = np.cos(rotate_angle), np.sin(rotate_angle)
-                    R = np.array([[cos_theta, -sin_theta], [sin_theta, cos_theta]])
-                    self.rotation_vectors.append(R)
-                    keypoints_translated = frame_points - frame_anchor_1
-                    keypoints_rotated = np.dot(keypoints_translated, R.T)
-                    anchor_1_position_after_rotation = keypoints_rotated[anchor_1_idx]
-                    translation_to_target = np.array(self.anchor_location) - anchor_1_position_after_rotation
-                    keypoints_aligned = keypoints_rotated + translation_to_target
-                    results_arr[frame_index] = keypoints_aligned
-
-                results_arr = results_arr.reshape(len(df), len(bp_cols))
-                self.out_df = pd.DataFrame(results_arr, columns=bp_cols)
-                df.update(self.out_df)
-                df.columns = original_cols
-                write_df(df=df, file_type=self.file_type, save_path=save_path)
-            if self.rotate_video:
-                self.run_video_rotation()
+            if self.verbose:
+                print(f'Analyzing video {self.video_name}... ({file_cnt+1}/{len(self.data_paths)})')
+            save_path = os.path.join(self.save_dir, f'{self.video_name}.{Formats.CSV.value}')
+            df = read_df(file_path=file_path, file_type=Formats.CSV.value)
+            original_cols, self.file_path = list(df.columns), file_path
+            df.columns = [x.lower() for x in list(df.columns)]
+            check_valid_dataframe(df=df, source=self.__class__.__name__, valid_dtypes=Formats.NUMERIC_DTYPES.value, required_fields=self.anchor_1_cols + self.anchor_2_cols)
+
+            bp_cols = [x for x in df.columns if not x.endswith('_p')]
+            body_parts_lst = []
+            for x in bp_cols:
+                if x[:-2] not in body_parts_lst: body_parts_lst.append(x[:-2])
+            anchor_1_idx, anchor_2_idx = body_parts_lst.index(self.anchor_1), body_parts_lst.index(self.anchor_2)
+            data_arr = df[bp_cols].values.reshape(len(df), len(body_parts_lst), 2).astype(np.int32)
+            results_arr, self.centers, self.rotation_vectors = egocentrically_align_pose(data=data_arr, anchor_1_idx=anchor_1_idx, anchor_2_idx=anchor_2_idx, direction=self.direction, anchor_location=self.anchor_location)
+            results_arr = results_arr.reshape(len(df), len(bp_cols))
+            self.out_df = pd.DataFrame(results_arr, columns=bp_cols)
+            df.update(self.out_df)
+            df.columns = original_cols
+            write_df(df=df, file_type=Formats.CSV.value, save_path=save_path)
+            video_timer.stop_timer()
+            print(f'{self.video_name} complete, saved at {save_path} (elapsed time: {video_timer.elapsed_time_str}s)')
+            if self.rotate_video:
+                self.run_video_rotation()
+
+
     def run_video_rotation(self):
         video_timer = SimbaTimer(start=True)
-        video_path = find_video_of_file(video_dir=self.video_dir, filename=self.video_name, raise_error=False)
-        if video_path is not None:
-            video_meta = get_video_meta_data(video_path=video_path)
-            save_path = os.path.join(self.save_dir, f'{self.video_name}.mp4')
-            temp_dir = os.path.join(self.save_dir, 'temp')
-            if not os.path.isdir(temp_dir):
-                os.makedirs(temp_dir)
-            else:
-                remove_a_folder(folder_dir=temp_dir)
-                os.makedirs(temp_dir)
-            if video_meta['frame_count'] != len(self.out_df):
-                FrameRangeWarning(msg=f'The video {video_path} contains {video_meta["frame_count"]} frames while the file {self.file_path} contains {len(self.out_df)} frames', source=self.__class__.__name__)
-            frm_list = np.arange(0, video_meta['frame_count'])
-            frm_list = np.array_split(frm_list, self.cores)
-            frm_list = [(cnt, x) for cnt, x in enumerate(frm_list)]
-            print(f"Creating rotated videos, multiprocessing (chunksize: {self.multiprocess_chunksize}, cores: {self.cores})...")
-            with multiprocessing.Pool(self.cores, maxtasksperchild=self.maxtasksperchild) as pool:
-                constants = functools.partial(_egocentric_aligner,
-                                              temp_dir=temp_dir,
-                                              video_name=self.video_name,
-                                              video_path=video_path,
-                                              centers=self.centers,
-                                              rotation_vectors=self.rotation_vectors,
-                                              target=self.anchor_location)
-                for cnt, result in enumerate(pool.imap(constants, frm_list, chunksize=self.multiprocess_chunksize)):
-                    print(f"Rotate batch {result}/{self.cores} complete...")
-                pool.terminate()
-                pool.join()
-
-            concatenate_videos_in_folder(in_folder=temp_dir, save_path=save_path, remove_splits=True, gpu=False)
-            video_timer.stop_timer()
-            print(f"Egocentric rotation video {save_path} complete (elapsed time: {video_timer.elapsed_time_str}s) ...")
-
-
-if __name__ == "__main__":
-    aligner = EgocentricalAligner(config_path=r"C:\troubleshooting\mitra\project_folder\project_config.ini",
-                                  rotate_video=True,
-                                  anchor_1='tail_base',
-                                  anchor_2='nose',
-                                  data_dir=r'C:\troubleshooting\mitra\project_folder\csv\outlier_corrected_movement_location\temp',
-                                  videos_dir=r'C:\troubleshooting\mitra\project_folder\videos\additional\bg_removed',
-                                  save_dir=r"C:\troubleshooting\mitra\project_folder\videos\additional\bg_removed\rotated")
-    aligner.run()
+        video_path = find_video_of_file(video_dir=self.videos_dir, filename=self.video_name, raise_error=False)
+        video_meta = get_video_meta_data(video_path=video_path)
+        save_path = os.path.join(self.save_dir, f'{self.video_name}.mp4')
+        temp_dir = os.path.join(self.save_dir, 'temp')
+        if not (os.path.isdir(temp_dir)):
+            os.makedirs(temp_dir)
+        else:
+            remove_a_folder(folder_dir=temp_dir)
+            os.makedirs(temp_dir)
+        if video_meta['frame_count'] != len(self.out_df):
+            FrameRangeWarning(msg=f'The video {video_path} contains {video_meta["frame_count"]} frames while the file {self.file_path} contains {len(self.out_df)} frames', source=self.__class__.__name__)
+        frm_list = np.arange(0, video_meta['frame_count'])
+        frm_list = np.array_split(frm_list, self.core_cnt)
+        frm_list = [(cnt, x) for cnt, x in enumerate(frm_list)]
+        print(f"Creating rotated video {self.video_name}, multiprocessing (chunksize: 1, cores: {self.core_cnt})...")
+        with multiprocessing.Pool(self.core_cnt, maxtasksperchild=100) as pool:
+            constants = functools.partial(_egocentric_aligner,
+                                          temp_dir=temp_dir,
+                                          video_name=self.video_name,
+                                          video_path=video_path,
+                                          centers=self.centers,
+                                          rotation_vectors=self.rotation_vectors,
+                                          target=self.anchor_location,
+                                          verbose=self.verbose,
+                                          fill_clr=self.fill_clr)
+            for cnt, result in enumerate(pool.imap(constants, frm_list, chunksize=1)):
+                print(f"Rotate batch {result}/{self.core_cnt} complete...")
+            pool.terminate()
+            pool.join()
+
+        concatenate_videos_in_folder(in_folder=temp_dir, save_path=save_path, remove_splits=True, gpu=False)
+        video_timer.stop_timer()
+        print(f"Egocentric rotation video {save_path} complete (elapsed time: {video_timer.elapsed_time_str}s) ...")
+
+# if __name__ == "__main__":
+#     aligner = EgocentricalAligner(rotate_video=True,
+#                                   anchor_1='tail_base',
+#                                   anchor_2='nose',
+#                                   data_dir=r'C:\Users\sroni\OneDrive\Desktop\rotate_ex\data',
+#                                   videos_dir=r'C:\Users\sroni\OneDrive\Desktop\rotate_ex\videos',
+#                                   save_dir=r"C:\troubleshooting\mitra\project_folder\videos\additional\examples\rotated",
+#                                   video_info=r"C:\troubleshooting\mitra\project_folder\logs\video_info.csv",
+#                                   direction=0,
+#                                   anchor_location=(250, 250),
+#                                   fill_clr=(0, 0, 0))
+#     aligner.run()
+
+    # aligner = EgocentricalAligner(rotate_video=True,
+    #                               anchor_1='tail_base',
+    #                               anchor_2='nose',
+    #                               data_dir=r'C:\troubleshooting\mitra\project_folder\csv\outlier_corrected_movement_location',
+    #                               videos_dir=r'C:\troubleshooting\mitra\project_folder\videos',
+    #                               save_dir=r"C:\troubleshooting\mitra\project_folder\videos\additional\bg_removed\rotated",
+    #                               video_info=r"C:\troubleshooting\mitra\project_folder\logs\video_info.csv")
    # aligner.run()
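# --- Editor's note: illustrative sketch, not part of the patch ---------------
# Sanity-checking the affine matrices that `_egocentric_aligner` builds above.
# M_rotate is the 2x3 matrix [R | c - R@c], i.e. a rotation about the anchor
# pixel c; M_translate then shifts c onto the requested target pixel. All
# values below are made up for illustration.
import numpy as np

theta = np.deg2rad(30)                                 # example rotation angle
R = np.array([[np.cos(theta), -np.sin(theta)],
              [np.sin(theta),  np.cos(theta)]])
c = np.array([120.0, 80.0])                            # example anchor (center) pixel
target = np.array([250.0, 250.0])                      # example anchor_location

M_rotate = np.hstack([R, (c - R @ c).reshape(2, 1)])   # same construction as the patch
assert np.allclose(M_rotate @ np.append(c, 1.0), c)    # the anchor is a fixed point of the rotation

M_translate = np.float32([[1, 0, target[0] - c[0]],
                          [0, 1, target[1] - c[1]]])
assert np.allclose(M_translate @ np.append(c, 1.0), target)  # the anchor lands on the target pixel
# -----------------------------------------------------------------------------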
diff --git a/simba/mixins/train_model_mixin.py b/simba/mixins/train_model_mixin.py
index 4312604b9..1ed491de7 100644
--- a/simba/mixins/train_model_mixin.py
+++ b/simba/mixins/train_model_mixin.py
@@ -326,10 +326,10 @@ def calc_permutation_importance(self,
         print("Calculating feature permutation importances...")
         timer = SimbaTimer(start=True)
         p_importances = permutation_importance(clf, x_test, y_test, n_repeats=n_repeats, random_state=0)
-        df = pd.DataFrame(
-            np.column_stack([feature_names, p_importances.importances_mean, p_importances.importances_std]),
-            columns=["FEATURE_NAME", "FEATURE_IMPORTANCE_MEAN", "FEATURE_IMPORTANCE_STDEV"])
+        df = pd.DataFrame(np.column_stack([feature_names, p_importances.importances_mean, p_importances.importances_std]), columns=["FEATURE_NAME", "FEATURE_IMPORTANCE_MEAN", "FEATURE_IMPORTANCE_STDEV"])
+        df["FEATURE_IMPORTANCE_MEAN"] = df["FEATURE_IMPORTANCE_MEAN"].astype(np.float64)
+        df["FEATURE_IMPORTANCE_STDEV"] = df["FEATURE_IMPORTANCE_STDEV"].astype(np.float64)
         df = df.sort_values(by=["FEATURE_IMPORTANCE_MEAN"], ascending=False)
         if save_file_no != None:
             save_file_path = os.path.join(save_dir, f'{clf_name}_{save_file_no}_permutations_importances.csv')
             save_file_path_plot = os.path.join(save_dir, f'{clf_name}_{save_file_no}_permutations_importances.png')
diff --git a/simba/model/regression/model.py b/simba/model/regression/model.py
index 6ab994e41..6c518c618 100644
--- a/simba/model/regression/model.py
+++ b/simba/model/regression/model.py
@@ -42,6 +42,7 @@ def fit_xgb(x: pd.DataFrame,
     check_int(name=f'{fit_xgb.__name__} verbosity', value=verbosity, min_value=0, max_value=3)
     check_float(name=f'{fit_xgb.__name__} learning_rate', value=learning_rate, min_value=0.1, max_value=1.0)
     xgb_reg = xgb.XGBRegressor(objective=objective, max_depth=max_depth, n_estimators=n_estimators, verbosity=verbosity)
+    return xgb_reg.fit(X=x, y=y)
 
 def transform_xgb(x: pd.DataFrame, model: xgb.XGBRegressor):
diff --git a/simba/outlier_tools/outlier_corrector_location.py b/simba/outlier_tools/outlier_corrector_location.py
index 711235574..520dd9d07 100644
--- a/simba/outlier_tools/outlier_corrector_location.py
+++ b/simba/outlier_tools/outlier_corrector_location.py
@@ -108,7 +108,7 @@ def __correct_outliers(self, df: pd.DataFrame, above_criteria_dict: dict):
                 col_names = [f'{body_part_name}_x', f'{body_part_name}_y']
                 if len(frm_idx) > 0:
                     df.loc[frm_idx, col_names] = np.nan
-        return df.fillna(method='ffill', axis=1).fillna(0)
+        return df.fillna(method='ffill', axis=0).fillna(0)
 
     def run(self):
         self.logs, self.frm_cnts = {}, {}
@@ -131,7 +131,7 @@ def run(self):
                 bp_name = animal_bps["X_bps"][bp_cnt][:-2]
                 bp_dict[animal_name][bp_name] = animal_arr[:, bp_col_start: bp_col_start + 2]
             above_criteria_dict = self.__find_location_outliers(bp_dict=bp_dict, animal_criteria=animal_criteria)
             df = self.__correct_outliers(df=df, above_criteria_dict=above_criteria_dict)
             write_df(df=df, file_type=self.file_type, save_path=save_path)
             self.logs[video_name], self.frm_cnts[video_name] = above_criteria_dict, len(df)
diff --git a/simba/outlier_tools/outlier_corrector_location_mp.py b/simba/outlier_tools/outlier_corrector_location_mp.py
index 3a043a72f..9deabb9e2 100644
--- a/simba/outlier_tools/outlier_corrector_location_mp.py
+++ b/simba/outlier_tools/outlier_corrector_location_mp.py
@@ -50,7 +50,7 @@ def __correct_outliers(df: pd.DataFrame, above_criteria_dict: dict):
         col_names = [f'{body_part_name}_x', f'{body_part_name}_y']
         if len(frm_idx) > 0:
             df.loc[frm_idx, col_names] = np.nan
-    return df.fillna(method='ffill', axis=1).fillna(0)
+    return df.fillna(method='ffill', axis=0).fillna(0)
 
     video_timer = SimbaTimer(start=True)
     _, video_name, _ = get_fn_ext(data_path)
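# --- Editor's note: illustrative sketch, not part of the patch ---------------
# Why the outlier correctors switch `fillna(method='ffill')` from axis=1 to
# axis=0: rows are frames, so a NaN'd outlier coordinate should be repaired
# from the previous *frame* (axis=0), not from the neighbouring *column*
# (a different body-part coordinate, axis=1). Toy frame for illustration.
import numpy as np
import pandas as pd

df = pd.DataFrame({'nose_x': [10.0, 11.0, 12.0],
                   'nose_y': [5.0, np.nan, 6.0]})  # frame 1 flagged as an outlier
print(df.fillna(method='ffill', axis=1))  # wrong: nose_y at frame 1 becomes 11.0 (copied from nose_x)
print(df.fillna(method='ffill', axis=0))  # right: nose_y at frame 1 becomes 5.0 (previous frame)
# -----------------------------------------------------------------------------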
diff --git a/simba/utils/data.py b/simba/utils/data.py
index eaaae9758..85f1af88a 100644
--- a/simba/utils/data.py
+++ b/simba/utils/data.py
@@ -1423,6 +1423,75 @@ def get_library_version(library_name: str) -> str:
 
+def egocentrically_align_pose(data: np.ndarray,
+                              anchor_1_idx: int,
+                              anchor_2_idx: int,
+                              anchor_location: np.ndarray,
+                              direction: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
+    """
+    Aligns a set of 2D points egocentrically based on two anchor points and a target direction.
+
+    Rotates and translates a 3D array of 2D points (e.g., time-series of frame-wise data) such that
+    one anchor point is aligned to a specified location, and the direction between the two anchors is aligned
+    to a target angle.
+
+    .. video:: _static/img/EgocentricalAligner_2.webm
+       :width: 800
+       :autoplay:
+       :loop:
+
+    :param np.ndarray data: A 3D array of shape `(num_frames, num_points, 2)` containing 2D points for each frame. Each frame is represented as a 2D array of shape `(num_points, 2)`, where each row corresponds to a point's (x, y) coordinates.
+    :param int anchor_1_idx: The index of the first anchor point in `data` used as the center of alignment. This body-part will be placed in the center of the image.
+    :param int anchor_2_idx: The index of the second anchor point in `data` used to calculate the direction vector. This body-part will be located `direction` degrees from the anchor_1 body-part.
+    :param int direction: The target direction in degrees to which the vector between the two anchors will be aligned.
+    :param np.ndarray anchor_location: A 1D array of shape `(2,)` specifying the target (x, y) location for `anchor_1_idx` after alignment.
+    :return: A tuple containing the rotated data, plus the variables required for rotating the matching video using the same rules:
+             - `aligned_data`: A 3D array of shape `(num_frames, num_points, 2)` with the aligned 2D points.
+             - `centers`: A 2D array of shape `(num_frames, 2)` containing the original locations of `anchor_1_idx` in each frame before alignment.
+             - `rotation_vectors`: A 3D array of shape `(num_frames, 2, 2)` containing the rotation matrices applied to each frame.
+    :rtype: Tuple[np.ndarray, np.ndarray, np.ndarray]
+
+    :example:
+    >>> data = np.random.randint(0, 500, (100, 7, 2))
+    >>> anchor_1_idx = 5 # E.g., the animal tail-base is the 6th body-part (index 5)
+    >>> anchor_2_idx = 6 # E.g., the animal nose is the 7th body-part (index 6)
+    >>> anchor_location = np.array([250, 250]) # the tail-base (index 5) is placed at x=250, y=250 in the image.
+    >>> direction = 90 # The nose (index 6) will be placed in direction 90 degrees (S) relative to the tail-base.
+    >>> results, centers, rotation_vectors = egocentrically_align_pose(data=data, anchor_1_idx=anchor_1_idx, anchor_2_idx=anchor_2_idx, anchor_location=anchor_location, direction=direction)
+    """
+
+    check_valid_array(data=data, source=egocentrically_align_pose.__name__, accepted_ndims=(3,), accepted_dtypes=Formats.NUMERIC_DTYPES.value)
+    check_int(name=f'{egocentrically_align_pose.__name__} anchor_1_idx', min_value=0, max_value=data.shape[1]-1, value=anchor_1_idx)
+    check_int(name=f'{egocentrically_align_pose.__name__} anchor_2_idx', min_value=0, max_value=data.shape[1]-1, value=anchor_2_idx)
+    if anchor_1_idx == anchor_2_idx: raise InvalidInputError(msg=f'Anchor 1 index ({anchor_1_idx}) cannot be the same as Anchor 2 index ({anchor_2_idx})', source=egocentrically_align_pose.__name__)
+    check_int(name=f'{egocentrically_align_pose.__name__} direction', value=direction, min_value=0, max_value=360)
+    check_valid_array(data=anchor_location, source=egocentrically_align_pose.__name__, accepted_ndims=(1,), accepted_axis_0_shape=[2,], accepted_dtypes=Formats.NUMERIC_DTYPES.value)
+    target_angle = np.deg2rad(direction)
+    centers = np.full((data.shape[0], 2), fill_value=-1, dtype=np.int32)
+    rotation_vectors = np.full((data.shape[0], 2, 2), fill_value=-1, dtype=np.float64)
+    results = np.zeros_like(data, dtype=np.int32)
+    for frm_idx in range(data.shape[0]):
+        frm_points = data[frm_idx]
+        frm_anchor_1, frm_anchor_2 = frm_points[anchor_1_idx], frm_points[anchor_2_idx]
+        centers[frm_idx] = frm_anchor_1
+        delta_x, delta_y = frm_anchor_2[0] - frm_anchor_1[0], frm_anchor_2[1] - frm_anchor_1[1]
+        frm_angle = np.arctan2(delta_y, delta_x)
+        frm_rotation_angle = target_angle - frm_angle
+        frm_cos_theta, frm_sin_theta = np.cos(frm_rotation_angle), np.sin(frm_rotation_angle)
+        R = np.array([[frm_cos_theta, -frm_sin_theta], [frm_sin_theta, frm_cos_theta]])
+        rotation_vectors[frm_idx] = R
+        keypoints_rotated = np.dot(frm_points - frm_anchor_1, R.T)
+        anchor_1_position_after_rotation = keypoints_rotated[anchor_1_idx]
+        translation_to_target = np.array(anchor_location) - anchor_1_position_after_rotation
+        results[frm_idx] = keypoints_rotated + translation_to_target
+
+    return results, centers, rotation_vectors
+
+
 # run_user_defined_feature_extraction_class(config_path='/Users/simon/Desktop/envs/troubleshooting/circular_features_zebrafish/project_folder/project_config.ini', file_path='/Users/simon/Desktop/fish_feature_extractor_2023_version_5.py')
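# --- Editor's note: illustrative sketch, not part of the patch ---------------
# Exercising `egocentrically_align_pose` and checking its two post-conditions:
# anchor 1 lands on `anchor_location` in every frame, and the anchor 1 ->
# anchor 2 vector points at `direction` degrees. Assumes simba is importable.
import numpy as np
from simba.utils.data import egocentrically_align_pose

data = np.random.randint(0, 500, (10, 7, 2))  # 10 frames, 7 body-parts
results, centers, rotation_vectors = egocentrically_align_pose(data=data, anchor_1_idx=5, anchor_2_idx=6, anchor_location=np.array([250, 250]), direction=0)

assert np.all(np.abs(results[:, 5, :] - 250) <= 1)         # anchor 1 at (250, 250), +/- integer truncation
vec = results[:, 6, :] - results[:, 5, :]                  # anchor 1 -> anchor 2, per frame
print(np.rad2deg(np.arctan2(vec[:, 1], vec[:, 0])) % 360)  # ~0 degrees, up to rounding error
# -----------------------------------------------------------------------------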
diff --git a/simba/utils/enums.py b/simba/utils/enums.py
index bbd95dc58..1825db9c7 100644
--- a/simba/utils/enums.py
+++ b/simba/utils/enums.py
@@ -171,7 +171,7 @@ class Formats(Enum):
         "box": ["bx.h5", "bx_filtered.h5"],
         "ellipse": ["el.h5", "el_filtered.h5"],
     }
-
+    EXPECTED_VIDEO_INFO_COLS = ["Video", "fps", "Resolution_width", "Resolution_height", "Distance_in_mm", "pixels/mm"]
 
 class Options(Enum):
     ROLLING_WINDOW_DIVISORS = [2, 5, 6, 7.5, 15]
diff --git a/simba/utils/read_write.py b/simba/utils/read_write.py
index 7f31a8183..e1af415cf 100644
--- a/simba/utils/read_write.py
+++ b/simba/utils/read_write.py
@@ -48,7 +48,7 @@
                                 check_instance, check_int,
                                 check_nvidea_gpu_available, check_str,
                                 check_valid_array, check_valid_boolean,
-                                check_valid_dataframe, check_valid_lst)
+                                check_valid_dataframe, check_valid_lst, check_if_valid_rgb_tuple)
 from simba.utils.enums import ConfigKey, Dtypes, Formats, Keys, Options
 from simba.utils.errors import (DataHeaderError, DuplicationError,
                                 FFMPEGCodecGPUError, FileExistError,
@@ -353,6 +353,11 @@ def read_project_path_and_file_type(config: configparser.ConfigParser) -> Tuple[
     return project_path, file_type
 
 
+def bgr_to_rgb_tuple(value: Tuple[int, int, int]) -> Tuple[int, int, int]:
+    """Convert a BGR color tuple to an RGB color tuple."""
+    check_if_valid_rgb_tuple(data=value)
+    return (value[2], value[1], value[0])
+
 def read_video_info_csv(file_path: Union[str, os.PathLike]) -> pd.DataFrame:
     """
     Helper to read the project_folder/logs/video_info.csv of the SimBA project in as a pd.DataFrame
diff --git a/tests/data/test_projects/two_c57/models/generated_models/Attack_permutations_importances.png b/tests/data/test_projects/two_c57/models/generated_models/Attack_permutations_importances.png
new file mode 100644
index 000000000..0bfb4ea56
Binary files /dev/null and b/tests/data/test_projects/two_c57/models/generated_models/Attack_permutations_importances.png differ
diff --git a/tests/test_train_model_mixin.py b/tests/test_train_model_mixin.py
index 39a8edaec..389c4457f 100644
--- a/tests/test_train_model_mixin.py
+++ b/tests/test_train_model_mixin.py
@@ -5,7 +5,7 @@
 from sklearn.ensemble import RandomForestClassifier
 
 from simba.mixins.train_model_mixin import TrainModelMixin
-from simba.utils.read_write import read_config_file, read_df
+from simba.utils.read_write import read_config_file, read_df, read_pickle
 
 IN_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS") == "true"
 
@@ -47,10 +47,10 @@ def test_random_undersampler(sample_ratio):
     assert y_train_out.reset_index(drop=True).equals(pd.Series([1, 0], name='Test'))
 
 @pytest.mark.parametrize("clf_path", ['tests/data/test_projects/two_c57/models/generated_models/Attack.sav'])
 def test_calc_permutation_importance(clf_path):
     x_test = np.array([[1, 2], [1, 2], [1, 2]])
     y_test = np.array([[1], [1], [0]])
-    clf = read_df(file_path=clf_path, file_type='pickle')
+    clf = read_pickle(data_path=clf_path)
     _ = TrainModelMixin().calc_permutation_importance(x_test=x_test, y_test=y_test, clf=clf, feature_names=['Feature_1', 'Feature_2'], clf_name='Attack', save_dir=os.path.dirname(clf_path))
     assert os.path.isfile(os.path.join(os.path.dirname(clf_path), 'Attack_permutations_importances.csv'))
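# --- Editor's note: illustrative sketch, not part of the patch ---------------
# Why `calc_permutation_importance` now casts the importance columns to float64:
# np.column_stack promotes everything to a common dtype, so stacking string
# feature names with float importances yields strings, and sorting those is
# lexicographic ('9.0' > '10.0'). Casting before the sort restores numeric order.
import numpy as np
import pandas as pd

stacked = np.column_stack([['feat_a', 'feat_b'], [9.0, 10.0], [0.1, 0.2]])
df = pd.DataFrame(stacked, columns=["FEATURE_NAME", "FEATURE_IMPORTANCE_MEAN", "FEATURE_IMPORTANCE_STDEV"])
print(df["FEATURE_IMPORTANCE_MEAN"].dtype)  # object: the floats were stringified
print(df.sort_values(by=["FEATURE_IMPORTANCE_MEAN"], ascending=False).FEATURE_NAME.tolist())  # ['feat_a', 'feat_b'] (wrong)
df["FEATURE_IMPORTANCE_MEAN"] = df["FEATURE_IMPORTANCE_MEAN"].astype(np.float64)
print(df.sort_values(by=["FEATURE_IMPORTANCE_MEAN"], ascending=False).FEATURE_NAME.tolist())  # ['feat_b', 'feat_a'] (right)
# -----------------------------------------------------------------------------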