Skip to content

Commit

Permalink
review addressed
Browse files Browse the repository at this point in the history
  • Loading branch information
konstntokas committed Dec 13, 2024
1 parent a7409f6 commit 03973f9
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 229 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,4 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

examples/preload_cache/
examples/zenodo_cache/
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ channels:
dependencies:
# Required
- python>=3.10
- IPython
- fsspec
- numpy
- requests
- tabulate
Expand Down
188 changes: 93 additions & 95 deletions examples/zenodo_data_store.ipynb

Large diffs are not rendered by default.

156 changes: 78 additions & 78 deletions examples/zenodo_data_store_preload.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ readme = {file = "README.md", content-type = "text/markdown"}
license = {text = "MIT"}
requires-python = ">=3.10"
dependencies = [
"IPython",
"fsspec",
"numpy",
"requests",
"tabulate",
Expand Down
4 changes: 2 additions & 2 deletions xcube_zenodo/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from typing import Any, Container, Union
from typing import Any, Container, Optional

from .constants import COMPRESSED_FORMATS
from .constants import MAP_FILE_EXTENSION_FORMAT
Expand Down Expand Up @@ -66,7 +66,7 @@ def get_attrs_from_record(
return attrs


def identify_file_format(data_id: str) -> Union[str, None]:
def identify_file_format(data_id: str) -> Optional[str]:
    """Return the format name derived from *data_id*'s file extension.

    Scans MAP_FILE_EXTENSION_FORMAT and returns the first format whose
    extension (compared lowercase) *data_id* ends with.
    """
    # NOTE(review): data_id itself is not lowercased, so an upper-case
    # extension (e.g. "FILE.ZARR") will not match — confirm intended.
    # A non-match presumably falls through to None (per the Optional
    # return type) in lines folded out of this diff view — verify.
    for key, val in MAP_FILE_EXTENSION_FORMAT.items():
        if data_id.endswith(key.lower()):
            return val
Expand Down
6 changes: 5 additions & 1 deletion xcube_zenodo/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import logging


DATA_STORE_ID = "zenodo"
LOG = logging.getLogger("xcube.zenodo")
API_RECORDS_ENDPOINT = "https://zenodo.org/api/records"
PRELOAD_CACHE_FOLDER = "preload_cache/"
CACHE_FOLDER_NAME = "zenodo_cache"

MAP_FILE_EXTENSION_FORMAT = {
"zarr": "zarr",
Expand Down
73 changes: 36 additions & 37 deletions xcube_zenodo/preload.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import logging
import os
import shutil
import threading
import time
from typing import Callable, Union
from typing import Callable
import tarfile
import zipfile

import IPython
import IPython.display
import fsspec
import numpy as np
import tabulate
import requests
Expand All @@ -39,13 +36,12 @@

from ._utils import identify_file_format
from ._utils import translate_data_id2uri

LOG = logging.getLogger(__name__)
from .constants import LOG


class Event:

def __init__(self, data_id: str, total_size: Union[int, float]):
def __init__(self, data_id: str, total_size: int | float):
self.data_id = data_id
self.status = "Not started"
self.progress = 0.0
Expand Down Expand Up @@ -73,12 +69,13 @@ def __init__(self, cache_store: MutableDataStore, *data_ids: str, **preload_para
self._is_cancelled = False
self._is_closed = False
self._cache_store = cache_store
self._cache_fs: fsspec.AbstractFileSystem = cache_store.fs
self._cache_root = cache_store.root
self._data_ids = data_ids
self._preload_params = preload_params
self._cache_root = preload_params.pop("cache_root")
self._download_folder_name = "downloads"
self._download_folder = os.path.join(
self._cache_root, self._download_folder_name
self._download_folder = self._cache_fs.sep.join(

Check warning on line 77 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L69-L77

Added lines #L69 - L77 were not covered by tests
[self._cache_root, self._download_folder_name]
)
self._events = [Event(data_id, np.nan) for data_id in data_ids]
self.lock = threading.Lock()
Expand Down Expand Up @@ -123,7 +120,7 @@ def close(self):
]
for data_id in list_data_ids_mod:
self._cache_store.delete_data(data_id)
if os.path.isdir(self._download_folder):
if self._cache_fs.isdir(self._download_folder):
shutil.rmtree(self._download_folder)

Check warning on line 124 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L121-L124

Added lines #L121 - L124 were not covered by tests

def _monitor_preload(self):
Expand All @@ -137,20 +134,15 @@ def _monitor_preload(self):
for event in self._events
]
if is_jupyter():
import IPython.display

Check warning on line 137 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L136-L137

Added lines #L136 - L137 were not covered by tests

table = tabulate.tabulate(

Check warning on line 139 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L139

Added line #L139 was not covered by tests
rows,
headers=["Data ID", "Status", "Progress", "Message"],
tablefmt="html",
)
IPython.display.clear_output(wait=True)
IPython.display.display(table)

Check warning on line 145 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L144-L145

Added lines #L144 - L145 were not covered by tests
else:
table = tabulate.tabulate(
rows,
headers=["Dataset", "Status", "Progress", "Message"],
)
os.system("clear" if os.name == "posix" else "cls")
print(table)

def preload_data(self, *data_ids: str, **preload_params):
self._download_data(*data_ids)
Expand Down Expand Up @@ -180,10 +172,12 @@ def download():
if not response.ok:
raise DataStoreError(response.raise_for_status())
record, filename = data_id.split("/")
record_folder = os.path.join(self._download_folder, record)
if not os.path.exists(record_folder):
os.makedirs(record_folder)
download_path = os.path.join(record_folder, filename)
record_folder = self._cache_fs.sep.join(

Check warning on line 175 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L166-L175

Added lines #L166 - L175 were not covered by tests
[self._download_folder, record]
)
if not self._cache_fs.isdir(record_folder):
self._cache_fs.makedirs(record_folder)
download_path = self._cache_fs.sep.join([record_folder, filename])
with open(download_path, "wb") as file:
for chunk in response.iter_content(chunk_size=chunk_size):
file.write(chunk)
Expand All @@ -209,26 +203,28 @@ def decompress():
time.sleep(1)
self._events[i].update("Decompression started", np.nan, "")
record, filename = data_id.split("/")
file_path = os.path.join(self._download_folder, record, filename)
file_path = self._cache_fs.sep.join(

Check warning on line 206 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L200-L206

Added lines #L200 - L206 were not covered by tests
[self._download_folder, record, filename]
)
if zipfile.is_zipfile(file_path):
with zipfile.ZipFile(file_path, "r") as zip_ref:
dirname = filename.replace(".zip", "")
extract_dir = os.path.join(
self._download_folder, record, dirname
extract_dir = self._cache_fs.sep.join(

Check warning on line 212 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L209-L212

Added lines #L209 - L212 were not covered by tests
[self._download_folder, record, dirname]
)
zip_ref.extractall(extract_dir)
elif file_path.endswith(".tar"):
with tarfile.open(file_path, "r") as tar_ref:
dirname = filename.replace(".tar", "")
extract_dir = os.path.join(
self._download_folder, record, dirname
extract_dir = self._cache_fs.sep.join(

Check warning on line 219 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L215-L219

Added lines #L215 - L219 were not covered by tests
[self._download_folder, record, dirname]
)
tar_ref.extractall(path=extract_dir)
elif file_path.endswith(".tar.gz"):
with tarfile.open(file_path, "r:gz") as tar_ref:
dirname = filename.replace(".tar.gz", "")
extract_dir = os.path.join(
self._download_folder, record, dirname
extract_dir = self._cache_fs.sep.join(

Check warning on line 226 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L222-L226

Added lines #L222 - L226 were not covered by tests
[self._download_folder, record, dirname]
)
tar_ref.extractall(path=extract_dir)
self._events[i].update("Decompressed", np.nan, "")
Expand All @@ -249,13 +245,13 @@ def prepare():
record, filename = data_id.split("/")
format_id = identify_file_format(data_id)
dirname = filename.replace(f".{format_id}", "")
extract_dir = os.path.join(self._download_folder, record, dirname)
extract_dir = self._cache_fs.sep.join(

Check warning on line 248 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L240-L248

Added lines #L240 - L248 were not covered by tests
[self._download_folder, record, dirname]
)
dss = []
sub_fnames = os.listdir(extract_dir)
for sub_fname in sub_fnames:
sub_data_id = (
f"{self._download_folder_name}/{record}/{dirname}/{sub_fname}"
)
sub_files = self._cache_fs.listdir(extract_dir)
for sub_file in sub_files:
sub_data_id = sub_file["name"].replace(f"{self._cache_root}/", "")
if not self._cache_store.has_data(sub_data_id):
LOG.debug(

Check warning on line 256 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L251-L256

Added lines #L251 - L256 were not covered by tests
f"File with data ID {sub_data_id} cannot be opened, "
Expand All @@ -272,7 +268,8 @@ def prepare():
ds, data_id, writer_id="dataset:zarr:file"
)
else:
for ds, sub_fname in zip(dss, sub_fnames):
for ds, sub_file in zip(dss, sub_files):
sub_fname = sub_file["name"].split("/")[-1]
data_id = (

Check warning on line 273 in xcube_zenodo/preload.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/preload.py#L271-L273

Added lines #L271 - L273 were not covered by tests
f"{record}/{dirname}/"
f"{".".join(sub_fname.split(".")[:-1])}.zarr"
Expand All @@ -296,4 +293,6 @@ def prepare():


def is_jupyter() -> bool:
    """Return True when running inside a Jupyter notebook kernel.

    Detection works by checking whether the active IPython shell is the
    ZMQ-based kernel shell used by Jupyter front ends.

    IPython is an optional dependency (this commit removes it from the
    required dependencies), so a missing IPython must not crash the
    caller: without IPython we cannot be in a notebook, hence False.
    """
    try:
        import IPython
    except ImportError:
        # IPython not installed -> certainly not a Jupyter kernel.
        return False
    # get_ipython() returns None outside any IPython shell; then the
    # class name is "NoneType" and the membership test is False.
    shell = IPython.get_ipython()
    return "ZMQInteractiveShell" in type(shell).__name__
27 changes: 14 additions & 13 deletions xcube_zenodo/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import logging
import os
from typing import Tuple, Iterator, Container, Any, Union
from typing import Tuple, Iterator, Container, Any

import requests
import xarray as xr
Expand All @@ -40,29 +39,32 @@

from .constants import API_RECORDS_ENDPOINT
from .constants import COMPRESSED_FORMATS
from .constants import PRELOAD_CACHE_FOLDER
from .constants import CACHE_FOLDER_NAME
from .preload import PreloadHandle
from ._utils import identify_file_format
from ._utils import get_attrs_from_record
from ._utils import is_supported_file_format
from ._utils import is_supported_compressed_file_format
from ._utils import translate_data_id2fs_path


LOG = logging.getLogger(__name__)
from .constants import LOG


class ZenodoDataStore(DataStore):
"""Implementation of the Zenodo data store defined in the ``xcube_zenodo``
plugin."""

def __init__(self, access_token: str, preload_cache_folder: str = None):
def __init__(
    self,
    access_token: str,
    cache_store_id: str = "file",
    cache_store_params: dict | None = None,
):
    """Create a Zenodo data store.

    Args:
        access_token: Zenodo personal access token; forwarded as a
            request parameter to the Zenodo REST API.
        cache_store_id: xcube data-store identifier used for the
            preload cache (defaults to the local "file" store).
        cache_store_params: Parameters passed to the cache store.
            Defaults to a local folder named CACHE_FOLDER_NAME; a
            missing "max_depth" entry defaults to 3.
    """
    self._requests_params = {"access_token": access_token}
    self._https_data_store = new_data_store("https", root="zenodo.org")
    if cache_store_params is None:
        cache_store_params = dict(root=CACHE_FOLDER_NAME)
    else:
        # Shallow-copy so the caller's dict is not mutated below.
        cache_store_params = dict(cache_store_params)
    # max_depth 3 covers the record/dirname/file nesting of cached data.
    cache_store_params.setdefault("max_depth", 3)
    self.cache_store = new_data_store(cache_store_id, **cache_store_params)

@classmethod
def get_data_store_params_schema(cls) -> JsonObjectSchema:
Expand Down Expand Up @@ -95,7 +97,7 @@ def get_data_types_for_data(self, data_id: str) -> Tuple[str, ...]:

def get_data_ids(
self, data_type: DataTypeLike = None, include_attrs: Container[str] = None
) -> Union[Iterator[str], Iterator[tuple[str, dict[str, Any]]]]:
) -> Iterator[str] | Iterator[tuple[str, dict[str, Any]]]:
params = self._requests_params
page = 1
while True:
Expand Down Expand Up @@ -210,7 +212,6 @@ def preload_data(self, *data_ids: str, **preload_params) -> PreloadHandle:
preload_handle = PreloadHandle(

Check warning on line 212 in xcube_zenodo/store.py

View check run for this annotation

Codecov / codecov/patch

xcube_zenodo/store.py#L212

Added line #L212 was not covered by tests
self.cache_store,
*data_ids_sel,
cache_root=self._cache_root,
**preload_params,
)
if data_ids_sel:
Expand Down

0 comments on commit 03973f9

Please sign in to comment.