Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Apply obstore as storage backend #3033

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ RUN SETUPTOOLS_SCM_PRETEND_VERSION_FOR_FLYTEKIT=$PSEUDO_VERSION \
-e /flytekit \
-e /flytekit/plugins/flytekit-deck-standard \
-e /flytekit/plugins/flytekit-flyteinteractive \
obstore==0.3.0b9 \
markdown \
pandas \
pillow \
Expand Down
165 changes: 133 additions & 32 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import tempfile
import typing
from time import sleep
from typing import Any, Dict, Optional, Union, cast
from typing import Any, Dict, Optional, Tuple, Union, cast
from uuid import UUID

import fsspec
from decorator import decorator
from fsspec.asyn import AsyncFileSystem
from fsspec.utils import get_protocol
from obstore.fsspec import AsyncFsspecStore
from obstore.store import AzureStore, GCSStore, S3Store
from typing_extensions import Unpack

from flytekit import configuration
Expand All @@ -46,47 +48,128 @@

# Refer to https://github.com/fsspec/s3fs/blob/50bafe4d8766c3b2a4e1fc09669cf02fb2d71454/s3fs/core.py#L198
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's update this link if we're going to change the args.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! I updated the link in the new commit

# for key and secret
_FSSPEC_S3_KEY_ID = "key"
_FSSPEC_S3_SECRET = "secret"
_FSSPEC_S3_KEY_ID = "access_key_id"
_FSSPEC_S3_SECRET = "secret_access_key"
_ANON = "anon"

Uploadable = typing.Union[str, os.PathLike, pathlib.Path, bytes, io.BufferedReader, io.BytesIO, io.StringIO]


def s3_setup_args(s3_cfg: configuration.S3Config, anonymous: bool = False) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {
"cache_regions": True,
}
def s3_setup_args(s3_cfg: configuration.S3Config, bucket: str = "", anonymous: bool = False) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {}
store_kwargs: Dict[str, Any] = {}

if s3_cfg.access_key_id:
kwargs[_FSSPEC_S3_KEY_ID] = s3_cfg.access_key_id
store_kwargs[_FSSPEC_S3_KEY_ID] = s3_cfg.access_key_id

if s3_cfg.secret_access_key:
kwargs[_FSSPEC_S3_SECRET] = s3_cfg.secret_access_key
store_kwargs[_FSSPEC_S3_SECRET] = s3_cfg.secret_access_key

# S3fs takes this as a special arg
if s3_cfg.endpoint is not None:
kwargs["client_kwargs"] = {"endpoint_url": s3_cfg.endpoint}
store_kwargs["endpoint_url"] = s3_cfg.endpoint
# kwargs["client_kwargs"] = {"endpoint_url": s3_cfg.endpoint}

store = S3Store.from_env(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we cache these setup args functions? i think each call to S3Store is creating a new client underneath the hood in the object store library. let's add lru_cache to this call? @pingsutw

bucket,
config={
**store_kwargs,
"aws_allow_http": "true", # Allow HTTP connections
"aws_virtual_hosted_style_request": "false", # Use path-style addressing
},
)

if anonymous:
kwargs[_ANON] = True

kwargs["store"] = store

return kwargs


def azure_setup_args(azure_cfg: configuration.AzureBlobStorageConfig, anonymous: bool = False) -> Dict[str, Any]:
def gs_setup_args(gcs_cfg: configuration.GCSConfig, bucket: str = "", anonymous: bool = False) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {}

store = GCSStore.from_env(
bucket,
)

if anonymous:
kwargs["token"] = _ANON

kwargs["store"] = store

return kwargs


def split_path(path: str) -> Tuple[str, str]:
"""
Split bucket and file path

Parameters
----------
path : string
Input path, like `s3://mybucket/path/to/file`

Examples
--------
>>> split_path("s3://mybucket/path/to/file")
['mybucket', 'path/to/file']
"""
if "file" in path:
# no bucket for file
return "", path
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve file protocol detection precision

The condition if "file" in path may match paths containing 'file' anywhere in the string, not just the protocol. Consider using if get_protocol(path) == "file" for more precise protocol checking.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if "file" in path:
# no bucket for file
return "", path
if get_protocol(path) == "file":
# no bucket for file
return "", path

Code Review Run #39883a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

protocol = get_protocol(path)
if path.startswith(protocol + "://"):
path = path[len(protocol) + 3 :]
elif path.startswith(protocol + "::"):
path = path[len(protocol) + 2 :]
path = path.strip("/")

if "/" not in path:
return path, ""
else:
path_li = path.split("/")
bucket = path_li[0]
# use obstore for s3 and gcs only now, no need to split
# bucket out of path for other storage
support_types = ["s3", "gs", "abfs"]
if protocol in support_types:
file_path = "/".join(path_li[1:])
return (bucket, file_path)
else:
return bucket, path
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving storage types to constant

The list of supported storage types support_types = ['s3', 'gs', 'abfs'] could be defined as a module-level constant since it's used for validation. Consider moving it outside the function to improve maintainability.

Code suggestion
Check the AI-generated fix before applying
 @@ -53,1 +53,2 @@
  _ANON = "anon"
 +SUPPORTED_STORAGE_TYPES = ["s3", "gs", "abfs"]
 @@ -136,2 +136,1 @@
 -        support_types = ["s3", "gs", "abfs"]
 -        if protocol in support_types:
 +        if protocol in SUPPORTED_STORAGE_TYPES:

Code Review Run #39883a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged



def azure_setup_args(
azure_cfg: configuration.AzureBlobStorageConfig, container: str = "", anonymous: bool = False
) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {}
store_kwargs: Dict[str, Any] = {}

if azure_cfg.account_name:
kwargs["account_name"] = azure_cfg.account_name
store_kwargs["account_name"] = azure_cfg.account_name
if azure_cfg.account_key:
kwargs["account_key"] = azure_cfg.account_key
store_kwargs["account_key"] = azure_cfg.account_key
if azure_cfg.client_id:
kwargs["client_id"] = azure_cfg.client_id
store_kwargs["client_id"] = azure_cfg.client_id
if azure_cfg.client_secret:
kwargs["client_secret"] = azure_cfg.client_secret
store_kwargs["client_secret"] = azure_cfg.client_secret
if azure_cfg.tenant_id:
kwargs["tenant_id"] = azure_cfg.tenant_id
kwargs[_ANON] = anonymous
store_kwargs["tenant_id"] = azure_cfg.tenant_id

store = AzureStore.from_env(
container,
config={
**store_kwargs,
},
)

kwargs["store"] = store

if anonymous:
kwargs[_ANON] = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using anonymous parameter for _ANON

Consider using kwargs[_ANON] = anonymous instead of hardcoding True to maintain consistency with the input parameter value.

Code suggestion
Check the AI-generated fix before applying
Suggested change
kwargs[_ANON] = True
kwargs[_ANON] = anonymous

Code Review Run #39883a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


return kwargs


Expand Down Expand Up @@ -189,21 +272,27 @@ def get_filesystem(
protocol: typing.Optional[str] = None,
anonymous: bool = False,
path: typing.Optional[str] = None,
bucket: str = "",
**kwargs,
) -> fsspec.AbstractFileSystem:
# TODO: add bucket to adlfs
if not protocol:
return self._default_remote
if protocol == "file":
kwargs["auto_mkdir"] = True
return FlyteLocalFileSystem(**kwargs)
elif protocol == "s3":
s3kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous)
s3kwargs = s3_setup_args(self._data_config.s3, bucket, anonymous=anonymous)
s3kwargs.update(kwargs)
return fsspec.filesystem(protocol, **s3kwargs) # type: ignore
elif protocol == "gs":
if anonymous:
kwargs["token"] = _ANON
return fsspec.filesystem(protocol, **kwargs) # type: ignore
gskwargs = gs_setup_args(self._data_config.gcs, bucket, anonymous=anonymous)
gskwargs.update(kwargs)
return fsspec.filesystem(protocol, **gskwargs) # type: ignore
elif protocol == "abfs":
azkwargs = azure_setup_args(self._data_config.azure, bucket, anonymous=anonymous)
azkwargs.update(kwargs)
return fsspec.filesystem(protocol, **azkwargs) # type: ignore
elif protocol == "ftp":
kwargs.update(fsspec.implementations.ftp.FTPFileSystem._get_kwargs_from_urls(path))
return fsspec.filesystem(protocol, **kwargs)
Expand All @@ -216,16 +305,20 @@ def get_filesystem(
return fsspec.filesystem(protocol, **kwargs)

async def get_async_filesystem_for_path(
self, path: str = "", anonymous: bool = False, **kwargs
self, path: str = "", bucket: str = "", anonymous: bool = False, **kwargs
) -> Union[AsyncFileSystem, fsspec.AbstractFileSystem]:
protocol = get_protocol(path)
loop = asyncio.get_running_loop()

return self.get_filesystem(protocol, anonymous=anonymous, path=path, asynchronous=True, loop=loop, **kwargs)
return self.get_filesystem(
protocol, anonymous=anonymous, path=path, bucket=bucket, asynchronous=True, loop=loop, **kwargs
)

def get_filesystem_for_path(self, path: str = "", anonymous: bool = False, **kwargs) -> fsspec.AbstractFileSystem:
def get_filesystem_for_path(
self, path: str = "", bucket: str = "", anonymous: bool = False, **kwargs
) -> fsspec.AbstractFileSystem:
protocol = get_protocol(path)
return self.get_filesystem(protocol, anonymous=anonymous, path=path, **kwargs)
return self.get_filesystem(protocol, anonymous=anonymous, path=path, bucket=bucket, **kwargs)

@staticmethod
def is_remote(path: Union[str, os.PathLike]) -> bool:
Expand Down Expand Up @@ -295,7 +388,8 @@ def exists(self, path: str) -> bool:

@retry_request
async def get(self, from_path: str, to_path: str, recursive: bool = False, **kwargs):
file_system = await self.get_async_filesystem_for_path(from_path)
bucket, from_path_file_only = split_path(from_path)
file_system = await self.get_async_filesystem_for_path(from_path, bucket)
Comment on lines +399 to +400
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle empty bucket case for storage

Consider handling the case where split_path() returns empty bucket for non-file protocols. Currently passing empty bucket to get_async_filesystem_for_path() could cause issues with cloud storage access.

Code suggestion
Check the AI-generated fix before applying
Suggested change
bucket, from_path_file_only = split_path(from_path)
file_system = await self.get_async_filesystem_for_path(from_path, bucket)
bucket, from_path_file_only = split_path(from_path)
protocol = get_protocol(from_path)
if protocol not in ['file'] and not bucket:
raise ValueError(f'Empty bucket not allowed for protocol {protocol}')
file_system = await self.get_async_filesystem_for_path(from_path, bucket)

Code Review Run #0b7f4d


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

if recursive:
from_path, to_path = self.recursive_paths(from_path, to_path)
try:
Expand All @@ -307,7 +401,7 @@ async def get(self, from_path: str, to_path: str, recursive: bool = False, **kwa
)
logger.info(f"Getting {from_path} to {to_path}")
if isinstance(file_system, AsyncFileSystem):
dst = await file_system._get(from_path, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212
dst = await file_system._get(from_path_file_only, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212
else:
dst = file_system.get(from_path, to_path, recursive=recursive, **kwargs)
if isinstance(dst, (str, pathlib.Path)):
Expand Down Expand Up @@ -336,7 +430,8 @@ async def _put(self, from_path: str, to_path: str, recursive: bool = False, **kw
More of an internal function to be called by put_data and put_raw_data
This does not need a separate sync function.
"""
file_system = await self.get_async_filesystem_for_path(to_path)
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
Comment on lines +446 to +447
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider validating bucket before filesystem call

Consider validating the bucket parameter before passing it to get_async_filesystem_for_path(). An empty bucket could cause issues with certain storage backends. Similar issues were also found in:

  • flytekit/core/data_persistence.py (line 318)
  • flytekit/core/data_persistence.py (line 521)
  • flytekit/core/data_persistence.py (line 308)
Code suggestion
Check the AI-generated fix before applying
Suggested change
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
bucket, to_path_file_only = split_path(to_path)
protocol = get_protocol(to_path)
if protocol in ['s3', 'gs', 'abfs'] and not bucket:
raise ValueError(f'Bucket cannot be empty for {protocol} protocol')
file_system = await self.get_async_filesystem_for_path(to_path, bucket)

Code Review Run #39883a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +446 to +447
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider extracting path splitting logic

Consider extracting the bucket and path splitting logic into a separate method to improve code reusability and maintainability. The split_path function is used in multiple places and could be encapsulated better.

Code suggestion
Check the AI-generated fix before applying
Suggested change
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
bucket, path = self._split_and_get_bucket_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)

Code Review Run #0b7f4d


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

from_path = self.strip_file_header(from_path)
if recursive:
# Only check this for the local filesystem
Expand All @@ -354,7 +449,7 @@ async def _put(self, from_path: str, to_path: str, recursive: bool = False, **kw
kwargs["metadata"] = {}
kwargs["metadata"].update(self._execution_metadata)
if isinstance(file_system, AsyncFileSystem):
dst = await file_system._put(from_path, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212
dst = await file_system._put(from_path, to_path_file_only, recursive=recursive, **kwargs) # pylint: disable=W0212
else:
dst = file_system.put(from_path, to_path, recursive=recursive, **kwargs)
if isinstance(dst, (str, pathlib.Path)):
Expand Down Expand Up @@ -423,11 +518,13 @@ async def async_put_raw_data(
r = await self._put(from_path, to_path, **kwargs)
return r or to_path

bucket, _ = split_path(to_path)

# See https://github.com/fsspec/s3fs/issues/871 for more background and pending work on the fsspec side to
# support effectively async open(). For now these use-cases below will revert to sync calls.
# raw bytes
if isinstance(lpath, bytes):
fs = self.get_filesystem_for_path(to_path)
fs = self.get_filesystem_for_path(to_path, bucket)
with fs.open(to_path, "wb", **kwargs) as s:
s.write(lpath)
return to_path
Expand All @@ -436,7 +533,7 @@ async def async_put_raw_data(
if isinstance(lpath, io.BufferedReader) or isinstance(lpath, io.BytesIO):
if not lpath.readable():
raise FlyteAssertion("Buffered reader must be readable")
fs = self.get_filesystem_for_path(to_path)
fs = self.get_filesystem_for_path(to_path, bucket)
lpath.seek(0)
with fs.open(to_path, "wb", **kwargs) as s:
while data := lpath.read(read_chunk_size_bytes):
Expand All @@ -446,7 +543,7 @@ async def async_put_raw_data(
if isinstance(lpath, io.StringIO):
if not lpath.readable():
raise FlyteAssertion("Buffered reader must be readable")
fs = self.get_filesystem_for_path(to_path)
fs = self.get_filesystem_for_path(to_path, bucket)
lpath.seek(0)
with fs.open(to_path, "wb", **kwargs) as s:
while data_str := lpath.read(read_chunk_size_bytes):
Expand Down Expand Up @@ -635,6 +732,10 @@ async def async_put_data(
put_data = loop_manager.synced(async_put_data)


fsspec.register_implementation("s3", AsyncFsspecStore)
fsspec.register_implementation("gs", AsyncFsspecStore)
fsspec.register_implementation("abfs", AsyncFsspecStore)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider relocating fsspec implementation registrations

Consider moving the fsspec implementation registrations to a more appropriate initialization location, such as a module-level __init__.py or a dedicated setup function. This would improve code organization and make the registrations more discoverable.

Code suggestion
Check the AI-generated fix before applying
Suggested change
fsspec.register_implementation("s3", AsyncFsspecStore)
fsspec.register_implementation("gs", AsyncFsspecStore)
fsspec.register_implementation("abfs", AsyncFsspecStore)
def register_fsspec_implementations():
fsspec.register_implementation("s3", AsyncFsspecStore)
fsspec.register_implementation("gs", AsyncFsspecStore)
fsspec.register_implementation("abfs", AsyncFsspecStore)
# Call during module initialization
register_fsspec_implementations()

Code Review Run #0b7f4d


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


flyte_tmp_dir = tempfile.mkdtemp(prefix="flyte-")
default_local_file_access_provider = FileAccessProvider(
local_sandbox_dir=os.path.join(flyte_tmp_dir, "sandbox"),
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [
"marshmallow-jsonschema>=0.12.0",
"mashumaro>=3.15",
"msgpack>=1.1.0",
"obstore==0.3.0b9",
"protobuf!=4.25.0",
"pygments",
"python-json-logger>=2.0.0",
Expand Down
Loading