Add HF File System Support to Streaming (#711)
* init commit for hf

* fix reqs

* fix test

* change error name; throw error; fix reqs

* Update docs/source/how_to_guides/configure_cloud_storage_credentials.md

Co-authored-by: Saaketh Narayan <[email protected]>

* fix test credential failure

* cleanup

* Remove duplicate `dbfs:` prefix from error message (#712)

* fix typo in tests

* docs

* Try to figure out what is wrong with lint; test

* fix os join

* try to fix precommit

* a (#713)

* Upgrade ci_testing, remove codeql (#714)

* linting_codeql

* yo

* yo

* Wrap with nparray (#719)

* Bump pydantic from 2.7.4 to 2.8.2 (#718)

Bumps [pydantic](https://github.com/pydantic/pydantic) from 2.7.4 to 2.8.2.
- [Release notes](https://github.com/pydantic/pydantic/releases)
- [Changelog](https://github.com/pydantic/pydantic/blob/main/HISTORY.md)
- [Commits](pydantic/pydantic@v2.7.4...v2.8.2)

---
updated-dependencies:
- dependency-name: pydantic
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Xiaohan Zhang <[email protected]>

* Bump databricks-sdk from 0.28.0 to 0.29.0 (#715)

Bumps [databricks-sdk](https://github.com/databricks/databricks-sdk-py) from 0.28.0 to 0.29.0.
- [Release notes](https://github.com/databricks/databricks-sdk-py/releases)
- [Changelog](https://github.com/databricks/databricks-sdk-py/blob/main/CHANGELOG.md)
- [Commits](databricks/databricks-sdk-py@v0.28.0...v0.29.0)

---
updated-dependencies:
- dependency-name: databricks-sdk
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Xiaohan Zhang <[email protected]>

* update docstring

* add isort

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: Saaketh Narayan <[email protected]>
Co-authored-by: Vansh Singh <[email protected]>
Co-authored-by: bigning <[email protected]>
Co-authored-by: Saaketh Narayan <[email protected]>
Co-authored-by: Xiaohan Zhang <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
7 people authored Jul 11, 2024
1 parent 2f7defa commit 55e83ec
Showing 8 changed files with 187 additions and 10 deletions.
18 changes: 18 additions & 0 deletions docs/source/how_to_guides/configure_cloud_storage_credentials.md
@@ -7,6 +7,7 @@ Streaming dataset supports the following cloud storage providers to stream your
- [Oracle Cloud Storage](#oracle-cloud-storage)
- [Azure Blob Storage](#azure-blob-storage-and-azure-datalake)
- [Databricks](#databricks)
- [Hugging Face Datasets](#hugging-face-datasets)

## Amazon S3

@@ -251,6 +252,23 @@ export AZURE_ACCOUNT_ACCESS_KEY='NN1KHxKKkj20ZO92EMiDQjx3wp2kZG4UUvfAGlgGWRn6sPR
```
````

## Hugging Face Datasets

To authenticate Hugging Face Hub access, users must set their Hugging Face token ([HF_TOKEN](https://huggingface.co/docs/huggingface_hub/main/en/package_reference/environment_variables#hftoken)) in the run environment. See the [Hugging Face documentation](https://huggingface.co/docs/huggingface_hub/guides/hf_file_system) for the URL format.

Set the Hugging Face token in the run environment as shown below:

````{tabs}
```{code-tab} py
import os
os.environ['HF_TOKEN'] = 'EXAMPLEFODNN7EXAMPLE'
```
```{code-tab} sh
export HF_TOKEN='EXAMPLEFODNN7EXAMPLE'
```
````
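
Once the token is set, an `hf://` path can be passed as the `remote` argument of `StreamingDataset`, like any other supported cloud prefix. The snippet below is a minimal sketch: the organization, repository, and directory names are placeholders, the path follows the `hf://datasets/<repo_org>/<repo_name>/<path>` format parsed by this change, and it assumes the `hf` extra is installed (e.g. `pip install 'mosaicml-streaming[hf]'`).

```python
from streaming import StreamingDataset

# Placeholder dataset location; the MDS index and shards are assumed to live
# under the `mds/` directory of the `my-org/my-dataset` dataset repo.
dataset = StreamingDataset(remote='hf://datasets/my-org/my-dataset/mds',
                           local='/tmp/my-dataset-cache',
                           shuffle=False)

sample = dataset[0]
```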

## Databricks

To authenticate Databricks access for both Unity Catalog and Databricks File System (DBFS), users must set their Databricks host (`DATABRICKS_HOST`) and access token (`DATABRICKS_TOKEN`) in the run environment.
4 changes: 4 additions & 0 deletions setup.py
@@ -123,6 +123,10 @@
    'AliPCS-Py>=0.8,<1',
]

extra_deps['hf'] = [
    'huggingface_hub>=0.23.4,<0.24',
]

extra_deps['testing'] = [
    'mosaicml-cli>=0.5.25,<0.7',
]
17 changes: 9 additions & 8 deletions streaming/base/storage/__init__.py
@@ -2,15 +2,14 @@
# SPDX-License-Identifier: Apache-2.0

"""Base module for downloading/uploading files from/to cloud storage."""

from streaming.base.storage.download import (download_file, download_from_alipan,
                                             download_from_azure, download_from_azure_datalake,
                                             download_from_databricks_unity_catalog,
                                             download_from_dbfs, download_from_gcs,
                                             download_from_local, download_from_oci,
                                             download_from_s3, download_from_sftp)
# isort: off
from streaming.base.storage.download import (
    download_file, download_from_alipan, download_from_azure, download_from_azure_datalake,
    download_from_databricks_unity_catalog, download_from_dbfs, download_from_gcs,
    download_from_hf, download_from_local, download_from_oci, download_from_s3, download_from_sftp)
from streaming.base.storage.upload import (AzureDataLakeUploader, AzureUploader, CloudUploader,
                                           GCSUploader, LocalUploader, OCIUploader, S3Uploader)
                                           GCSUploader, HFUploader, LocalUploader, OCIUploader,
                                           S3Uploader)

__all__ = [
    'download_file',
@@ -21,6 +20,7 @@
    'LocalUploader',
    'AzureUploader',
    'AzureDataLakeUploader',
    'HFUploader',
    'download_from_s3',
    'download_from_sftp',
    'download_from_gcs',
@@ -31,4 +31,5 @@
    'download_from_dbfs',
    'download_from_alipan',
    'download_from_local',
    'download_from_hf',
]
27 changes: 27 additions & 0 deletions streaming/base/storage/download.py
@@ -19,6 +19,7 @@
    'download_from_oci',
    'download_from_azure',
    'download_from_azure_datalake',
    'download_from_hf',
    'download_from_databricks_unity_catalog',
    'download_from_dbfs',
    'download_from_alipan',
@@ -275,6 +276,30 @@ def download_from_oci(remote: str, local: str) -> None:
    os.rename(local_tmp, local)


def download_from_hf(remote: str, local: str) -> None:
    """Download a file from remote Hugging Face to local.

    Args:
        remote (str): Remote path (Hugging Face).
        local (str): Local path (local filesystem).
    """
    from huggingface_hub import hf_hub_download

    obj = urllib.parse.urlparse(remote)
    if obj.scheme != 'hf':
        raise ValueError(f'Expected remote path to start with `hf://`, got {remote}.')

    _, _, _, repo_org, repo_name, path = remote.split('/', 5)
    local_dirname = os.path.dirname(local)
    hf_hub_download(repo_id=f'{repo_org}/{repo_name}',
                    filename=path,
                    repo_type='dataset',
                    local_dir=local_dirname)

    downloaded_name = os.path.join(local_dirname, path)
    os.rename(downloaded_name, local)
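
For reference, a minimal usage sketch of the helper above; the repository and shard names are placeholders, and `HF_TOKEN` is assumed to be set for private or gated datasets.

```python
from streaming.base.storage import download_from_hf

# Placeholder repo and shard; the remote follows hf://datasets/<repo_org>/<repo_name>/<path>,
# so this resolves to repo_id='my-org/my-dataset' and filename='mds/shard.00000.mds'.
download_from_hf(remote='hf://datasets/my-org/my-dataset/mds/shard.00000.mds',
                 local='/tmp/cache/mds/shard.00000.mds')
```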


def download_from_azure(remote: str, local: str) -> None:
"""Download a file from remote Microsoft Azure to local.
@@ -514,6 +539,8 @@ def download_file(remote: Optional[str], local: str, timeout: float):
        download_from_gcs(remote, local)
    elif remote.startswith('oci://'):
        download_from_oci(remote, local)
    elif remote.startswith('hf://'):
        download_from_hf(remote, local)
    elif remote.startswith('azure://'):
        download_from_azure(remote, local)
    elif remote.startswith('azure-dl://'):
76 changes: 76 additions & 0 deletions streaming/base/storage/upload.py
@@ -24,6 +24,7 @@
    'S3Uploader',
    'GCSUploader',
    'OCIUploader',
    'HFUploader',
    'AzureUploader',
    'DatabricksUnityCatalogUploader',
    'DBFSUploader',
@@ -37,6 +38,7 @@
    's3': 'S3Uploader',
    'gs': 'GCSUploader',
    'oci': 'OCIUploader',
    'hf': 'HFUploader',
    'azure': 'AzureUploader',
    'azure-dl': 'AzureDataLakeUploader',
    'dbfs:/Volumes': 'DatabricksUnityCatalogUploader',
@@ -616,6 +618,80 @@ def list_objects(self, prefix: Optional[str] = None) -> Optional[List[str]]:
        return []


class HFUploader(CloudUploader):
    """Upload file from local machine to a Huggingface Dataset.

    Args:
        out (str): Output dataset directory to save shard files.
            1. If ``out`` is a local directory, shard files are saved locally.
            2. If ``out`` is a remote directory then the shard files are uploaded to the
               remote location.
        keep_local (bool): If the dataset is uploaded, whether to keep the local dataset
            shard file or remove it after uploading. Defaults to ``False``.
        progress_bar (bool): Display TQDM progress bars for uploading output dataset files to
            a remote location. Defaults to ``False``.
        retry (int): Number of times to retry uploading a file. Defaults to ``2``.
        exist_ok (bool): When exist_ok = False, raise error if the local part of ``out`` already
            exists and has contents. Defaults to ``False``.
    """

    def __init__(self,
                 out: str,
                 keep_local: bool = False,
                 progress_bar: bool = False,
                 retry: int = 2,
                 exist_ok: bool = False) -> None:
        super().__init__(out, keep_local, progress_bar, retry, exist_ok)

        import huggingface_hub
        self.api = huggingface_hub.HfApi()
        self.fs = huggingface_hub.HfFileSystem(token=os.environ.get('HF_TOKEN', None))

        obj = urllib.parse.urlparse(out)
        if obj.scheme != 'hf':
            raise ValueError(f'Expected remote path to start with `hf://`, got {out}.')

        _, _, _, self.repo_org, self.repo_name, self.path = out.split('/', 5)
        self.dataset_id = os.path.join(self.repo_org, self.repo_name)
        self.check_dataset_exists()  # pyright: ignore

    def upload_file(self, filename: str):
        """Upload file from local instance to HF.

        Args:
            filename (str): File to upload.
        """

        @retry(num_attempts=self.retry)
        def _upload_file():
            local_filename = filename
            local_filename = local_filename.replace('\\', '/')
            remote_filename = os.path.join('datasets', self.dataset_id, filename)
            remote_filename = remote_filename.replace('\\', '/')
            logger.debug(f'Uploading to {remote_filename}')

            with self.fs.open(remote_filename, 'wb') as f:
                with open(local_filename, 'rb') as data:
                    f.write(data.read())

        _upload_file()

    def check_dataset_exists(self):
        """Raise an exception if the dataset does not exist.

        Raises:
            error: Dataset does not exist.
        """
        import huggingface_hub
        try:
            _ = list(huggingface_hub.list_repo_tree(self.dataset_id, repo_type='dataset'))
        except Exception:
            raise FileNotFoundError(
                f'The HF dataset {self.dataset_id} could not be found. Please make sure ' +
                f'that the dataset exists and you have the correct access permissions.')
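
In practice the uploader is selected automatically from the `'hf'` prefix mapping added above, so the typical entry point is a writer pointed at an `hf://` output path. A hedged sketch, with placeholder names and assuming `HF_TOKEN` is set and the dataset repo already exists (the constructor calls `check_dataset_exists`):

```python
import os

from streaming import MDSWriter

# Placeholder token and dataset repo; HFUploader is chosen via the 'hf://' prefix.
os.environ.setdefault('HF_TOKEN', 'hf_xxx_example')

columns = {'text': 'str'}
with MDSWriter(out='hf://datasets/my-org/my-dataset/mds', columns=columns) as writer:
    writer.write({'text': 'hello world'})
```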


class AzureUploader(CloudUploader):
"""Upload file from local machine to Microsoft Azure bucket.
6 changes: 6 additions & 0 deletions tests/conftest.py
@@ -51,6 +51,12 @@ def aws_credentials():
    os.environ['AWS_SESSION_TOKEN'] = 'testing'


@pytest.fixture(scope='class', autouse=True)
def hf_credentials():
    """Mocked HF Credentials."""
    os.environ['HF_TOKEN'] = 'testing'


@pytest.fixture()
def s3_client(aws_credentials: Any):
    with mock_aws():
20 changes: 19 additions & 1 deletion tests/test_download.py
@@ -14,7 +14,8 @@
                                             download_from_azure_datalake,
                                             download_from_databricks_unity_catalog,
                                             download_from_dbfs, download_from_gcs,
                                             download_from_local, download_from_s3)
                                             download_from_hf, download_from_local,
                                             download_from_s3)
from tests.conftest import GCS_URL, MY_BUCKET, R2_URL

MY_PREFIX = 'train'
@@ -47,6 +48,15 @@ def test_invalid_cloud_prefix(self, remote_local_file: Any):
            download_from_azure_datalake(mock_remote_filepath, mock_local_filepath)


class TestHFClient:

    @pytest.mark.usefixtures('remote_local_file')
    def test_invalid_cloud_prefix(self, remote_local_file: Any):
        with pytest.raises(ValueError):
            mock_remote_filepath, mock_local_filepath = remote_local_file(cloud_prefix='hf://')
            download_from_hf(mock_remote_filepath, mock_local_filepath)


class TestS3Client:

    @pytest.mark.usefixtures('s3_client', 's3_test', 'remote_local_file')
@@ -183,6 +193,14 @@ def test_download_from_gcs_gets_called(self, mocked_requests: Mock, remote_local
        mocked_requests.assert_called_once()
        mocked_requests.assert_called_once_with(mock_remote_filepath, mock_local_filepath)

    @patch('streaming.base.storage.download.download_from_hf')
    @pytest.mark.usefixtures('remote_local_file')
    def test_download_from_hf_gets_called(self, mocked_requests: Mock, remote_local_file: Any):
        mock_remote_filepath, mock_local_filepath = remote_local_file(cloud_prefix='hf://')
        download_file(mock_remote_filepath, mock_local_filepath, 60)
        mocked_requests.assert_called_once()
        mocked_requests.assert_called_once_with(mock_remote_filepath, mock_local_filepath)

    @patch('streaming.base.storage.download.download_from_azure')
    @pytest.mark.usefixtures('remote_local_file')
    def test_download_from_azure_gets_called(self, mocked_requests: Mock, remote_local_file: Any):
29 changes: 28 additions & 1 deletion tests/test_upload.py
@@ -14,7 +14,7 @@
from streaming.base.storage.upload import (AlipanUploader, AzureDataLakeUploader, AzureUploader,
                                           CloudUploader, DatabricksUnityCatalogUploader,
                                           DBFSUploader, GCSAuthentication, GCSUploader,
                                           LocalUploader, S3Uploader)
                                           HFUploader, LocalUploader, S3Uploader)
from tests.conftest import MY_BUCKET, R2_URL

MY_PREFIX = 'train'
@@ -425,6 +425,33 @@ def test_local_directory_is_empty(self, local_remote_dir: Tuple[str, str]):
            _ = AzureDataLakeUploader(out=local)


class TestHFUploader:

    @patch('streaming.base.storage.upload.HFUploader.check_dataset_exists')
    @pytest.mark.usefixtures('hf_credentials')
    @pytest.mark.parametrize('out', ['hf://datasets/org_name/repo_name/path'])
    def test_instantiation(self, mocked_requests: Mock, out: Any):
        mocked_requests.side_effect = None
        _ = HFUploader(out=out)
        if not isinstance(out, str):
            shutil.rmtree(out[0], ignore_errors=True)

    @pytest.mark.parametrize('out', ['ss4://container/dir'])
    def test_invalid_remote_str(self, out: str):
        with pytest.raises(ValueError, match=f'Invalid Cloud provider prefix.*'):
            _ = HFUploader(out=out)

    def test_local_directory_is_empty(self, local_remote_dir: Tuple[str, str]):
        with pytest.raises(FileExistsError, match=f'Directory is not empty.*'):
            local, _ = local_remote_dir
            os.makedirs(local, exist_ok=True)
            local_file_path = os.path.join(local, 'file.txt')
            # Creating an empty file at specified location
            with open(local_file_path, 'w') as _:
                pass
            _ = HFUploader(out=local)


class TestDatabricksUnityCatalogUploader:

@patch('streaming.base.storage.upload.DatabricksUploader._create_workspace_client')
