Skip to content

Commit

Permalink
fix: wrong file path for get_filesystem_for_path
Browse files Browse the repository at this point in the history
Signed-off-by: machichima <[email protected]>
  • Loading branch information
machichima committed Jan 5, 2025
1 parent 7ba66e2 commit 0ef7c05
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from fsspec.asyn import AsyncFileSystem
from fsspec.utils import get_protocol
from obstore.fsspec import AsyncFsspecStore
from obstore.store import GCSStore, S3Store, AzureStore
from obstore.store import AzureStore, GCSStore, S3Store
from typing_extensions import Unpack

from flytekit import configuration
Expand Down Expand Up @@ -86,6 +86,7 @@ def s3_setup_args(s3_cfg: configuration.S3Config, bucket: str = "", anonymous: b

return kwargs


def gs_setup_args(gcs_cfg: configuration.GCSConfig, bucket: str = "", anonymous: bool = False) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {}

Expand Down Expand Up @@ -140,7 +141,9 @@ def split_path(path: str) -> Tuple[str, str]:
return bucket, path


def azure_setup_args(azure_cfg: configuration.AzureBlobStorageConfig, container: str = "", anonymous: bool = False) -> Dict[str, Any]:
def azure_setup_args(
azure_cfg: configuration.AzureBlobStorageConfig, container: str = "", anonymous: bool = False
) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {}
store_kwargs: Dict[str, Any] = {}

Expand Down Expand Up @@ -311,7 +314,9 @@ async def get_async_filesystem_for_path(
protocol, anonymous=anonymous, path=path, bucket=bucket, asynchronous=True, loop=loop, **kwargs
)

def get_filesystem_for_path(self, path: str = "", bucket: str = "", anonymous: bool = False, **kwargs) -> fsspec.AbstractFileSystem:
def get_filesystem_for_path(
self, path: str = "", bucket: str = "", anonymous: bool = False, **kwargs
) -> fsspec.AbstractFileSystem:
protocol = get_protocol(path)
return self.get_filesystem(protocol, anonymous=anonymous, path=path, bucket=bucket, **kwargs)

Expand Down Expand Up @@ -513,13 +518,13 @@ async def async_put_raw_data(
r = await self._put(from_path, to_path, **kwargs)
return r or to_path

bucket, to_path_file_only = split_path(to_path)
bucket, _ = split_path(to_path)

# See https://github.com/fsspec/s3fs/issues/871 for more background and pending work on the fsspec side to
# support effectively async open(). For now these use-cases below will revert to sync calls.
# raw bytes
if isinstance(lpath, bytes):
fs = self.get_filesystem_for_path(to_path_file_only, bucket)
fs = self.get_filesystem_for_path(to_path, bucket)
with fs.open(to_path, "wb", **kwargs) as s:
s.write(lpath)
return to_path
Expand All @@ -528,7 +533,7 @@ async def async_put_raw_data(
if isinstance(lpath, io.BufferedReader) or isinstance(lpath, io.BytesIO):
if not lpath.readable():
raise FlyteAssertion("Buffered reader must be readable")
fs = self.get_filesystem_for_path(to_path_file_only, bucket)
fs = self.get_filesystem_for_path(to_path, bucket)
lpath.seek(0)
with fs.open(to_path, "wb", **kwargs) as s:
while data := lpath.read(read_chunk_size_bytes):
Expand All @@ -538,7 +543,7 @@ async def async_put_raw_data(
if isinstance(lpath, io.StringIO):
if not lpath.readable():
raise FlyteAssertion("Buffered reader must be readable")
fs = self.get_filesystem_for_path(to_path_file_only, bucket)
fs = self.get_filesystem_for_path(to_path, bucket)
lpath.seek(0)
with fs.open(to_path, "wb", **kwargs) as s:
while data_str := lpath.read(read_chunk_size_bytes):
Expand Down

0 comments on commit 0ef7c05

Please sign in to comment.