Skip to content

Commit

Permalink
[ENH]: Allow for matching on the full path (#299)
Browse files Browse the repository at this point in the history
This adds a parameter to `Storage.walk`, allowing users to specify
whether the `matches` applies just to the filename or to the full
path, including the filename.
  • Loading branch information
Tom Augspurger authored Jun 11, 2024
1 parent 68f7822 commit 544c735
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 4 deletions.
6 changes: 5 additions & 1 deletion datasets/ecmwf-forecast/dataset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ collections:
- uri: blob://ai4edataeuwest/ecmwf/
chunks:
options:
matches: (enfo|oper|waef|wave)(?!-opendata)
# currently excluding "aifs", in favor of "ifs"
# Could put that in a different collection, or modify
# the stactools package.
matches: /ifs/(0p25|0p4-beta)/(enfo|oper|waef|wave)(?!-opendata)
match_full_path: true
extensions: [.grib2]
chunk_storage:
uri: blob://ai4edataeuwest/ecmwf-etl-data/pctasks/
4 changes: 4 additions & 0 deletions pctasks/core/pctasks/core/storage/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def walk(
matches: Optional[str] = None,
walk_limit: Optional[int] = None,
file_limit: Optional[int] = None,
match_full_path: bool = False,
) -> Generator[Tuple[str, List[str], List[str]], None, None]:
"""
Recursively walk storage.
Expand All @@ -87,6 +88,9 @@ def walk(
matches: Optional regex that path must match
walk_limit: Limit the number of times to yield
file_limit: Limit the number of files returned
match_full_path: bool, default False
Whether to match on just the file name segment of the path (the default) or
the entire path, including the base path.
Returns:
Generator of (path, files, folders) tuples. Similar to os.walk. Lists
Expand Down
13 changes: 11 additions & 2 deletions pctasks/core/pctasks/core/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ def walk(
matches: Optional[str] = None,
walk_limit: Optional[int] = None,
file_limit: Optional[int] = None,
match_full_path: bool = False,
max_concurrency: int = 32,
) -> Generator[Tuple[str, List[str], List[str]], None, None]:
# Ensure UTC set
Expand Down Expand Up @@ -556,9 +557,17 @@ def _get_prefix_content(

for future in concurrent.futures.as_completed(futures):
full_prefix = futures[future]
folders, files = future.result()
folders, unfiltered_files = future.result()

files = [file for file in files if path_filter(file)]
files = []

for file in unfiltered_files:
if match_full_path:
match_on = "/".join([full_prefix.rstrip("/"), file])
else:
match_on = file
if path_filter(match_on):
files.append(file)

if file_limit and file_count + len(files) > file_limit:
files = files[: file_limit - file_count]
Expand Down
6 changes: 5 additions & 1 deletion pctasks/core/pctasks/core/storage/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def walk(
matches: Optional[str] = None,
walk_limit: Optional[int] = None,
file_limit: Optional[int] = None,
match_full_path: bool = False,
) -> Generator[Tuple[str, List[str], List[str]], None, None]:
def _get_depth(path: str) -> int:
relpath = os.path.relpath(path, self.base_dir)
Expand All @@ -102,7 +103,10 @@ def _filter_file(root: str, p: str) -> bool:
if since_date:
if since_date > Datetime.fromtimestamp(os.path.getmtime(full_path)):
return False
return path_filter(p)
if match_full_path:
return path_filter(full_path)
else:
return path_filter(p)

for root, folders, files in os.walk(self.base_dir):

Expand Down
16 changes: 16 additions & 0 deletions pctasks/core/tests/storage/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,19 @@ def test_blob_download_timeout():
def test_maybe_rewrite_blob_storage_url(url, expected):
result = maybe_rewrite_blob_storage_url(url)
assert result == expected


def test_walk_match_full_path():
with temp_azurite_blob_storage(
HERE / ".." / "data-files" / "simple-assets"
) as storage:
result: Dict[str, Tuple[List[str], List[str]]] = {}
for root, folders, files in storage.walk(
matches="a/asset-.*.json", match_full_path=True
):
result[root] = (folders, files)

assert set(result.keys()) == {".", "a", "b"}
assert set(result["."][0]) == {"a", "b"}
assert set(result["a"][1]) == {"asset-a-1.json", "asset-a-2.json"}
assert set(result["b"][1]) == set()
12 changes: 12 additions & 0 deletions pctasks/core/tests/storage/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,15 @@ def test_fsspec_components():
storage = LocalStorage(HERE / ".." / "data-files" / "simple-assets")
assert storage.fsspec_storage_options == {}
assert storage.fsspec_path("foo/bar.csv") == "file://foo/bar.csv"


def test_walk_match_full_path():
storage = LocalStorage(HERE / ".." / "data-files" / "simple-assets")
subdirs = {
root: files
for root, _, files in storage.walk(
min_depth=1, max_depth=1, matches="a/asset-.*.json", match_full_path=True
)
}
assert subdirs["a"] == ["asset-a-1.json", "asset-a-2.json"]
assert subdirs["b"] == []
1 change: 1 addition & 0 deletions pctasks/dataset/pctasks/dataset/chunks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def create_chunks(
file_limit=input.options.limit,
max_depth=input.options.max_depth,
min_depth=input.options.min_depth,
match_full_path=input.options.match_full_path,
):
if input.options.list_folders:
gen = folders
Expand Down
3 changes: 3 additions & 0 deletions pctasks/dataset/pctasks/dataset/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class ChunkOptions(PCBaseModel):
matches: Optional[str] = None
"""Only include asset URIs that match this regex."""

match_full_path: bool = False
"""Whether to match on just the file name (the default) or the full path."""

limit: Optional[int] = None
"""Limit the number of URIs to process. """

Expand Down

0 comments on commit 544c735

Please sign in to comment.