From 55e83ec265561aac2728daeefe9fede85d7dea40 Mon Sep 17 00:00:00 2001
From: Orion Weller <31665361+orionw@users.noreply.github.com>
Date: Thu, 11 Jul 2024 11:28:40 -0700
Subject: [PATCH] Add HF File System Support to Streaming (#711)

* init commit for hf
* fix reqs
* fix test
* change error name; throw error; fix reqs
* Update docs/source/how_to_guides/configure_cloud_storage_credentials.md

Co-authored-by: Saaketh Narayan

* fix test credential failure
* cleanup
* Remove duplicate `dbfs:` prefix from error message (#712)
* fix typo in tests
* docs
* Try to figure out what is wrong with lint; test
* fix os join
* try to fix precommit
* a (#713)
* Upgrade ci_testing, remove codeql (#714)
* linting_codeql
* yo
* yo
* Wrap with nparray (#719)
* Bump pydantic from 2.7.4 to 2.8.2 (#718)

Bumps [pydantic](https://github.com/pydantic/pydantic) from 2.7.4 to 2.8.2.
- [Release notes](https://github.com/pydantic/pydantic/releases)
- [Changelog](https://github.com/pydantic/pydantic/blob/main/HISTORY.md)
- [Commits](https://github.com/pydantic/pydantic/compare/v2.7.4...v2.8.2)

---
updated-dependencies:
- dependency-name: pydantic
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Xiaohan Zhang

* Bump databricks-sdk from 0.28.0 to 0.29.0 (#715)

Bumps [databricks-sdk](https://github.com/databricks/databricks-sdk-py) from 0.28.0 to 0.29.0.
- [Release notes](https://github.com/databricks/databricks-sdk-py/releases)
- [Changelog](https://github.com/databricks/databricks-sdk-py/blob/main/CHANGELOG.md)
- [Commits](https://github.com/databricks/databricks-sdk-py/compare/v0.28.0...v0.29.0)

---
updated-dependencies:
- dependency-name: databricks-sdk
  dependency-type: direct:development
  update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Xiaohan Zhang

* update docstring
* add isort

---------

Signed-off-by: dependabot[bot]
Co-authored-by: Saaketh Narayan
Co-authored-by: Vansh Singh
Co-authored-by: bigning
Co-authored-by: Saaketh Narayan
Co-authored-by: Xiaohan Zhang
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 .../configure_cloud_storage_credentials.md | 18 +++++
 setup.py                                   |  4 +
 streaming/base/storage/__init__.py         | 17 +++--
 streaming/base/storage/download.py         | 27 +++++++
 streaming/base/storage/upload.py           | 76 +++++++++++++++++++
 tests/conftest.py                          |  6 ++
 tests/test_download.py                     | 20 ++++-
 tests/test_upload.py                       | 29 ++++++-
 8 files changed, 187 insertions(+), 10 deletions(-)

diff --git a/docs/source/how_to_guides/configure_cloud_storage_credentials.md b/docs/source/how_to_guides/configure_cloud_storage_credentials.md
index 8431e5a9e..6c46679c3 100644
--- a/docs/source/how_to_guides/configure_cloud_storage_credentials.md
+++ b/docs/source/how_to_guides/configure_cloud_storage_credentials.md
@@ -7,6 +7,7 @@ Streaming dataset supports the following cloud storage providers to stream your
 - [Oracle Cloud Storage](#oracle-cloud-storage)
 - [Azure Blob Storage](#azure-blob-storage-and-azure-datalake)
 - [Databricks](#databricks)
+- [Hugging Face Datasets](#hugging-face-datasets)
 
 ## Amazon S3
 
@@ -251,6 +252,23 @@ export AZURE_ACCOUNT_ACCESS_KEY='NN1KHxKKkj20ZO92EMiDQjx3wp2kZG4UUvfAGlgGWRn6sPR
 ```
 ````
 
+## Hugging Face Datasets
+
+To authenticate Hugging Face Hub access, users must set their Hugging Face token ([HF_TOKEN](https://huggingface.co/docs/huggingface_hub/main/en/package_reference/environment_variables#hftoken)) in the run environment. See the [Hugging Face documentation](https://huggingface.co/docs/huggingface_hub/guides/hf_file_system) for the `hf://` URL format.
+
+Set the Hugging Face token in the run environment as shown below:
+
+````{tabs}
+```{code-tab} py
+import os
+os.environ['HF_TOKEN'] = 'EXAMPLEFODNN7EXAMPLE'
+```
+
+```{code-tab} sh
+export HF_TOKEN='EXAMPLEFODNN7EXAMPLE'
+```
+````
+
 ## Databricks
 
 To authenticate Databricks access for both Unity Catalog and Databricks File System (DBFS), users must set their Databricks host (`DATABRICKS_HOST`) and access token (`DATABRICKS_TOKEN`) in the run environment.
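For reference (illustration only, not part of the diff): once `HF_TOKEN` is set and the new `hf` extra is installed (e.g. `pip install 'mosaicml-streaming[hf]'`), a dataset written in MDS format to a Hugging Face dataset repo can be streamed through the `hf://` scheme added by this patch. A minimal sketch, with a hypothetical repo path:

```python
import os

from streaming import StreamingDataset

# Assumes the token was not already exported in the shell.
os.environ['HF_TOKEN'] = 'EXAMPLEFODNN7EXAMPLE'

# `hf://datasets/<repo_org>/<repo_name>/<path>` is the URL form handled by this patch.
dataset = StreamingDataset(remote='hf://datasets/my-org/my-mds-dataset/train',
                           local='/tmp/my-mds-dataset',
                           batch_size=1)
sample = dataset[0]
```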
diff --git a/setup.py b/setup.py
index ae15b31cd..4c3255c1c 100644
--- a/setup.py
+++ b/setup.py
@@ -123,6 +123,10 @@
     'AliPCS-Py>=0.8,<1',
 ]
 
+extra_deps['hf'] = [
+    'huggingface_hub>=0.23.4,<0.24',
+]
+
 extra_deps['testing'] = [
     'mosaicml-cli>=0.5.25,<0.7',
 ]
diff --git a/streaming/base/storage/__init__.py b/streaming/base/storage/__init__.py
index e9653db9d..bfe9ce6f5 100644
--- a/streaming/base/storage/__init__.py
+++ b/streaming/base/storage/__init__.py
@@ -2,15 +2,14 @@
 # SPDX-License-Identifier: Apache-2.0
 
 """Base module for downloading/uploading files from/to cloud storage."""
-
-from streaming.base.storage.download import (download_file, download_from_alipan,
-                                             download_from_azure, download_from_azure_datalake,
-                                             download_from_databricks_unity_catalog,
-                                             download_from_dbfs, download_from_gcs,
-                                             download_from_local, download_from_oci,
-                                             download_from_s3, download_from_sftp)
+# isort: off
+from streaming.base.storage.download import (
+    download_file, download_from_alipan, download_from_azure, download_from_azure_datalake,
+    download_from_databricks_unity_catalog, download_from_dbfs, download_from_gcs,
+    download_from_hf, download_from_local, download_from_oci, download_from_s3, download_from_sftp)
 from streaming.base.storage.upload import (AzureDataLakeUploader, AzureUploader, CloudUploader,
-                                           GCSUploader, LocalUploader, OCIUploader, S3Uploader)
+                                           GCSUploader, HFUploader, LocalUploader, OCIUploader,
+                                           S3Uploader)
 
 __all__ = [
     'download_file',
@@ -21,6 +20,7 @@
     'LocalUploader',
     'AzureUploader',
     'AzureDataLakeUploader',
+    'HFUploader',
     'download_from_s3',
     'download_from_sftp',
     'download_from_gcs',
@@ -31,4 +31,5 @@
     'download_from_dbfs',
     'download_from_alipan',
     'download_from_local',
+    'download_from_hf',
 ]
diff --git a/streaming/base/storage/download.py b/streaming/base/storage/download.py
index 9780836dc..e5392c3c6 100644
--- a/streaming/base/storage/download.py
+++ b/streaming/base/storage/download.py
@@ -19,6 +19,7 @@
     'download_from_oci',
     'download_from_azure',
     'download_from_azure_datalake',
+    'download_from_hf',
     'download_from_databricks_unity_catalog',
     'download_from_dbfs',
     'download_from_alipan',
@@ -275,6 +276,30 @@ def download_from_oci(remote: str, local: str) -> None:
     os.rename(local_tmp, local)
 
 
+def download_from_hf(remote: str, local: str) -> None:
+    """Download a file from remote Hugging Face to local.
+
+    Args:
+        remote (str): Remote path (Hugging Face).
+        local (str): Local path (local filesystem).
+    """
+    from huggingface_hub import hf_hub_download
+
+    obj = urllib.parse.urlparse(remote)
+    if obj.scheme != 'hf':
+        raise ValueError(f'Expected remote path to start with `hf://`, got {remote}.')
+
+    _, _, _, repo_org, repo_name, path = remote.split('/', 5)
+    local_dirname = os.path.dirname(local)
+    hf_hub_download(repo_id=f'{repo_org}/{repo_name}',
+                    filename=path,
+                    repo_type='dataset',
+                    local_dir=local_dirname)
+
+    downloaded_name = os.path.join(local_dirname, path)
+    os.rename(downloaded_name, local)
+
+
 def download_from_azure(remote: str, local: str) -> None:
     """Download a file from remote Microsoft Azure to local.
 
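For clarity (illustration only, not part of the diff): `remote.split('/', 5)` in `download_from_hf` decomposes an `hf://` URL into the repo org, repo name, and the file path inside the dataset repo. The repo and file names below are hypothetical.

```python
# How a hypothetical hf:// URL is unpacked by the parsing logic above.
remote = 'hf://datasets/my-org/my-dataset/data/shard.00000.mds'

# Splits into: ['hf:', '', 'datasets', 'my-org', 'my-dataset', 'data/shard.00000.mds']
_, _, _, repo_org, repo_name, path = remote.split('/', 5)

assert repo_org == 'my-org'
assert repo_name == 'my-dataset'
assert path == 'data/shard.00000.mds'  # passed to hf_hub_download() as `filename`
```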
@@ -514,6 +539,8 @@ def download_file(remote: Optional[str], local: str, timeout: float):
         download_from_gcs(remote, local)
     elif remote.startswith('oci://'):
         download_from_oci(remote, local)
+    elif remote.startswith('hf://'):
+        download_from_hf(remote, local)
     elif remote.startswith('azure://'):
         download_from_azure(remote, local)
     elif remote.startswith('azure-dl://'):
diff --git a/streaming/base/storage/upload.py b/streaming/base/storage/upload.py
index 6a8c67e3c..1c296bb89 100644
--- a/streaming/base/storage/upload.py
+++ b/streaming/base/storage/upload.py
@@ -24,6 +24,7 @@
     'S3Uploader',
     'GCSUploader',
     'OCIUploader',
+    'HFUploader',
     'AzureUploader',
     'DatabricksUnityCatalogUploader',
     'DBFSUploader',
@@ -37,6 +38,7 @@
     's3': 'S3Uploader',
     'gs': 'GCSUploader',
     'oci': 'OCIUploader',
+    'hf': 'HFUploader',
     'azure': 'AzureUploader',
     'azure-dl': 'AzureDataLakeUploader',
     'dbfs:/Volumes': 'DatabricksUnityCatalogUploader',
@@ -616,6 +618,80 @@ def list_objects(self, prefix: Optional[str] = None) -> Optional[List[str]]:
         return []
 
 
+class HFUploader(CloudUploader):
+    """Upload file from local machine to a Hugging Face dataset.
+
+    Args:
+        out (str): Output dataset directory to save shard files.
+
+            1. If ``out`` is a local directory, shard files are saved locally.
+            2. If ``out`` is a remote directory then the shard files are uploaded to the
+               remote location.
+        keep_local (bool): If the dataset is uploaded, whether to keep the local dataset
+            shard file or remove it after uploading. Defaults to ``False``.
+        progress_bar (bool): Display TQDM progress bars for uploading output dataset files to
+            a remote location. Defaults to ``False``.
+        retry (int): Number of times to retry uploading a file. Defaults to ``2``.
+        exist_ok (bool): When exist_ok = False, raise error if the local part of ``out`` already
+            exists and has contents. Defaults to ``False``.
+    """
+
+    def __init__(self,
+                 out: str,
+                 keep_local: bool = False,
+                 progress_bar: bool = False,
+                 retry: int = 2,
+                 exist_ok: bool = False) -> None:
+        super().__init__(out, keep_local, progress_bar, retry, exist_ok)
+
+        import huggingface_hub
+        self.api = huggingface_hub.HfApi()
+        self.fs = huggingface_hub.HfFileSystem(token=os.environ.get('HF_TOKEN', None))
+
+        obj = urllib.parse.urlparse(out)
+        if obj.scheme != 'hf':
+            raise ValueError(f'Expected remote path to start with `hf://`, got {out}.')
+
+        _, _, _, self.repo_org, self.repo_name, self.path = out.split('/', 5)
+        self.dataset_id = os.path.join(self.repo_org, self.repo_name)
+        self.check_dataset_exists()  # pyright: ignore
+
+    def upload_file(self, filename: str):
+        """Upload file from local instance to HF.
+
+        Args:
+            filename (str): File to upload.
+        """
+
+        @retry(num_attempts=self.retry)
+        def _upload_file():
+            local_filename = filename
+            local_filename = local_filename.replace('\\', '/')
+            remote_filename = os.path.join('datasets', self.dataset_id, filename)
+            remote_filename = remote_filename.replace('\\', '/')
+            logger.debug(f'Uploading to {remote_filename}')
+
+            with self.fs.open(remote_filename, 'wb') as f:
+                with open(local_filename, 'rb') as data:
+                    f.write(data.read())
+
+        _upload_file()
+
+    def check_dataset_exists(self):
+        """Raise an exception if the dataset does not exist.
+
+        Raises:
+            FileNotFoundError: If the dataset does not exist on the Hugging Face Hub.
+        """
+        import huggingface_hub
+        try:
+            _ = list(huggingface_hub.list_repo_tree(self.dataset_id, repo_type='dataset'))
+        except Exception:
+            raise FileNotFoundError(
+                f'The HF dataset {self.dataset_id} could not be found. Please make sure ' +
+                f'that the dataset exists and you have the correct access permissions.')
+
+
 class AzureUploader(CloudUploader):
     """Upload file from local machine to Microsoft Azure bucket.
 
diff --git a/tests/conftest.py b/tests/conftest.py
index ac7844539..3b8a416c5 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -51,6 +51,12 @@ def aws_credentials():
     os.environ['AWS_SESSION_TOKEN'] = 'testing'
 
 
+@pytest.fixture(scope='class', autouse=True)
+def hf_credentials():
+    """Mocked HF Credentials."""
+    os.environ['HF_TOKEN'] = 'testing'
+
+
 @pytest.fixture()
 def s3_client(aws_credentials: Any):
     with mock_aws():
diff --git a/tests/test_download.py b/tests/test_download.py
index 8d9a0c1b2..50bee57d1 100644
--- a/tests/test_download.py
+++ b/tests/test_download.py
@@ -14,7 +14,8 @@
                                              download_from_azure_datalake,
                                              download_from_databricks_unity_catalog,
                                              download_from_dbfs, download_from_gcs,
-                                             download_from_local, download_from_s3)
+                                             download_from_hf, download_from_local,
+                                             download_from_s3)
 from tests.conftest import GCS_URL, MY_BUCKET, R2_URL
 
 MY_PREFIX = 'train'
@@ -47,6 +48,15 @@ def test_invalid_cloud_prefix(self, remote_local_file: Any):
             download_from_azure_datalake(mock_remote_filepath, mock_local_filepath)
 
 
+class TestHFClient:
+
+    @pytest.mark.usefixtures('remote_local_file')
+    def test_invalid_cloud_prefix(self, remote_local_file: Any):
+        with pytest.raises(ValueError):
+            mock_remote_filepath, mock_local_filepath = remote_local_file(cloud_prefix='hf://')
+            download_from_hf(mock_remote_filepath, mock_local_filepath)
+
+
 class TestS3Client:
 
     @pytest.mark.usefixtures('s3_client', 's3_test', 'remote_local_file')
@@ -183,6 +193,14 @@ def test_download_from_gcs_gets_called(self, mocked_requests: Mock, remote_local
         mocked_requests.assert_called_once()
         mocked_requests.assert_called_once_with(mock_remote_filepath, mock_local_filepath)
 
+    @patch('streaming.base.storage.download.download_from_hf')
+    @pytest.mark.usefixtures('remote_local_file')
+    def test_download_from_hf_gets_called(self, mocked_requests: Mock, remote_local_file: Any):
+        mock_remote_filepath, mock_local_filepath = remote_local_file(cloud_prefix='hf://')
+        download_file(mock_remote_filepath, mock_local_filepath, 60)
+        mocked_requests.assert_called_once()
+        mocked_requests.assert_called_once_with(mock_remote_filepath, mock_local_filepath)
+
     @patch('streaming.base.storage.download.download_from_azure')
     @pytest.mark.usefixtures('remote_local_file')
     def test_download_from_azure_gets_called(self, mocked_requests: Mock, remote_local_file: Any):
diff --git a/tests/test_upload.py b/tests/test_upload.py
index 455b6b8c4..b280ac968 100644
--- a/tests/test_upload.py
+++ b/tests/test_upload.py
@@ -14,7 +14,7 @@
 from streaming.base.storage.upload import (AlipanUploader, AzureDataLakeUploader, AzureUploader,
                                            CloudUploader, DatabricksUnityCatalogUploader,
                                            DBFSUploader, GCSAuthentication, GCSUploader,
-                                           LocalUploader, S3Uploader)
+                                           HFUploader, LocalUploader, S3Uploader)
 from tests.conftest import MY_BUCKET, R2_URL
 
 MY_PREFIX = 'train'
@@ -425,6 +425,33 @@ def test_local_directory_is_empty(self, local_remote_dir: Tuple[str, str]):
             _ = AzureDataLakeUploader(out=local)
 
 
+class TestHFUploader:
+
+    @patch('streaming.base.storage.upload.HFUploader.check_dataset_exists')
+    @pytest.mark.usefixtures('hf_credentials')
+    @pytest.mark.parametrize('out', ['hf://datasets/org_name/repo_name/path'])
+    def test_instantiation(self, mocked_requests: Mock, out: Any):
+        mocked_requests.side_effect = None
+        _ = HFUploader(out=out)
+        if not isinstance(out, str):
+            shutil.rmtree(out[0], ignore_errors=True)
+
+    @pytest.mark.parametrize('out', ['ss4://container/dir'])
+    def test_invalid_remote_str(self, out: str):
+        with pytest.raises(ValueError, match=f'Invalid Cloud provider prefix.*'):
+            _ = HFUploader(out=out)
+
+    def test_local_directory_is_empty(self, local_remote_dir: Tuple[str, str]):
+        with pytest.raises(FileExistsError, match=f'Directory is not empty.*'):
+            local, _ = local_remote_dir
+            os.makedirs(local, exist_ok=True)
+            local_file_path = os.path.join(local, 'file.txt')
+            # Creating an empty file at specified location
+            with open(local_file_path, 'w') as _:
+                pass
+            _ = HFUploader(out=local)
+
+
 class TestDatabricksUnityCatalogUploader:
 
     @patch('streaming.base.storage.upload.DatabricksUploader._create_workspace_client')
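To round this out (illustration only, not part of the diff): with the `hf` extra installed and `HF_TOKEN` set, the new `hf` scheme lets `MDSWriter` push shards straight to a Hugging Face dataset repo, since the uploader mapping added above routes `hf` to `HFUploader`. The repo below is hypothetical and must already exist; `HFUploader.check_dataset_exists()` raises `FileNotFoundError` otherwise.

```python
# Minimal sketch: writing MDS shards to a (hypothetical) existing HF dataset repo.
from streaming import MDSWriter

columns = {'text': 'str'}
samples = [{'text': 'hello'}, {'text': 'world'}]

with MDSWriter(out='hf://datasets/my-org/my-dataset/train', columns=columns) as out:
    for sample in samples:
        out.write(sample)
```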