Use tenacity for retry by default, clean up #81

Open
wants to merge 5 commits into main

Changes from all commits
4 changes: 2 additions & 2 deletions setup.py
@@ -15,7 +15,7 @@
'shapely',
'twine',
'rfc3986',
'keyring'
'keyring',
],
'examples': [
'datashader',
@@ -33,7 +33,7 @@
'param',
'pyarrow >=1.0',
'python-snappy',
'retrying',
'tenacity',
'numpy',
'dask[complete] >=2.0'
]
66 changes: 23 additions & 43 deletions spatialpandas/dask.py
@@ -2,13 +2,11 @@
import json
import os
import uuid
from inspect import signature

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from retrying import retry

import dask
import dask.dataframe as dd
@@ -190,7 +188,11 @@ def pack_partitions(self, npartitions=None, p=15, shuffle='tasks'):

# Set index to distance. This will trigger an expensive shuffle
# sort operation
ddf = ddf.set_index('hilbert_distance', npartitions=npartitions, shuffle=shuffle)
ddf = ddf.set_index(
'hilbert_distance',
npartitions=npartitions,
shuffle=shuffle,
)

if ddf.npartitions != npartitions:
# set_index doesn't change the number of partitions if the partitions
@@ -211,6 +213,7 @@ def pack_partitions_to_parquet(
storage_options=None,
engine_kwargs=None,
overwrite=False,
retryit=None,
):
"""
Repartition and reorder dataframe spatially along a Hilbert space filling curve
@@ -243,52 +246,29 @@
DaskGeoDataFrame backed by newly written parquet dataset
"""
from .io import read_parquet, read_parquet_dask
from .io.utils import validate_coerce_filesystem
from .io.utils import (
_make_fs_retry,
_make_retry_decorator,
validate_coerce_filesystem,
)

engine_kwargs = engine_kwargs or {}

# Get fsspec filesystem object
filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
filesystem = validate_coerce_filesystem(
path,
filesystem,
storage_options,
)

# Decorator for operations that should be retried
if _retry_args is None:
_retry_args = dict(
wait_exponential_multiplier=100,
wait_exponential_max=120000,
stop_max_attempt_number=24,
)
retryit = retry(**_retry_args)

@retryit
def rm_retry(file_path):
filesystem.invalidate_cache()
if filesystem.exists(file_path):
filesystem.rm(file_path, recursive=True)
if filesystem.exists(file_path):
# Make sure we keep retrying until file does not exist
raise ValueError("Deletion of {path} not yet complete".format(
path=file_path
))
retryit = retryit or _make_retry_decorator(**(_retry_args or {}))

@retryit
def mkdirs_retry(dir_path):
filesystem.makedirs(dir_path, exist_ok=True)

# For filesystems that provide a "refresh" argument, set it to True
if 'refresh' in signature(filesystem.ls).parameters:
ls_kwargs = {'refresh': True}
else:
ls_kwargs = {}

@retryit
def ls_retry(dir_path):
filesystem.invalidate_cache()
return filesystem.ls(dir_path, **ls_kwargs)

@retryit
def move_retry(p1, p2):
if filesystem.exists(p1):
filesystem.move(p1, p2)
fs_retry = _make_fs_retry(filesystem, retryit)
rm_retry = fs_retry.rm_retry
mkdirs_retry = fs_retry.mkdirs_retry
ls_retry = fs_retry.ls_retry
move_retry = fs_retry.move_retry

# Compute tempdir_format string
dataset_uuid = str(uuid.uuid4())
@@ -406,7 +386,7 @@ def read_parquet_retry(parts_tmp_path, subpart_paths, part_output_path):
**engine_kwargs,
)

ls_res = sorted(filesystem.ls(parts_tmp_path, **ls_kwargs))
ls_res = sorted(ls_retry(parts_tmp_path))
subpart_paths_stripped = sorted([filesystem._strip_protocol(_) for _ in subpart_paths])

if subpart_paths_stripped != ls_res:
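
With the new retryit hook above, callers can inject their own retry policy instead of the default one built from _retry_args. A minimal sketch, assuming tenacity is installed; the settings, the helper name write_packed, and npartitions are illustrative assumptions, not values from this PR:

from tenacity import retry, stop_after_attempt, wait_exponential

# Illustrative policy: at most 5 attempts, exponential backoff capped at 10 s.
custom_retry = retry(
    reraise=True,
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=0.1, max=10),
)

def write_packed(ddf, path):
    """Pack any DaskGeoDataFrame to parquet, retrying filesystem calls with custom_retry."""
    return ddf.pack_partitions_to_parquet(
        path,
        npartitions=8,
        retryit=custom_retry,  # used instead of the decorator from _make_retry_decorator
    )
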
158 changes: 157 additions & 1 deletion spatialpandas/io/utils.py
@@ -1,9 +1,27 @@
import math
import time
from functools import wraps
from inspect import signature
from os import PathLike
from pathlib import Path
from typing import Any, Dict, Iterable, Optional, Union

import fsspec

try:
from tenacity import (
retry as tenacity_retry,
stop_after_attempt,
wait_exponential,
)
except ImportError:
tenacity_retry = stop_after_attempt = wait_exponential = None

try:
from retrying import retry as retrying_retry
except ImportError:
retrying_retry = None

PathType = Union[PathLike, str, Path]


@@ -49,7 +67,145 @@ def _maybe_prepend_protocol(
) -> Iterable[PathType]:
protocol = filesystem.protocol if isinstance(
filesystem.protocol, str) else filesystem.protocol[0]
if protocol not in ("file", "abstract"):
if protocol and protocol not in ("file", "abstract"):
# Add back prefix (e.g. s3://)
paths = ["{proto}://{p}".format(proto=protocol, p=p) for p in paths]
return paths


def retry(tries, delay=3, backoff=2, max_delay=120, exceptions=Exception):
"""Retry decorator with exponential backoff.

Retries a function or method when it raises one of the given exceptions.
Based on https://wiki.python.org/moin/PythonDecoratorLibrary#Retry

Parameters
----------
tries:
Maximum number of attempts; must be at least 0.
delay:
Initial delay between attempts in seconds; must be greater than 0.
backoff:
Factor by which the delay lengthens after each failure; must be greater
than 1, or else it isn't really a backoff.
max_delay:
Maximum delay in seconds to wait between attempts.
exceptions:
Single exception or tuple of exceptions that trigger a retry.
"""
if backoff <= 1:
raise ValueError("backoff must be greater than 1")

tries = math.floor(tries)
if tries < 0:
raise ValueError("tries must be 0 or greater")

if delay <= 0:
raise ValueError("delay must be greater than 0")

def deco_retry(f):
@wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay # make mutable

while mtries > 0:
try:
rv = f(*args, **kwargs)
return rv
except exceptions as e:
mtries -= 1  # consume an attempt
if mtries <= 0:
raise e  # out of attempts; re-raise the last error
time.sleep(mdelay)  # wait before retrying
mdelay *= backoff  # make future waits longer
mdelay = min(mdelay, max_delay)

return f_retry # true decorator -> decorated function

return deco_retry # @retry(arg[, ...]) -> true decorator


def _make_retry_decorator(
*args,
retry_lib=None,
stop_max_attempt_number=24,
wait_exponential_max=120000,
wait_exponential_multiplier=100,
**kwargs,
):
if tenacity_retry and stop_after_attempt and wait_exponential and (retry_lib is None or retry_lib == "tenacity"):
stop = kwargs.pop("stop", stop_after_attempt(stop_max_attempt_number))
wait = kwargs.pop("wait", wait_exponential(
multiplier=wait_exponential_multiplier / 1000,  # retrying uses ms, tenacity seconds
max=wait_exponential_max / 1000,
))
reraise = kwargs.pop("reraise", True)
retryer = tenacity_retry(
*args,
reraise=reraise,
stop=stop,
wait=wait,
**kwargs,
)
elif retrying_retry and (retry_lib is None or retry_lib == "retrying"):
retryer = retrying_retry(
*args,
wait_exponential_multiplier=wait_exponential_multiplier,
wait_exponential_max=wait_exponential_max,
stop_max_attempt_number=stop_max_attempt_number,
**kwargs,
)
else:
delay = kwargs.pop("delay", 1)
retryer = retry(
stop_max_attempt_number,
*args,
delay=delay,
backoff=wait_exponential_multiplier,
max_delay=wait_exponential_max / 1000,
**kwargs,
)
return retryer


def _make_fs_retry(filesystem, retryit=None):
retryit = retryit or _make_retry_decorator()

# For filesystems that provide a "refresh" argument, set it to True
if 'refresh' in signature(filesystem.ls).parameters:
ls_kwargs = {'refresh': True}
else:
ls_kwargs = {}

class FSretry:

@staticmethod
@retryit
def rm_retry(file_path):
filesystem.invalidate_cache()
if filesystem.exists(file_path):
filesystem.rm(file_path, recursive=True)
if filesystem.exists(file_path):
# Make sure we keep retrying until file does not exist
raise ValueError(
"Deletion of {path} not yet complete".format(
path=file_path))

@staticmethod
@retryit
def mkdirs_retry(dir_path):
filesystem.makedirs(dir_path, exist_ok=True)

@staticmethod
@retryit
def ls_retry(dir_path):
filesystem.invalidate_cache()
return filesystem.ls(dir_path, **ls_kwargs)

@staticmethod
@retryit
def move_retry(p1, p2):
if filesystem.exists(p1):
filesystem.move(p1, p2)

return FSretry
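
As a usage sketch for the helpers above (hedged; the local path is only for demonstration): _make_retry_decorator prefers tenacity, falls back to retrying, and finally to the local retry decorator, while _make_fs_retry wraps the filesystem operations with whichever decorator it receives.

import fsspec

from spatialpandas.io.utils import _make_fs_retry, _make_retry_decorator

# Same knobs the retrying-based code used; mapped to tenacity's
# stop_after_attempt / wait_exponential when tenacity is available.
retryit = _make_retry_decorator(
    stop_max_attempt_number=4,
    wait_exponential_multiplier=100,  # retrying-style milliseconds
    wait_exponential_max=20000,
)

fs = fsspec.filesystem("file")
fs_retry = _make_fs_retry(fs, retryit)

fs_retry.mkdirs_retry("/tmp/spatialpandas_demo")        # retried on failure
listing = fs_retry.ls_retry("/tmp/spatialpandas_demo")  # cache invalidated before listing
fs_retry.rm_retry("/tmp/spatialpandas_demo")            # retried until the path is gone
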
6 changes: 3 additions & 3 deletions spatialpandas/tests/test_parquet.py
@@ -201,9 +201,9 @@ def test_pack_partitions_to_parquet(gp_multipoint, gp_multiline,
tempdir_format = None

_retry_args = dict(
wait_exponential_multiplier=10,
wait_exponential_max=20000,
stop_max_attempt_number=4
stop_max_attempt_number=2,
wait_exponential_max=4000,
wait_exponential_multiplier=2,
)

ddf_packed = ddf.pack_partitions_to_parquet(
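
Besides passing a ready-made decorator via retryit, the _retry_args dict exercised by this test remains the other override path. A hedged sketch of that call pattern; the fixture names (pytest's tmp_path), the output file name, and npartitions are assumptions:

def roundtrip(ddf, tmp_path):
    """Pack a test DaskGeoDataFrame with deliberately tight retry settings."""
    _retry_args = dict(
        stop_max_attempt_number=2,
        wait_exponential_max=4000,
        wait_exponential_multiplier=2,
    )
    return ddf.pack_partitions_to_parquet(
        str(tmp_path / "packed.parquet"),
        npartitions=12,
        _retry_args=_retry_args,  # forwarded to _make_retry_decorator when retryit is None
    )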