From aaf84da13077266591bc3eb870f1bec535f2bdb4 Mon Sep 17 00:00:00 2001 From: Sherwin-14 Date: Sat, 24 Aug 2024 16:20:33 +0530 Subject: [PATCH 01/11] added fail_fast --- earthaccess/store.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/earthaccess/store.py b/earthaccess/store.py index fd399d44..4e751790 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -69,7 +69,7 @@ def multi_thread_open(data: tuple[str, Optional[DataGranule]]) -> EarthAccessFil url, granule = data return EarthAccessFile(fs.open(url), granule) # type: ignore - fileset = pqdm(url_mapping.items(), multi_thread_open, n_jobs=threads) + fileset = pqdm(url_mapping.items(), multi_thread_open, n_jobs=threads,exception_behaviour=exception_behavior) return fileset @@ -447,6 +447,7 @@ def _open_urls( url_mapping, fs=s3_fs, threads=threads, + fail_fast = fail_fast ) except Exception as e: raise RuntimeError( @@ -544,6 +545,7 @@ def _get_urls( local_path: Path, provider: Optional[str] = None, threads: int = 8, + fail_fast: bool = True ) -> List[str]: data_links = granules downloaded_files: List = [] @@ -565,7 +567,7 @@ def _get_urls( else: # if we are not in AWS - return self._download_onprem_granules(data_links, local_path, threads) + return self._download_onprem_granules(data_links, local_path, threads,fail_fast=fail_fast) @_get.register def _get_granules( @@ -574,6 +576,7 @@ def _get_granules( local_path: Path, provider: Optional[str] = None, threads: int = 8, + fail_fast: bool = True ) -> List[str]: data_links: List = [] downloaded_files: List = [] @@ -614,7 +617,7 @@ def _get_granules( else: # if the data are cloud-based, but we are not in AWS, # it will be downloaded as if it was on prem - return self._download_onprem_granules(data_links, local_path, threads) + return self._download_onprem_granules(data_links, local_path, threads,fail_fast=fail_fast) def _download_file(self, url: str, directory: Path) -> str: """Download a single file from an on-prem location, a DAAC data center. @@ -652,7 +655,7 @@ def _download_file(self, url: str, directory: Path) -> str: return str(path) def _download_onprem_granules( - self, urls: List[str], directory: Path, threads: int = 8 + self, urls: List[str], directory: Path, threads: int = 8,fail_fast: bool = True ) -> List[Any]: """Downloads a list of URLS into the data directory. @@ -674,11 +677,15 @@ def _download_onprem_granules( directory.mkdir(parents=True, exist_ok=True) arguments = [(url, directory) for url in urls] + + exception_behavior = "immediate" if fail_fast else "deferred" + results = pqdm( arguments, self._download_file, n_jobs=threads, argument_type="args", + exception_behaviour=exception_behavior ) return results From 1e3d5235c81e6eff5a734696ca6ccf0801faec54 Mon Sep 17 00:00:00 2001 From: Sherwin-14 Date: Sat, 24 Aug 2024 23:08:08 +0530 Subject: [PATCH 02/11] Made changes like adding excpetion_behaviour and fail_fast parameter --- CHANGELOG.md | 6 ++++++ earthaccess/store.py | 31 ++++++++++++++++++++----------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc892679..9e043d21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html) ## [Unreleased] +- Fix `earthaccess.download` to not ignore errors by default + ([#581](https://github.com/nsidc/earthaccess/issues/581)) + ([**@Sherwin-14**](https://github.com/Sherwin-14), + [**@chuckwondo**](https://github.com/chuckwondo), + [**@mfisher87**](https://github.com/mfisher87)) + ### Changed - Use built-in `assert` statements instead of `unittest` assertions in diff --git a/earthaccess/store.py b/earthaccess/store.py index 4e751790..44b76a25 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -69,7 +69,12 @@ def multi_thread_open(data: tuple[str, Optional[DataGranule]]) -> EarthAccessFil url, granule = data return EarthAccessFile(fs.open(url), granule) # type: ignore - fileset = pqdm(url_mapping.items(), multi_thread_open, n_jobs=threads,exception_behaviour=exception_behavior) + fileset = pqdm( + url_mapping.items(), + multi_thread_open, + n_jobs=threads, + exception_behaviour=exception_behavior, + ) return fileset @@ -444,10 +449,7 @@ def _open_urls( if s3_fs is not None: try: fileset = _open_files( - url_mapping, - fs=s3_fs, - threads=threads, - fail_fast = fail_fast + url_mapping, fs=s3_fs, threads=threads, fail_fast=fail_fast ) except Exception as e: raise RuntimeError( @@ -545,7 +547,7 @@ def _get_urls( local_path: Path, provider: Optional[str] = None, threads: int = 8, - fail_fast: bool = True + fail_fast: bool = True, ) -> List[str]: data_links = granules downloaded_files: List = [] @@ -567,7 +569,9 @@ def _get_urls( else: # if we are not in AWS - return self._download_onprem_granules(data_links, local_path, threads,fail_fast=fail_fast) + return self._download_onprem_granules( + data_links, local_path, threads, fail_fast=fail_fast + ) @_get.register def _get_granules( @@ -576,7 +580,7 @@ def _get_granules( local_path: Path, provider: Optional[str] = None, threads: int = 8, - fail_fast: bool = True + fail_fast: bool = True, ) -> List[str]: data_links: List = [] downloaded_files: List = [] @@ -617,7 +621,9 @@ def _get_granules( else: # if the data are cloud-based, but we are not in AWS, # it will be downloaded as if it was on prem - return self._download_onprem_granules(data_links, local_path, threads,fail_fast=fail_fast) + return self._download_onprem_granules( + data_links, local_path, threads, fail_fast=fail_fast + ) def _download_file(self, url: str, directory: Path) -> str: """Download a single file from an on-prem location, a DAAC data center. @@ -655,7 +661,7 @@ def _download_file(self, url: str, directory: Path) -> str: return str(path) def _download_onprem_granules( - self, urls: List[str], directory: Path, threads: int = 8,fail_fast: bool = True + self, urls: List[str], directory: Path, threads: int = 8, fail_fast: bool = True ) -> List[Any]: """Downloads a list of URLS into the data directory. @@ -664,6 +670,9 @@ def _download_onprem_granules( directory: local directory to store the downloaded files threads: parallel number of threads to use to download the files; adjust as necessary, default = 8 + fail_fast: if set to True, the download process will stop immediately + upon encountering the first error. If set to False, errors will be + deferred, allowing the download of remaining files to continue. Returns: A list of local filepaths to which the files were downloaded. @@ -685,7 +694,7 @@ def _download_onprem_granules( self._download_file, n_jobs=threads, argument_type="args", - exception_behaviour=exception_behavior + exception_behaviour=exception_behavior, ) return results From 42f97ce4a9b3bbce5354b9a5121092e1c3030473 Mon Sep 17 00:00:00 2001 From: Sherwin-14 Date: Tue, 3 Sep 2024 17:09:52 +0530 Subject: [PATCH 03/11] Changes made to public functions --- earthaccess/api.py | 10 ++++++++-- earthaccess/store.py | 25 ++++++++++++++++--------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/earthaccess/api.py b/earthaccess/api.py index 0a50e563..c6403bc5 100644 --- a/earthaccess/api.py +++ b/earthaccess/api.py @@ -205,6 +205,7 @@ def download( local_path: Optional[str], provider: Optional[str] = None, threads: int = 8, + fail_fast: bool = True, ) -> List[str]: """Retrieves data granules from a remote storage system. @@ -230,7 +231,9 @@ def download( elif isinstance(granules, str): granules = [granules] try: - results = earthaccess.__store__.get(granules, local_path, provider, threads) + results = earthaccess.__store__.get( + granules, local_path, provider, threads, fail_fast=fail_fast + ) except AttributeError as err: logger.error( f"{err}: You must call earthaccess.login() before you can download data" @@ -242,6 +245,7 @@ def download( def open( granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, + fail_fast: bool = True, ) -> List[AbstractFileSystem]: """Returns a list of file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. @@ -255,7 +259,9 @@ def open( A list of "file pointers" to remote (i.e. s3 or https) files. """ provider = _normalize_location(provider) - results = earthaccess.__store__.open(granules=granules, provider=provider) + results = earthaccess.__store__.open( + granules=granules, provider=provider, fail_fast=fail_fast + ) return results diff --git a/earthaccess/store.py b/earthaccess/store.py index 44b76a25..a4b53944 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -354,7 +354,7 @@ def open( A list of "file pointers" to remote (i.e. s3 or https) files. """ if len(granules): - return self._open(granules, provider) + return self._open(granules, provider, fail_fast=fail_fast) return [] @singledispatchmethod @@ -362,6 +362,7 @@ def _open( self, granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, + fail_fast: bool = True, ) -> List[Any]: raise NotImplementedError("granules should be a list of DataGranule or URLs") @@ -401,9 +402,7 @@ def _open_granules( if s3_fs is not None: try: fileset = _open_files( - url_mapping, - fs=s3_fs, - threads=threads, + url_mapping, fs=s3_fs, threads=threads, fail_fast=fail_fast ) except Exception as e: raise RuntimeError( @@ -412,11 +411,15 @@ def _open_granules( f"Exception: {traceback.format_exc()}" ) from e else: - fileset = self._open_urls_https(url_mapping, threads=threads) + fileset = self._open_urls_https( + url_mapping, threads=threads, fail_fast=fail_fast + ) return fileset else: url_mapping = _get_url_granule_mapping(granules, access="on_prem") - fileset = self._open_urls_https(url_mapping, threads=threads) + fileset = self._open_urls_https( + url_mapping, threads=threads, fail_fast=fail_fast + ) return fileset @_open.register @@ -469,7 +472,7 @@ def _open_urls( raise ValueError( "We cannot open S3 links when we are not in-region, try using HTTPS links" ) - fileset = self._open_urls_https(url_mapping, threads) + fileset = self._open_urls_https(url_mapping, threads, fail_fast=fail_fast) return fileset def get( @@ -478,6 +481,7 @@ def get( local_path: Union[Path, str, None] = None, provider: Optional[str] = None, threads: int = 8, + fail_fast: bool = True, ) -> List[str]: """Retrieves data granules from a remote storage system. @@ -506,7 +510,9 @@ def get( local_path = Path(local_path) if len(granules): - files = self._get(granules, local_path, provider, threads) + files = self._get( + granules, local_path, provider, threads, fail_fast=fail_fast + ) return files else: raise ValueError("List of URLs or DataGranule instances expected") @@ -518,6 +524,7 @@ def _get( local_path: Path, provider: Optional[str] = None, threads: int = 8, + fail_fast: bool = True, ) -> List[str]: """Retrieves data granules from a remote storage system. @@ -694,7 +701,7 @@ def _download_onprem_granules( self._download_file, n_jobs=threads, argument_type="args", - exception_behaviour=exception_behavior, + exception_behaviour=exception_behavior, ) return results From 0e76a239a1b0c94199dcf67b6c48c82ec80ac53e Mon Sep 17 00:00:00 2001 From: Sherwin-14 Date: Sun, 29 Sep 2024 19:17:02 +0530 Subject: [PATCH 04/11] Added pqdm_kwargs to the functions --- earthaccess/api.py | 22 +++++++++---- earthaccess/store.py | 75 +++++++++++++++++++++++++++----------------- 2 files changed, 63 insertions(+), 34 deletions(-) diff --git a/earthaccess/api.py b/earthaccess/api.py index c6403bc5..0189d7c4 100644 --- a/earthaccess/api.py +++ b/earthaccess/api.py @@ -3,7 +3,7 @@ import requests import s3fs from fsspec import AbstractFileSystem -from typing_extensions import Any, Dict, List, Optional, Union, deprecated +from typing_extensions import Any, Dict, List, Optional, Union, deprecated, Mapping import earthaccess from earthaccess.services import DataServices @@ -205,7 +205,7 @@ def download( local_path: Optional[str], provider: Optional[str] = None, threads: int = 8, - fail_fast: bool = True, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[str]: """Retrieves data granules from a remote storage system. @@ -226,14 +226,19 @@ def download( Exception: A file download failed. """ provider = _normalize_location(provider) + pqdm_kwargs = { + "exception_behavior": "immediate", + "n_jobs": threads, + **(pqdm_kwargs or {}), + } if isinstance(granules, DataGranule): granules = [granules] elif isinstance(granules, str): granules = [granules] try: results = earthaccess.__store__.get( - granules, local_path, provider, threads, fail_fast=fail_fast - ) + granules, local_path, provider, threads, pqdm_kwargs + ) except AttributeError as err: logger.error( f"{err}: You must call earthaccess.login() before you can download data" @@ -245,7 +250,7 @@ def download( def open( granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, - fail_fast: bool = True, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[AbstractFileSystem]: """Returns a list of file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. @@ -259,8 +264,13 @@ def open( A list of "file pointers" to remote (i.e. s3 or https) files. """ provider = _normalize_location(provider) + pqdm_kwargs = { + "exception_behavior": "immediate", + "n_jobs": threads, + **(pqdm_kwargs or {}), + } results = earthaccess.__store__.open( - granules=granules, provider=provider, fail_fast=fail_fast + granules=granules, provider=provider, pqdm_kwargs=pqdm_kwargs ) return results diff --git a/earthaccess/store.py b/earthaccess/store.py index a4b53944..7679495f 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -63,17 +63,18 @@ def __repr__(self) -> str: def _open_files( url_mapping: Mapping[str, Union[DataGranule, None]], fs: fsspec.AbstractFileSystem, - threads: int = 8, -) -> List[fsspec.spec.AbstractBufferedFile]: + threads: Optional[int] = 8, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, +) -> List[fsspec.AbstractFileSystem]: def multi_thread_open(data: tuple[str, Optional[DataGranule]]) -> EarthAccessFile: - url, granule = data - return EarthAccessFile(fs.open(url), granule) # type: ignore + urls, granule = data + return EarthAccessFile(fs.open(urls), granule) # type: ignore fileset = pqdm( url_mapping.items(), multi_thread_open, n_jobs=threads, - exception_behaviour=exception_behavior, + **pqdm_kwargs ) return fileset @@ -341,8 +342,9 @@ def open( self, granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[fsspec.spec.AbstractBufferedFile]: - """Returns a list of file-like objects that can be used to access files + """Returns a list of fsspec file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. Parameters: @@ -354,7 +356,7 @@ def open( A list of "file pointers" to remote (i.e. s3 or https) files. """ if len(granules): - return self._open(granules, provider, fail_fast=fail_fast) + return self._open(granules, provider,**pqdm_kwargs) return [] @singledispatchmethod @@ -362,7 +364,7 @@ def _open( self, granules: Union[List[str], List[DataGranule]], provider: Optional[str] = None, - fail_fast: bool = True, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[Any]: raise NotImplementedError("granules should be a list of DataGranule or URLs") @@ -372,6 +374,7 @@ def _open_granules( granules: List[DataGranule], provider: Optional[str] = None, threads: int = 8, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[Any]: fileset: List = [] total_size = round(sum([granule.size() for granule in granules]) / 1024, 2) @@ -397,13 +400,23 @@ def _open_granules( else: access = "on_prem" s3_fs = None - + access = "direct" + provider = granules[0]["meta"]["provider-id"] + # if the data has its own S3 credentials endpoint, we will use it + endpoint = self._own_s3_credentials(granules[0]["umm"]["RelatedUrls"]) + if endpoint is not None: + logger.info(f"using endpoint: {endpoint}") + s3_fs = self.get_s3_filesystem(endpoint=endpoint) + else: + logger.info(f"using provider: {provider}") + s3_fs = self.get_s3_filesystem(provider=provider) + url_mapping = _get_url_granule_mapping(granules, access) if s3_fs is not None: try: fileset = _open_files( - url_mapping, fs=s3_fs, threads=threads, fail_fast=fail_fast - ) + url_mapping, fs=s3_fs, threads=threads, **pqdm_kwargs + ) except Exception as e: raise RuntimeError( "An exception occurred while trying to access remote files on S3. " @@ -412,13 +425,13 @@ def _open_granules( ) from e else: fileset = self._open_urls_https( - url_mapping, threads=threads, fail_fast=fail_fast + url_mapping, threads=threads, **pqdm_kwargs ) return fileset else: url_mapping = _get_url_granule_mapping(granules, access="on_prem") fileset = self._open_urls_https( - url_mapping, threads=threads, fail_fast=fail_fast + url_mapping, threads=threads, **pqdm_kwargs ) return fileset @@ -428,6 +441,7 @@ def _open_urls( granules: List[str], provider: Optional[str] = None, threads: int = 8, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[Any]: fileset: List = [] @@ -452,7 +466,7 @@ def _open_urls( if s3_fs is not None: try: fileset = _open_files( - url_mapping, fs=s3_fs, threads=threads, fail_fast=fail_fast + url_mapping, fs=s3_fs, threads=threads, **pqdm_kwargs ) except Exception as e: raise RuntimeError( @@ -472,7 +486,7 @@ def _open_urls( raise ValueError( "We cannot open S3 links when we are not in-region, try using HTTPS links" ) - fileset = self._open_urls_https(url_mapping, threads, fail_fast=fail_fast) + fileset = self._open_urls_https(url_mapping, threads,**pqdm_kwargs) return fileset def get( @@ -481,7 +495,7 @@ def get( local_path: Union[Path, str, None] = None, provider: Optional[str] = None, threads: int = 8, - fail_fast: bool = True, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[str]: """Retrieves data granules from a remote storage system. @@ -509,9 +523,15 @@ def get( elif isinstance(local_path, str): local_path = Path(local_path) + pqdm_kwargs = { + "exception_behavior": "immediate", + "n_jobs": threads, + **pqdm_kwargs, + } + if len(granules): files = self._get( - granules, local_path, provider, threads, fail_fast=fail_fast + granules, local_path, provider, threads, **pqdm_kwargs ) return files else: @@ -524,7 +544,7 @@ def _get( local_path: Path, provider: Optional[str] = None, threads: int = 8, - fail_fast: bool = True, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[str]: """Retrieves data granules from a remote storage system. @@ -554,7 +574,7 @@ def _get_urls( local_path: Path, provider: Optional[str] = None, threads: int = 8, - fail_fast: bool = True, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[str]: data_links = granules downloaded_files: List = [] @@ -577,8 +597,8 @@ def _get_urls( else: # if we are not in AWS return self._download_onprem_granules( - data_links, local_path, threads, fail_fast=fail_fast - ) + data_links, local_path, threads, **pqdm_kwargs + ) @_get.register def _get_granules( @@ -587,7 +607,7 @@ def _get_granules( local_path: Path, provider: Optional[str] = None, threads: int = 8, - fail_fast: bool = True, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[str]: data_links: List = [] downloaded_files: List = [] @@ -629,7 +649,7 @@ def _get_granules( # if the data are cloud-based, but we are not in AWS, # it will be downloaded as if it was on prem return self._download_onprem_granules( - data_links, local_path, threads, fail_fast=fail_fast + data_links, local_path, threads, **pqdm_kwargs ) def _download_file(self, url: str, directory: Path) -> str: @@ -668,7 +688,7 @@ def _download_file(self, url: str, directory: Path) -> str: return str(path) def _download_onprem_granules( - self, urls: List[str], directory: Path, threads: int = 8, fail_fast: bool = True + self, urls: List[str], directory: Path, threads: int = 8, pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[Any]: """Downloads a list of URLS into the data directory. @@ -694,14 +714,13 @@ def _download_onprem_granules( arguments = [(url, directory) for url in urls] - exception_behavior = "immediate" if fail_fast else "deferred" - results = pqdm( arguments, self._download_file, n_jobs=threads, argument_type="args", - exception_behaviour=exception_behavior, + exception_behaviour=exception_behavior, + **pqdm_kwargs ) return results @@ -713,7 +732,7 @@ def _open_urls_https( https_fs = self.get_fsspec_session() try: - return _open_files(url_mapping, https_fs, threads) + return _open_files(url_mapping, https_fs, threads,**pqdm_kwargs) except Exception: logger.exception( "An exception occurred while trying to access remote files via HTTPS" From fc59282b9589de60fe6059ab9f0618065583683b Mon Sep 17 00:00:00 2001 From: Sherwin-14 Date: Sun, 29 Sep 2024 19:43:17 +0530 Subject: [PATCH 05/11] Fixed some glarring issues --- CHANGELOG.md | 30 +++++++++++++++++++++------ earthaccess/api.py | 4 ++-- earthaccess/store.py | 49 ++++++++++++++++++++------------------------ 3 files changed, 48 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e043d21..2b2552fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,12 +68,30 @@ and this project uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html) [@betolink](https://github.com/betolink)) - Add example PR links to pull request template ([#756](https://github.com/nsidc/earthaccess/issues/756)) - ([@Sherwin-14](https://github.com/Sherwin-14), - [@mfisher87](https://github.com/mfisher87)) -- Add Contributing Naming Convention document - ([#532](https://github.com/nsidc/earthaccess/issues/532)) - ([@Sherwin-14](https://github.com/Sherwin-14), - [@mfisher87](https://github.com/mfisher87)) + [**@Sherwin-14**](https://github.com/Sherwin-14), + [**@mfisher87**](https://github.com/mfisher87) + +- Added Contributing Naming Convention document + ([#532](https://github.com/nsidc/earthaccess/issues/532)) + [**@Sherwin-14**](https://github.com/Sherwin-14), + [**@mfisher87**](https://github.com/mfisher87) + +### Fixed + +- Removed Broken Link "Introduction to NASA earthaccess" + ([#779](https://github.com/nsidc/earthaccess/issues/779)) + ([**@Sherwin-14**](https://github.com/Sherwin-14)) +- Restore automation for tidying notebooks used in documentation + ([#788](https://github.com/nsidc/earthaccess/issues/788)) + ([**@itcarroll**](https://github.com/itcarroll)) +- Remove the base class on `EarthAccessFile` to fix method resolution + ([#610](https://github.com/nsidc/earthaccess/issues/610)) + ([**@itcarroll**](https://github.com/itcarroll)) +- Fixed earthaccess.download() ignoring errors + ([#581](https://github.com/nsidc/earthaccess/issues/581)) + ([**@Sherwin-14**](https://github.com/Sherwin-14)), + ([**@chuckwondo**](https://github.com/chuckwondo), + [**@mfisher87**](https://github.com/mfisher87) ### Removed diff --git a/earthaccess/api.py b/earthaccess/api.py index 0189d7c4..c9a2221a 100644 --- a/earthaccess/api.py +++ b/earthaccess/api.py @@ -3,7 +3,7 @@ import requests import s3fs from fsspec import AbstractFileSystem -from typing_extensions import Any, Dict, List, Optional, Union, deprecated, Mapping +from typing_extensions import Any, Dict, List, Mapping, Optional, Union, deprecated import earthaccess from earthaccess.services import DataServices @@ -238,7 +238,7 @@ def download( try: results = earthaccess.__store__.get( granules, local_path, provider, threads, pqdm_kwargs - ) + ) except AttributeError as err: logger.error( f"{err}: You must call earthaccess.login() before you can download data" diff --git a/earthaccess/store.py b/earthaccess/store.py index 7679495f..3e7dd48f 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -71,10 +71,7 @@ def multi_thread_open(data: tuple[str, Optional[DataGranule]]) -> EarthAccessFil return EarthAccessFile(fs.open(urls), granule) # type: ignore fileset = pqdm( - url_mapping.items(), - multi_thread_open, - n_jobs=threads, - **pqdm_kwargs + url_mapping.items(), multi_thread_open, n_jobs=threads, **pqdm_kwargs ) return fileset @@ -356,7 +353,7 @@ def open( A list of "file pointers" to remote (i.e. s3 or https) files. """ if len(granules): - return self._open(granules, provider,**pqdm_kwargs) + return self._open(granules, provider, pqdm_kwargs) return [] @singledispatchmethod @@ -410,13 +407,13 @@ def _open_granules( else: logger.info(f"using provider: {provider}") s3_fs = self.get_s3_filesystem(provider=provider) - + url_mapping = _get_url_granule_mapping(granules, access) if s3_fs is not None: try: fileset = _open_files( - url_mapping, fs=s3_fs, threads=threads, **pqdm_kwargs - ) + url_mapping, fs=s3_fs, threads=threads, pqdm_kwargs=pqdm_kwargs + ) except Exception as e: raise RuntimeError( "An exception occurred while trying to access remote files on S3. " @@ -425,13 +422,13 @@ def _open_granules( ) from e else: fileset = self._open_urls_https( - url_mapping, threads=threads, **pqdm_kwargs + url_mapping, threads=threads, pqdm_kwargs=pqdm_kwargs ) return fileset else: url_mapping = _get_url_granule_mapping(granules, access="on_prem") fileset = self._open_urls_https( - url_mapping, threads=threads, **pqdm_kwargs + url_mapping, threads=threads, pqdm_kwargs=pqdm_kwargs ) return fileset @@ -466,7 +463,10 @@ def _open_urls( if s3_fs is not None: try: fileset = _open_files( - url_mapping, fs=s3_fs, threads=threads, **pqdm_kwargs + url_mapping, + fs=s3_fs, + threads=threads, + pqdm_kwargs=pqdm_kwargs, ) except Exception as e: raise RuntimeError( @@ -486,7 +486,7 @@ def _open_urls( raise ValueError( "We cannot open S3 links when we are not in-region, try using HTTPS links" ) - fileset = self._open_urls_https(url_mapping, threads,**pqdm_kwargs) + fileset = self._open_urls_https(url_mapping, threads, pqdm_kwargs) return fileset def get( @@ -523,16 +523,8 @@ def get( elif isinstance(local_path, str): local_path = Path(local_path) - pqdm_kwargs = { - "exception_behavior": "immediate", - "n_jobs": threads, - **pqdm_kwargs, - } - if len(granules): - files = self._get( - granules, local_path, provider, threads, **pqdm_kwargs - ) + files = self._get(granules, local_path, provider, threads, pqdm_kwargs) return files else: raise ValueError("List of URLs or DataGranule instances expected") @@ -597,8 +589,8 @@ def _get_urls( else: # if we are not in AWS return self._download_onprem_granules( - data_links, local_path, threads, **pqdm_kwargs - ) + data_links, local_path, threads, pqdm_kwargs + ) @_get.register def _get_granules( @@ -649,7 +641,7 @@ def _get_granules( # if the data are cloud-based, but we are not in AWS, # it will be downloaded as if it was on prem return self._download_onprem_granules( - data_links, local_path, threads, **pqdm_kwargs + data_links, local_path, threads, pqdm_kwargs ) def _download_file(self, url: str, directory: Path) -> str: @@ -688,7 +680,11 @@ def _download_file(self, url: str, directory: Path) -> str: return str(path) def _download_onprem_granules( - self, urls: List[str], directory: Path, threads: int = 8, pqdm_kwargs: Optional[Mapping[str, Any]] = None, + self, + urls: List[str], + directory: Path, + threads: int = 8, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[Any]: """Downloads a list of URLS into the data directory. @@ -719,7 +715,6 @@ def _download_onprem_granules( self._download_file, n_jobs=threads, argument_type="args", - exception_behaviour=exception_behavior, **pqdm_kwargs ) return results @@ -732,7 +727,7 @@ def _open_urls_https( https_fs = self.get_fsspec_session() try: - return _open_files(url_mapping, https_fs, threads,**pqdm_kwargs) + return _open_files(url_mapping, https_fs, threads, **pqdm_kwargs) except Exception: logger.exception( "An exception occurred while trying to access remote files via HTTPS" From 87cd8fc799b17b3ce9d3f917df2900f7e29fdb6f Mon Sep 17 00:00:00 2001 From: Sherwin-14 Date: Sun, 29 Sep 2024 19:44:56 +0530 Subject: [PATCH 06/11] Removed threads from open --- earthaccess/api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/earthaccess/api.py b/earthaccess/api.py index c9a2221a..3f3a5001 100644 --- a/earthaccess/api.py +++ b/earthaccess/api.py @@ -266,7 +266,6 @@ def open( provider = _normalize_location(provider) pqdm_kwargs = { "exception_behavior": "immediate", - "n_jobs": threads, **(pqdm_kwargs or {}), } results = earthaccess.__store__.open( From 374f3033bc1dc2698e21975a2b59d6161c4effc7 Mon Sep 17 00:00:00 2001 From: Sherwin-14 Date: Mon, 28 Oct 2024 19:47:49 +0530 Subject: [PATCH 07/11] Replaced fast_fail with pqdm_kwargs the code and added a test --- earthaccess/api.py | 7 ++++++ earthaccess/store.py | 50 ++++++++++++++++++++++-------------------- tests/unit/test_api.py | 25 +++++++++++++++++++++ 3 files changed, 58 insertions(+), 24 deletions(-) create mode 100644 tests/unit/test_api.py diff --git a/earthaccess/api.py b/earthaccess/api.py index 3f3a5001..a6e87352 100644 --- a/earthaccess/api.py +++ b/earthaccess/api.py @@ -218,6 +218,9 @@ def download( local_path: local directory to store the remote data granules provider: if we download a list of URLs, we need to specify the provider. threads: parallel number of threads to use to download the files, adjust as necessary, default = 8 + pqdm_kwargs: Additional keyword arguments to pass to pqdm, a parallel processing library. + See pqdm documentation for available options. Default is to use immediate exception behavior + and the number of jobs specified by the `threads` parameter. Returns: List of downloaded files @@ -226,6 +229,7 @@ def download( Exception: A file download failed. """ provider = _normalize_location(provider) + pqdm_kwargs = dict(pqdm_kwargs) if pqdm_kwargs is not None else {} pqdm_kwargs = { "exception_behavior": "immediate", "n_jobs": threads, @@ -259,6 +263,9 @@ def open( granules: a list of granule instances **or** list of URLs, e.g. `s3://some-granule`. If a list of URLs is passed, we need to specify the data provider. provider: e.g. POCLOUD, NSIDC_CPRD, etc. + pqdm_kwargs: Additional keyword arguments to pass to pqdm, a parallel processing library. + See pqdm documentation for available options. Default is to use immediate exception behavior + and the number of jobs specified by the `threads` parameter. Returns: A list of "file pointers" to remote (i.e. s3 or https) files. diff --git a/earthaccess/store.py b/earthaccess/store.py index 3e7dd48f..bbc1c7d1 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -68,7 +68,12 @@ def _open_files( ) -> List[fsspec.AbstractFileSystem]: def multi_thread_open(data: tuple[str, Optional[DataGranule]]) -> EarthAccessFile: urls, granule = data - return EarthAccessFile(fs.open(urls), granule) # type: ignore + return EarthAccessFile(fs.open(urls), granule) # type: ignore + + pqdm_kwargs = { + "exception_behavior": "immediate", + **(pqdm_kwargs or {}), + } fileset = pqdm( url_mapping.items(), multi_thread_open, n_jobs=threads, **pqdm_kwargs @@ -348,6 +353,9 @@ def open( granules: a list of granule instances **or** list of URLs, e.g. `s3://some-granule`. If a list of URLs is passed, we need to specify the data provider. provider: e.g. POCLOUD, NSIDC_CPRD, etc. + pqdm_kwargs: Additional keyword arguments to pass to pqdm, a parallel processing library. + See pqdm documentation for available options. Default is to use immediate exception behavior + and the number of jobs specified by the `threads` parameter. Returns: A list of "file pointers" to remote (i.e. s3 or https) files. @@ -371,7 +379,6 @@ def _open_granules( granules: List[DataGranule], provider: Optional[str] = None, threads: int = 8, - pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[Any]: fileset: List = [] total_size = round(sum([granule.size() for granule in granules]) / 1024, 2) @@ -397,22 +404,14 @@ def _open_granules( else: access = "on_prem" s3_fs = None - access = "direct" - provider = granules[0]["meta"]["provider-id"] - # if the data has its own S3 credentials endpoint, we will use it - endpoint = self._own_s3_credentials(granules[0]["umm"]["RelatedUrls"]) - if endpoint is not None: - logger.info(f"using endpoint: {endpoint}") - s3_fs = self.get_s3_filesystem(endpoint=endpoint) - else: - logger.info(f"using provider: {provider}") - s3_fs = self.get_s3_filesystem(provider=provider) url_mapping = _get_url_granule_mapping(granules, access) if s3_fs is not None: try: fileset = _open_files( - url_mapping, fs=s3_fs, threads=threads, pqdm_kwargs=pqdm_kwargs + url_mapping, + fs=s3_fs, + threads=threads, ) except Exception as e: raise RuntimeError( @@ -421,15 +420,11 @@ def _open_granules( f"Exception: {traceback.format_exc()}" ) from e else: - fileset = self._open_urls_https( - url_mapping, threads=threads, pqdm_kwargs=pqdm_kwargs - ) + fileset = self._open_urls_https(url_mapping, threads=threads) return fileset else: url_mapping = _get_url_granule_mapping(granules, access="on_prem") - fileset = self._open_urls_https( - url_mapping, threads=threads, pqdm_kwargs=pqdm_kwargs - ) + fileset = self._open_urls_https(url_mapping, threads=threads) return fileset @_open.register @@ -512,6 +507,9 @@ def get( provider: a valid cloud provider, each DAAC has a provider code for their cloud distributions threads: Parallel number of threads to use to download the files; adjust as necessary, default = 8. + pqdm_kwargs: Additional keyword arguments to pass to pqdm, a parallel processing library. + See pqdm documentation for available options. Default is to use immediate exception behavior + and the number of jobs specified by the `threads` parameter. Returns: List of downloaded files @@ -553,6 +551,9 @@ def _get( provider: a valid cloud provider, each DAAC has a provider code for their cloud distributions threads: Parallel number of threads to use to download the files; adjust as necessary, default = 8. + pqdm_kwargs: Additional keyword arguments to pass to pqdm, a parallel processing library. + See pqdm documentation for available options. Default is to use immediate exception behavior + and the number of jobs specified by the `threads` parameter. Returns: None @@ -693,9 +694,9 @@ def _download_onprem_granules( directory: local directory to store the downloaded files threads: parallel number of threads to use to download the files; adjust as necessary, default = 8 - fail_fast: if set to True, the download process will stop immediately - upon encountering the first error. If set to False, errors will be - deferred, allowing the download of remaining files to continue. + pqdm_kwargs: Additional keyword arguments to pass to pqdm, a parallel processing library. + See pqdm documentation for available options. Default is to use immediate exception behavior + and the number of jobs specified by the `threads` parameter. Returns: A list of local filepaths to which the files were downloaded. @@ -715,7 +716,7 @@ def _download_onprem_granules( self._download_file, n_jobs=threads, argument_type="args", - **pqdm_kwargs + **pqdm_kwargs, ) return results @@ -723,11 +724,12 @@ def _open_urls_https( self, url_mapping: Mapping[str, Union[DataGranule, None]], threads: int = 8, + pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[fsspec.AbstractFileSystem]: https_fs = self.get_fsspec_session() try: - return _open_files(url_mapping, https_fs, threads, **pqdm_kwargs) + return _open_files(url_mapping, https_fs, threads, pqdm_kwargs) except Exception: logger.exception( "An exception occurred while trying to access remote files via HTTPS" diff --git a/tests/unit/test_api.py b/tests/unit/test_api.py new file mode 100644 index 00000000..27a7717e --- /dev/null +++ b/tests/unit/test_api.py @@ -0,0 +1,25 @@ +from unittest.mock import Mock + +import earthaccess +import pytest + + +def test_download(monkeypatch): + earthaccess.login() + + results = earthaccess.search_data( + short_name="ATL06", + bounding_box=(-10, 20, 10, 50), + temporal=("1999-02", "2019-03"), + count=10, + ) + + def mock_get(*args, **kwargs): + raise Exception("Download failed") + + mock_store = Mock() + monkeypatch.setattr(earthaccess, "__store__", mock_store) + monkeypatch.setattr(mock_store, "get", mock_get) + + with pytest.raises(Exception, match="Download failed"): + earthaccess.download(results, "/home/download-folder") From de8df1cea2b460fd58629e5e3584e971273c1434 Mon Sep 17 00:00:00 2001 From: Sherwin Varghese <141290943+Sherwin-14@users.noreply.github.com> Date: Mon, 28 Oct 2024 22:53:08 +0530 Subject: [PATCH 08/11] remove an unnecessary line Co-authored-by: Chuck Daniels --- earthaccess/api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/earthaccess/api.py b/earthaccess/api.py index a6e87352..5ad75ed5 100644 --- a/earthaccess/api.py +++ b/earthaccess/api.py @@ -229,7 +229,6 @@ def download( Exception: A file download failed. """ provider = _normalize_location(provider) - pqdm_kwargs = dict(pqdm_kwargs) if pqdm_kwargs is not None else {} pqdm_kwargs = { "exception_behavior": "immediate", "n_jobs": threads, From 767e11982590a1f38bc68f509c53cba18d645f9f Mon Sep 17 00:00:00 2001 From: Sherwin Varghese <141290943+Sherwin-14@users.noreply.github.com> Date: Mon, 28 Oct 2024 22:54:51 +0530 Subject: [PATCH 09/11] Update CHANGELOG.md Co-authored-by: Chuck Daniels --- CHANGELOG.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b2552fd..be023d11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,11 +87,11 @@ and this project uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html) - Remove the base class on `EarthAccessFile` to fix method resolution ([#610](https://github.com/nsidc/earthaccess/issues/610)) ([**@itcarroll**](https://github.com/itcarroll)) -- Fixed earthaccess.download() ignoring errors +- Fix `earthaccess.download` to not ignore errors by default ([#581](https://github.com/nsidc/earthaccess/issues/581)) - ([**@Sherwin-14**](https://github.com/Sherwin-14)), - ([**@chuckwondo**](https://github.com/chuckwondo), - [**@mfisher87**](https://github.com/mfisher87) + ([**@Sherwin-14**](https://github.com/Sherwin-14), + [**@chuckwondo**](https://github.com/chuckwondo), + [**@mfisher87**](https://github.com/mfisher87)) ### Removed From 2d485b3a6ad49bba32a1cba6ec3b9c6d2aae4c73 Mon Sep 17 00:00:00 2001 From: Sherwin-14 Date: Tue, 29 Oct 2024 20:26:42 +0530 Subject: [PATCH 10/11] Added a second test for download --- CHANGELOG.md | 30 ++++++------------------------ earthaccess/store.py | 10 +++++----- tests/unit/test_api.py | 25 +++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index be023d11..9e043d21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,30 +68,12 @@ and this project uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html) [@betolink](https://github.com/betolink)) - Add example PR links to pull request template ([#756](https://github.com/nsidc/earthaccess/issues/756)) - [**@Sherwin-14**](https://github.com/Sherwin-14), - [**@mfisher87**](https://github.com/mfisher87) - -- Added Contributing Naming Convention document - ([#532](https://github.com/nsidc/earthaccess/issues/532)) - [**@Sherwin-14**](https://github.com/Sherwin-14), - [**@mfisher87**](https://github.com/mfisher87) - -### Fixed - -- Removed Broken Link "Introduction to NASA earthaccess" - ([#779](https://github.com/nsidc/earthaccess/issues/779)) - ([**@Sherwin-14**](https://github.com/Sherwin-14)) -- Restore automation for tidying notebooks used in documentation - ([#788](https://github.com/nsidc/earthaccess/issues/788)) - ([**@itcarroll**](https://github.com/itcarroll)) -- Remove the base class on `EarthAccessFile` to fix method resolution - ([#610](https://github.com/nsidc/earthaccess/issues/610)) - ([**@itcarroll**](https://github.com/itcarroll)) -- Fix `earthaccess.download` to not ignore errors by default - ([#581](https://github.com/nsidc/earthaccess/issues/581)) - ([**@Sherwin-14**](https://github.com/Sherwin-14), - [**@chuckwondo**](https://github.com/chuckwondo), - [**@mfisher87**](https://github.com/mfisher87)) + ([@Sherwin-14](https://github.com/Sherwin-14), + [@mfisher87](https://github.com/mfisher87)) +- Add Contributing Naming Convention document + ([#532](https://github.com/nsidc/earthaccess/issues/532)) + ([@Sherwin-14](https://github.com/Sherwin-14), + [@mfisher87](https://github.com/mfisher87)) ### Removed diff --git a/earthaccess/store.py b/earthaccess/store.py index bbc1c7d1..f7b5c85e 100644 --- a/earthaccess/store.py +++ b/earthaccess/store.py @@ -63,12 +63,12 @@ def __repr__(self) -> str: def _open_files( url_mapping: Mapping[str, Union[DataGranule, None]], fs: fsspec.AbstractFileSystem, - threads: Optional[int] = 8, + threads: int = 8, pqdm_kwargs: Optional[Mapping[str, Any]] = None, -) -> List[fsspec.AbstractFileSystem]: +) -> List[fsspec.spec.AbstractBufferedFile]: def multi_thread_open(data: tuple[str, Optional[DataGranule]]) -> EarthAccessFile: - urls, granule = data - return EarthAccessFile(fs.open(urls), granule) # type: ignore + url, granule = data + return EarthAccessFile(fs.open(url), granule) # type: ignore pqdm_kwargs = { "exception_behavior": "immediate", @@ -346,7 +346,7 @@ def open( provider: Optional[str] = None, pqdm_kwargs: Optional[Mapping[str, Any]] = None, ) -> List[fsspec.spec.AbstractBufferedFile]: - """Returns a list of fsspec file-like objects that can be used to access files + """Returns a list of file-like objects that can be used to access files hosted on S3 or HTTPS by third party libraries like xarray. Parameters: diff --git a/tests/unit/test_api.py b/tests/unit/test_api.py index 27a7717e..0393243e 100644 --- a/tests/unit/test_api.py +++ b/tests/unit/test_api.py @@ -23,3 +23,28 @@ def mock_get(*args, **kwargs): with pytest.raises(Exception, match="Download failed"): earthaccess.download(results, "/home/download-folder") + + +def test_download_deferred_failure(monkeypatch): + earthaccess.login() + + results = earthaccess.search_data( + short_name="ATL06", + bounding_box=(-10, 20, 10, 50), + temporal=("1999-02", "2019-03"), + count=10, + ) + + def mock_get(*args, **kwargs): + return [Exception("Download failed")] * len(results) + + mock_store = Mock() + monkeypatch.setattr(earthaccess, "__store__", mock_store) + monkeypatch.setattr(mock_store, "get", mock_get) + + results = earthaccess.download( + results, "/home/download-folder", None, 8, {"exception_behavior": "deferred"} + ) + + assert all(isinstance(e, Exception) for e in results) + assert len(results) == 10 From 1061743d7cbc61ea8f7c2b5be4d6968b727f25d0 Mon Sep 17 00:00:00 2001 From: Sherwin Varghese <141290943+Sherwin-14@users.noreply.github.com> Date: Mon, 4 Nov 2024 12:18:57 +0530 Subject: [PATCH 11/11] Update tests/unit/test_api.py changed test name to test_download_immediate_failure Co-authored-by: Matt Fisher <3608264+mfisher87@users.noreply.github.com> --- tests/unit/test_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_api.py b/tests/unit/test_api.py index 0393243e..20980e35 100644 --- a/tests/unit/test_api.py +++ b/tests/unit/test_api.py @@ -4,7 +4,7 @@ import pytest -def test_download(monkeypatch): +def test_download_immediate_failure(monkeypatch): earthaccess.login() results = earthaccess.search_data(