diff --git a/setup.py b/setup.py index 3c880bc..66ee34b 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ 'shapely', 'twine', 'rfc3986', - 'keyring' + 'keyring', ], 'examples': [ 'datashader', @@ -33,7 +33,7 @@ 'param', 'pyarrow >=1.0', 'python-snappy', - 'retrying', + 'tenacity', 'numpy', 'dask[complete] >=2.0' ] diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py index 53d50a0..4df068a 100644 --- a/spatialpandas/dask.py +++ b/spatialpandas/dask.py @@ -2,13 +2,11 @@ import json import os import uuid -from inspect import signature import numpy as np import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from retrying import retry import dask import dask.dataframe as dd @@ -190,7 +188,11 @@ def pack_partitions(self, npartitions=None, p=15, shuffle='tasks'): # Set index to distance. This will trigger an expensive shuffle # sort operation - ddf = ddf.set_index('hilbert_distance', npartitions=npartitions, shuffle=shuffle) + ddf = ddf.set_index( + 'hilbert_distance', + npartitions=npartitions, + shuffle=shuffle, + ) if ddf.npartitions != npartitions: # set_index doesn't change the number of partitions if the partitions @@ -211,6 +213,7 @@ def pack_partitions_to_parquet( storage_options=None, engine_kwargs=None, overwrite=False, + retryit=None, ): """ Repartition and reorder dataframe spatially along a Hilbert space filling curve @@ -243,52 +246,29 @@ def pack_partitions_to_parquet( DaskGeoDataFrame backed by newly written parquet dataset """ from .io import read_parquet, read_parquet_dask - from .io.utils import validate_coerce_filesystem + from .io.utils import ( + _make_fs_retry, + _make_retry_decorator, + validate_coerce_filesystem, + ) engine_kwargs = engine_kwargs or {} # Get fsspec filesystem object - filesystem = validate_coerce_filesystem(path, filesystem, storage_options) + filesystem = validate_coerce_filesystem( + path, + filesystem, + storage_options, + ) # Decorator for operations that should be retried - if _retry_args is None: - _retry_args = dict( - wait_exponential_multiplier=100, - wait_exponential_max=120000, - stop_max_attempt_number=24, - ) - retryit = retry(**_retry_args) - - @retryit - def rm_retry(file_path): - filesystem.invalidate_cache() - if filesystem.exists(file_path): - filesystem.rm(file_path, recursive=True) - if filesystem.exists(file_path): - # Make sure we keep retrying until file does not exist - raise ValueError("Deletion of {path} not yet complete".format( - path=file_path - )) + retryit = retryit or _make_retry_decorator(**(_retry_args or {})) - @retryit - def mkdirs_retry(dir_path): - filesystem.makedirs(dir_path, exist_ok=True) - - # For filesystems that provide a "refresh" argument, set it to True - if 'refresh' in signature(filesystem.ls).parameters: - ls_kwargs = {'refresh': True} - else: - ls_kwargs = {} - - @retryit - def ls_retry(dir_path): - filesystem.invalidate_cache() - return filesystem.ls(dir_path, **ls_kwargs) - - @retryit - def move_retry(p1, p2): - if filesystem.exists(p1): - filesystem.move(p1, p2) + fs_retry = _make_fs_retry(filesystem, retryit) + rm_retry = fs_retry.rm_retry + mkdirs_retry = fs_retry.mkdirs_retry + ls_retry = fs_retry.ls_retry + move_retry = fs_retry.move_retry # Compute tempdir_format string dataset_uuid = str(uuid.uuid4()) @@ -406,7 +386,7 @@ def read_parquet_retry(parts_tmp_path, subpart_paths, part_output_path): **engine_kwargs, ) - ls_res = sorted(filesystem.ls(parts_tmp_path, **ls_kwargs)) + ls_res = sorted(ls_retry(parts_tmp_path)) subpart_paths_stripped = sorted([filesystem._strip_protocol(_) for _ in subpart_paths]) if subpart_paths_stripped != ls_res: diff --git a/spatialpandas/io/utils.py b/spatialpandas/io/utils.py index dcdf636..af90f2b 100644 --- a/spatialpandas/io/utils.py +++ b/spatialpandas/io/utils.py @@ -1,9 +1,27 @@ +import math +import time +from functools import wraps +from inspect import signature from os import PathLike from pathlib import Path from typing import Any, Dict, Iterable, Optional, Union import fsspec +try: + from tenacity import ( + retry as tenacity_retry, + stop_after_attempt, + wait_exponential, + ) +except ImportError: + tenacity_retry = stop_after_attempt = wait_exponential = None + +try: + from retrying import retry as retrying_retry +except ImportError: + retrying_retry = None + PathType = Union[PathLike, str, Path] @@ -49,7 +67,145 @@ def _maybe_prepend_protocol( ) -> Iterable[PathType]: protocol = filesystem.protocol if isinstance( filesystem.protocol, str) else filesystem.protocol[0] - if protocol not in ("file", "abstract"): + if protocol and protocol not in ("file", "abstract"): # Add back prefix (e.g. s3://) paths = ["{proto}://{p}".format(proto=protocol, p=p) for p in paths] return paths + + +def retry(tries, delay=3, backoff=2, max_delay=120, exceptions=Exception): + """Retry decorator with exponential backoff. + + Retries a function or method until it returns True. + Based on https://wiki.python.org/moin/PythonDecoratorLibrary#Retry + + Parameters + ---------- + delay: + Sets the initial delay in seconds, and backoff sets the factor by which + the delay should lengthen after each failure. + backoff: + Must be greater than 1, or else it isn't really a backoff. + tries: + Must be at least 0, and delay greater than 0. + max_delay: + Maximum delay to wait between tries. + exceptions: + Single or multiple exceptions to allow. + """ + if backoff <= 1: + raise ValueError("backoff must be greater than 1") + + tries = math.floor(tries) + if tries < 0: + raise ValueError("tries must be 0 or greater") + + if delay <= 0: + raise ValueError("delay must be greater than 0") + + def deco_retry(f): + @wraps(f) + def f_retry(*args, **kwargs): + mtries, mdelay = tries, delay # make mutable + + while mtries > 0: + try: + rv = f(*args, **kwargs) + return rv + except exceptions as e: + mtries -= 1 # consume an attempt + time.sleep(mdelay) # wait... + mdelay *= backoff # make future wait longer + mdelay = min(mdelay, max_delay) + if mtries <= 0: + raise e + + return f_retry # true decorator -> decorated function + + return deco_retry # @retry(arg[, ...]) -> true decorator + + +def _make_retry_decorator( + *args, + retry_lib=None, + stop_max_attempt_number=24, + wait_exponential_max=120000, + wait_exponential_multiplier=100, + **kwargs, +): + if tenacity_retry and stop_after_attempt and wait_exponential and (retry_lib is None or retry_lib == "tenacity"): + stop = kwargs.pop("stop", stop_after_attempt(stop_max_attempt_number)) + wait = kwargs.pop("wait", wait_exponential( + multiplier=wait_exponential_multiplier, + max=wait_exponential_max / 1000, + )) + reraise = kwargs.pop("reraise", True) + retryer = tenacity_retry( + *args, + reraise=reraise, + stop=stop, + wait=wait, + **kwargs, + ) + elif retrying_retry and (retry_lib is None or retry_lib == "retrying"): + retryer = retrying_retry( + *args, + wait_exponential_multiplier=wait_exponential_multiplier, + wait_exponential_max=wait_exponential_max, + stop_max_attempt_number=stop_max_attempt_number, + **kwargs, + ) + else: + delay = kwargs.pop("delay", 1) + retryer = retry( + stop_max_attempt_number, + *args, + delay=delay, + backoff=wait_exponential_multiplier, + max_delay=wait_exponential_max / 1000, + **kwargs, + ) + return retryer + + +def _make_fs_retry(filesystem, retryit=None): + retryit = retryit or _make_retry_decorator() + + # For filesystems that provide a "refresh" argument, set it to True + if 'refresh' in signature(filesystem.ls).parameters: + ls_kwargs = {'refresh': True} + else: + ls_kwargs = {} + + class FSretry: + + @staticmethod + @retryit + def rm_retry(file_path): + filesystem.invalidate_cache() + if filesystem.exists(file_path): + filesystem.rm(file_path, recursive=True) + if filesystem.exists(file_path): + # Make sure we keep retrying until file does not exist + raise ValueError( + "Deletion of {path} not yet complete".format( + path=file_path)) + + @staticmethod + @retryit + def mkdirs_retry(dir_path): + filesystem.makedirs(dir_path, exist_ok=True) + + @staticmethod + @retryit + def ls_retry(dir_path): + filesystem.invalidate_cache() + return filesystem.ls(dir_path, **ls_kwargs) + + @staticmethod + @retryit + def move_retry(p1, p2): + if filesystem.exists(p1): + filesystem.move(p1, p2) + + return FSretry diff --git a/spatialpandas/tests/test_parquet.py b/spatialpandas/tests/test_parquet.py index 282e43a..f5059e2 100644 --- a/spatialpandas/tests/test_parquet.py +++ b/spatialpandas/tests/test_parquet.py @@ -201,9 +201,9 @@ def test_pack_partitions_to_parquet(gp_multipoint, gp_multiline, tempdir_format = None _retry_args = dict( - wait_exponential_multiplier=10, - wait_exponential_max=20000, - stop_max_attempt_number=4 + stop_max_attempt_number=2, + wait_exponential_max=4000, + wait_exponential_multiplier=2, ) ddf_packed = ddf.pack_partitions_to_parquet(