Skip to content

Commit

Permalink
first stab at caching metadata (#4)
Browse files Browse the repository at this point in the history
* first stab at caching metadata

* more work

* update variable

* fixes

* fix cloud path references

* need full path for key

* bugfixes

* move to normal dict

* cleanup on closedown

---------

Co-authored-by: marchowes <[email protected]>
  • Loading branch information
msmitherdc and Marchowes committed Aug 30, 2024
1 parent 3f10e03 commit 785f6a8
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 17 deletions.
2 changes: 1 addition & 1 deletion cloudpathlib/cloudpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def __del__(self) -> None:
# ensure file removed from cache when cloudpath object deleted
if (
hasattr(self, "client")
and self.client.file_cache_mode == FileCacheMode.cloudpath_object
#and self.client.file_cache_mode == FileCacheMode.cloudpath_object
):
self.clear_cache()

Expand Down
2 changes: 2 additions & 0 deletions cloudpathlib/local/implementations/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class LocalS3Client(LocalClient):

_cloud_meta = local_s3_implementation

def clear_metadata_cache(self):
pass

LocalS3Client.S3Path = LocalS3Client.CloudPath # type: ignore

Expand Down
98 changes: 82 additions & 16 deletions cloudpathlib/s3/s3client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
import dataclasses
import mimetypes
import os
from pathlib import Path, PurePosixPath
from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union
from typing import (
Any,
Callable,
Dict,
Iterable,
Optional,
TYPE_CHECKING,
Tuple,
Union,
MutableMapping,
)
from weakref import WeakKeyDictionary

from ..client import Client, register_client_class
from ..cloudpath import implementation_registry
Expand All @@ -18,6 +30,13 @@
except ModuleNotFoundError:
implementation_registry["s3"].dependencies_loaded = False

@dataclasses.dataclass
class PathMetadata:
is_file_or_dir: Optional[str]
etag: Optional[str]
size: Optional[int]
last_modified: Optional[str]


@register_client_class("s3")
class S3Client(Client):
Expand Down Expand Up @@ -126,6 +145,8 @@ def __init__(
for k in ["RequestPayer", "ExpectedBucketOwner"]
if k in self._extra_args
}

self._metadata_cache: MutableMapping[S3Path, PathMetadata] = dict()

super().__init__(
local_cache_dir=local_cache_dir,
Expand All @@ -135,17 +156,27 @@ def __init__(

def _get_metadata(self, cloud_path: S3Path) -> Dict[str, Any]:
# get accepts all download extra args
data = self.s3.ObjectSummary(cloud_path.bucket, cloud_path.key).get(
**self.boto3_dl_extra_args
)

return {
"last_modified": data["LastModified"],
"size": data["ContentLength"],
"etag": data["ETag"],
"content_type": data.get("ContentType", None),
"extra": data["Metadata"],
}
size = None if not self._metadata_cache.get(cloud_path) else self._metadata_cache[cloud_path].size
if size:
return {
"last_modified": self._metadata_cache[cloud_path].last_modified,
"size": size,
"etag": self._metadata_cache[cloud_path].etag,
"content_type": None,
"extra": None,
}
else:
data = self.s3.ObjectSummary(cloud_path.bucket, cloud_path.key).get(
**self.boto3_dl_extra_args
)
self._set_metadata_cache(cloud_path, "file", data["ETag"], data["ContentLength"], data["LastModified"])
return {
"last_modified": data["LastModified"],
"size": data["ContentLength"],
"etag": data["ETag"],
"content_type": data.get("ContentType", None),
"extra": data["Metadata"],
}

def _download_file(self, cloud_path: S3Path, local_path: Union[str, os.PathLike]) -> Path:
local_path = Path(local_path)
Expand Down Expand Up @@ -249,11 +280,16 @@ def _list_dir(self, cloud_path: S3Path, recursive=False) -> Iterable[Tuple[S3Pat
for result_key in result.get("Contents", []):
# yield all the parents of any key that have not been yielded already
o_relative_path = result_key.get("Key")[len(prefix) :]
etag = result_key.get("ETag")
size = result_key.get("Size")
last_modified = result_key.get("LastModified")
for parent in PurePosixPath(o_relative_path).parents:
parent_canonical = prefix + str(parent).rstrip("/")
if parent_canonical not in yielded_dirs and str(parent) != ".":
path = self.CloudPath(f"s3://{cloud_path.bucket}/{parent_canonical}")
self._set_metadata_cache(path, "dir", etag, size, last_modified)
yield (
self.CloudPath(f"s3://{cloud_path.bucket}/{parent_canonical}"),
path,
True,
)
yielded_dirs.add(parent_canonical)
Expand All @@ -265,16 +301,20 @@ def _list_dir(self, cloud_path: S3Path, recursive=False) -> Iterable[Tuple[S3Pat

# s3 fake directories have 0 size and end with "/"
if result_key.get("Key").endswith("/") and result_key.get("Size") == 0:
path = self.CloudPath(f"s3://{cloud_path.bucket}/{canonical}")
self._set_metadata_cache(path, "dir", etag, size, last_modified)
yield (
self.CloudPath(f"s3://{cloud_path.bucket}/{canonical}"),
path,
True,
)
yielded_dirs.add(canonical)

# yield object as file
else:
path = self.CloudPath(f"s3://{cloud_path.bucket}/{result_key.get('Key')}")
self._set_metadata_cache(path, "file", etag, size, last_modified)
yield (
self.CloudPath(f"s3://{cloud_path.bucket}/{result_key.get('Key')}"),
path,
False,
)

Expand All @@ -298,12 +338,14 @@ def _move_file(self, src: S3Path, dst: S3Path, remove_src: bool = True) -> S3Pat
)

if remove_src:
self._set_metadata_cache(src, None, None, None, None)
self._remove(src)
return dst

def _remove(self, cloud_path: S3Path, missing_ok: bool = True) -> None:
file_or_dir = self._is_file_or_dir(cloud_path=cloud_path)
if file_or_dir == "file":
self._set_metadata_cache(cloud_path, None, None, None, None)
resp = self.s3.Object(cloud_path.bucket, cloud_path.key).delete(
**self.boto3_list_extra_args
)
Expand All @@ -323,6 +365,13 @@ def _remove(self, cloud_path: S3Path, missing_ok: bool = True) -> None:
resp = bucket.objects.filter(Prefix=prefix, **self.boto3_list_extra_args).delete(
**self.boto3_list_extra_args
)

files = [
path for path, is_dir in self._list_dir(cloud_path, recursive=True) if not is_dir
]
for path in files:
self._set_metadata_cache(path, None, None, None, None)

if resp[0].get("ResponseMetadata").get("HTTPStatusCode") not in (204, 200):
raise CloudPathException(
f"Delete operation failed for {cloud_path} with response: {resp}"
Expand All @@ -348,6 +397,23 @@ def _upload_file(self, local_path: Union[str, os.PathLike], cloud_path: S3Path)

obj.upload_file(str(local_path), Config=self.boto3_transfer_config, ExtraArgs=extra_args)
return cloud_path


def _set_metadata_cache(self, cloud_path: S3Path, is_file_or_dir: Optional[str],
etag: Optional[str], size: Optional[int], last_modified: Optional[str]) -> None:
if is_file_or_dir is None:
self._metadata_cache[cloud_path] = PathMetadata(is_file_or_dir=is_file_or_dir,
etag=etag, size=size, last_modified=last_modified)
# If a file/dir is now known to not exist, its parent directories may no longer exist
# either, since cloud directories only exist if they have a file in them. Since their
# state is no longer known we remove them from the cache.
for parent in cloud_path.parents:
if parent in self._metadata_cache:
del self._metadata_cache[parent]
else:
self._metadata_cache[cloud_path] = PathMetadata(is_file_or_dir=is_file_or_dir,
etag=etag, size=size, last_modified=last_modified)

def clear_metadata_cache(self) -> None:
self._metadata_cache.clear()

S3Client.S3Path = S3Client.CloudPath # type: ignore

0 comments on commit 785f6a8

Please sign in to comment.