From 83330d2dc1f889f5c1623c837a00a5fbd8cfcb2f Mon Sep 17 00:00:00 2001
From: Kirill Rybachuk
Date: Thu, 24 Aug 2023 16:47:49 +0200
Subject: [PATCH 1/2] an option to download only added files for a given dataset version

---
 clearml/datasets/dataset.py | 108 ++++++++++++++++++++++++++++++++----
 1 file changed, 98 insertions(+), 10 deletions(-)

diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py
index 8db853ce..cf6c6748 100644
--- a/clearml/datasets/dataset.py
+++ b/clearml/datasets/dataset.py
@@ -909,8 +909,9 @@ def is_final(self):
         return self._task.get_status() not in (
             Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created, Task.TaskStatusEnum.failed)
 
-    def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True, max_workers=None):
-        # type: (bool, Optional[int], Optional[int], bool, Optional[int]) -> str
+    def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True, max_workers=None,
+                       only_added=False):
+        # type: (bool, Optional[int], Optional[int], bool, Optional[int], bool) -> str
         """
         Return a base folder with a read-only (immutable) local copy of the entire dataset
         download and copy / soft-link, files from all the parent dataset versions. The dataset needs to be finalized
@@ -930,6 +931,7 @@ def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_o
         :param raise_on_error: If True, raise exception if dataset merging failed on any file
         :param max_workers: Number of threads to be spawned when getting the dataset copy. Defaults to the number
             of logical cores.
+        :param only_added: If True, ignore all the parent datasets and download only files added to the latest version
 
         :return: A base folder for the entire dataset
         """
@@ -942,14 +944,22 @@ def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_o
             raise ValueError("Cannot get a local copy of a dataset that was not finalized/closed")
         max_workers = max_workers or psutil.cpu_count()
 
-        # now let's merge the parents
-        target_folder = self._merge_datasets(
-            use_soft_links=use_soft_links,
-            raise_on_error=raise_on_error,
-            part=part,
-            num_parts=num_parts,
-            max_workers=max_workers,
-        )
+        if only_added:
+            # merge only added files, ignoring the parents
+            if part is not None or num_parts is not None:
+                LoggerRoot.get_base_logger().info("Getting only added files, ignoring parents")
+            target_folder = self._merge_diff(
+                max_workers=max_workers,
+            )
+        else:
+            # now let's merge the parents
+            target_folder = self._merge_datasets(
+                use_soft_links=use_soft_links,
+                raise_on_error=raise_on_error,
+                part=part,
+                num_parts=num_parts,
+                max_workers=max_workers,
+            )
         return target_folder
 
     def get_mutable_local_copy(
@@ -2320,6 +2330,7 @@ def _download_link(link, target_path):
                     LoggerRoot.get_base_logger().info(log_string)
                 else:
                     link.size = Path(target_path).stat().st_size
+
         if not max_workers:
             for relative_path, link in links.items():
                 target_path = os.path.join(target_folder, relative_path)
@@ -2454,6 +2465,51 @@ def _get_next_data_artifact_name(self, last_artifact_name=None):
         numbers = sorted([int(a[prefix_len:]) for a in data_artifact_entries if a.startswith(prefix)])
         return '{}{:03d}'.format(prefix, numbers[-1]+1 if numbers else 1)
 
+    def _merge_diff(self, max_workers=None):
+        # type: (Optional[int]) -> str
+        """
+        Download only the added files of the dataset, ignoring all the data from parents
+
+        :param max_workers: Number of threads to be spawned when merging datasets. Defaults to the number
+            of logical cores.
+
+        :return: the target folder
+        """
+
+        max_workers = max_workers or psutil.cpu_count()
+
+        # just create the dataset target folder
+        target_base_folder, _ = self._create_ds_target_folder(
+            part=None, num_parts=None, lock_target_folder=True)
+
+        # check if target folder is not empty, see if it contains everything we need
+        if target_base_folder and next(target_base_folder.iterdir(), None):
+            if self._verify_diff_folder(target_base_folder):
+                target_base_folder.touch()
+                self._release_lock_ds_target_folder(target_base_folder)
+                return target_base_folder.as_posix()
+            else:
+                LoggerRoot.get_base_logger().info('Dataset diff needs refreshing, downloading added files')
+                # we should delete the entire cache folder
+                shutil.rmtree(target_base_folder.as_posix())
+                # make sure we recreate the dataset target folder
+                target_base_folder.mkdir(parents=True, exist_ok=True)
+
+        self._get_dataset_files(
+            force=True,
+            selected_chunks=None,
+            cleanup_target_folder=True,
+            target_folder=target_base_folder,
+            max_workers=max_workers
+        )
+
+        # update target folder timestamp
+        target_base_folder.touch()
+
+        # release the lock and return the diff folder
+        self._release_lock_ds_target_folder(target_base_folder)
+        return target_base_folder.absolute().as_posix()
+
     def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, num_parts=None, max_workers=None):
         # type: (bool, bool, Optional[int], Optional[int], Optional[int]) -> str
         """
@@ -3210,6 +3266,38 @@ def copy_file(file_entry):
             raise ValueError("Dataset merging failed: {}".format([e for e in errors if e is not None]))
         pool.close()
 
+    def _verify_diff_folder(self, target_base_folder):
+        # type: (Path) -> bool
+        target_base_folder = Path(target_base_folder)
+        # check the file size for the added portion of the dataset, regardless of parents
+        verified = True
+
+        datasets = self._dependency_graph[self._id]
+        unified_list = set()
+        for ds_id in datasets:
+            dataset = self.get(dataset_id=ds_id)
+            unified_list |= set(dataset._dataset_file_entries.values())
+            unified_list |= set(dataset._dataset_link_entries.values())
+
+        added_list = [
+            f
+            for f in list(self._dataset_file_entries.values()) + list(self._dataset_link_entries.values())
+            if f not in unified_list
+        ]
+        # noinspection PyBroadException
+        try:
+            for f in set(added_list):
+                # check if the local size and the stored size match (faster than comparing hash)
+                if (target_base_folder / f.relative_path).stat().st_size != f.size:
+                    verified = False
+                    break
+        except Exception:
+            verified = False
+
+        return verified
+
     def _verify_dataset_folder(self, target_base_folder, part, chunk_selection):
         # type: (Path, Optional[int], Optional[dict]) -> bool
         target_base_folder = Path(target_base_folder)

From f3916ccd608c72b87b126c7e715278b36447e85a Mon Sep 17 00:00:00 2001
From: Kirill Rybachuk
Date: Mon, 28 Aug 2023 18:13:41 +0200
Subject: [PATCH 2/2] renamed only_added -> ignore_parent_datasets

---
 clearml/datasets/dataset.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py
index cf6c6748..3c634b1f 100644
--- a/clearml/datasets/dataset.py
+++ b/clearml/datasets/dataset.py
@@ -910,7 +910,7 @@ def is_final(self):
             Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created, Task.TaskStatusEnum.failed)
 
     def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True, max_workers=None,
-                       only_added=False):
+                       ignore_parent_datasets=False):
         # type: (bool, Optional[int], Optional[int], bool, Optional[int], bool) -> str
         """
         Return a base folder with a read-only (immutable) local copy of the entire dataset
@@ -931,7 +931,7 @@ def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_o
         :param raise_on_error: If True, raise exception if dataset merging failed on any file
         :param max_workers: Number of threads to be spawned when getting the dataset copy. Defaults to the number
             of logical cores.
-        :param only_added: If True, ignore all the parent datasets and download only files added to the latest version
+        :param ignore_parent_datasets: If True, ignore all the parent datasets and download only files added to the latest version
 
         :return: A base folder for the entire dataset
         """
@@ -944,7 +944,7 @@ def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_o
             raise ValueError("Cannot get a local copy of a dataset that was not finalized/closed")
         max_workers = max_workers or psutil.cpu_count()
 
-        if only_added:
+        if ignore_parent_datasets:
            # merge only added files, ignoring the parents
            if part is not None or num_parts is not None:
                LoggerRoot.get_base_logger().info("Getting only added files, ignoring parents")
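
A minimal usage sketch of the flag this series adds (post-rename), assuming a
finalized dataset that has at least one parent version; the project and dataset
names below are placeholders, while Dataset.get and get_local_copy are the
existing public API:

    from clearml import Dataset

    # fetch an existing, finalized dataset version
    ds = Dataset.get(dataset_project="my_project", dataset_name="my_dataset")

    # default behavior: merge this version with all of its parent versions
    full_copy = ds.get_local_copy()

    # with these patches applied: skip the parents and download only the
    # files added in this version (part/num_parts are ignored in this mode)
    diff_copy = ds.get_local_copy(ignore_parent_datasets=True)

    print(full_copy)
    print(diff_copy)

On a repeated call the cached diff folder is reused: _merge_diff re-downloads
only when _verify_diff_folder finds a size mismatch between a local file and
its stored entry.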