Merge pull request #157 from zmoon/fix/ish2
Add retry and timeout options for ISH reader
zmoon authored Feb 29, 2024
2 parents 55c7902 + 924046b commit a5aa524
Showing 4 changed files with 90 additions and 25 deletions.
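
For context, a minimal usage sketch of the options added in this PR. The dates and station ID mirror the test file below; the import path is assumed from the file layout, and the keyword values are illustrative, not recommendations (the defaults added here are timeout 10 s and 4 retries):

import pandas as pd

from monetio.obs import ish  # assumed import path; the tests call ish.add_data directly

dates = pd.date_range("2020-09-01", "2020-09-02")

df = ish.add_data(
    dates,
    site="72224400358",   # College Park AP, as in tests/test_ish.py
    request_timeout=30,   # seconds per request (new option; default 10)
    request_retries=2,    # retries after the first attempt (new option; default 4)
)
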
82 changes: 65 additions & 17 deletions monetio/obs/ish.py
@@ -19,6 +19,8 @@ def add_data(
window="H",
download=False,
n_procs=1,
request_timeout=10,
request_retries=4,
verbose=False,
):
"""Retrieve and load ISH data as a DataFrame.
@@ -38,6 +40,10 @@
Resampling window, e.g. ``'3H'``.
n_procs : int
For Dask.
request_timeout : float
Timeout (seconds) for requests when downloading ISH data files.
request_retries : int
Number of retries for requests when downloading ISH data files.
verbose : bool
Print debugging messages.
@@ -56,6 +62,8 @@
window=window,
download=download,
n_procs=n_procs,
request_timeout=request_timeout,
request_retries=request_retries,
verbose=verbose,
)
return df
@@ -192,7 +200,7 @@ def _decode_bytes(df):
)
return df

def read_data_frame(self, url_or_file):
def read_data_frame(self, url_or_file, *, request_timeout=10, request_retries=4):
"""Create a data frame from an ISH file.
URL is assumed if `url_or_file` is a string that starts with ``http``.
@@ -203,8 +211,24 @@ def read_data_frame(self, url_or_file):

import requests

r = requests.get(url_or_file, timeout=10, stream=True)
r.raise_for_status()
if not request_retries >= 0:
raise ValueError(f"`request_retries` must be >= 0, got {request_retries!r}")

tries = 0
while tries - 1 < request_retries:
try:
r = requests.get(url_or_file, timeout=request_timeout, stream=True)
r.raise_for_status()
except requests.exceptions.RequestException as e:
tries += 1
if tries - 1 == request_retries:
raise RuntimeError(
f"Failed to connect to server for URL {url_or_file}. "
f"timeout={request_timeout}, retries={request_retries}."
) from e
else:
break

with gzip.open(io.BytesIO(r.content), "rb") as f:
frame_as_array = np.genfromtxt(f, delimiter=self.WIDTHS, dtype=self.DTYPES)
else:
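
A note on the counting in the retry loop above: request_retries is the number of retries after the first attempt, so a persistently failing URL is requested request_retries + 1 times before the RuntimeError is raised. A minimal sketch of just the counting, with no network calls:

request_retries = 4  # the new default
attempts = 0
tries = 0
while tries - 1 < request_retries:
    attempts += 1                # stands in for one failing requests.get(...) call
    tries += 1
    if tries - 1 == request_retries:
        break                    # the real loop raises RuntimeError from the last error here
print(attempts)                  # 5, i.e. request_retries + 1 total attempts
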
@@ -283,6 +307,8 @@ def add_data(
window="H",
download=False,
n_procs=1,
request_timeout=10,
request_retries=4,
verbose=False,
):
"""Retrieve and load ISH data as a DataFrame.
@@ -302,6 +328,10 @@
Resampling window, e.g. ``'3H'``.
n_procs : int
For Dask.
request_timeout : float
Timeout (seconds) for requests when downloading ISH data files.
request_retries : int
Number of retries for requests when downloading ISH data files.
verbose : bool
Print debugging messages.
@@ -344,24 +374,36 @@ def add_data(
if download:
objs = self.get_url_file_objs(urls.name)
print(" Reading ISH into pandas DataFrame...")
dfs = [dask.delayed(self.read_data_frame)(f) for f in objs]

def func(fname):
return self.read_data_frame(
fname,
request_timeout=request_timeout,
request_retries=request_retries,
)

dfs = [dask.delayed(func)(f) for f in objs]
dff = dd.from_delayed(dfs)
self.df = dff.compute(num_workers=n_procs)
else:
if verbose:
print(f"Aggregating {len(urls.name)} URLs...")
self.df = self.aggregrate_files(urls, n_procs=n_procs)
self.df = self.aggregrate_files(
urls,
n_procs=n_procs,
request_timeout=request_timeout,
request_retries=request_retries,
)

if resample and not self.df.empty:
if verbose:
print("Resampling to every " + window)
self.df.index = self.df.time
self.df = self.df.groupby("station_id").resample(window).mean().reset_index()
# TODO: mean(numeric_only=True)

self.df = self.df.merge(dfloc, on="station_id", how="left")
self.df = self.df.rename(columns={"station_id": "siteid", "ctry": "country"}).drop(
columns=["fname"]
)
self.df = self.df.rename(columns={"station_id": "siteid", "ctry": "country"})

return self.df

@@ -463,14 +505,13 @@ def build_urls(self, dates=None, sites=None):
all_urls = pd.read_html(f"{url}/{year}/")[0]["Name"].iloc[2:-1].to_frame(name="name")
all_urls = f"{url}/{year}/" + all_urls

# get the dfloc meta data
sites["fname"] = sites.usaf.astype(str) + "-" + sites.wban.astype(str) + "-"
for date in unique_years.strftime("%Y"):
sites["fname"] = (
sites.usaf.astype(str) + "-" + sites.wban.astype(str) + "-" + date + ".gz"
# Construct expected URLs based on sites and year(s) requested
for syear in unique_years.strftime("%Y"):
year_fnames = (
sites.usaf.astype(str) + "-" + sites.wban.astype(str) + "-" + syear + ".gz"
)
for fname in sites.fname.values:
furls.append(f"{url}/{date[0:4]}/{fname}")
for fname in year_fnames:
furls.append(f"{url}/{syear}/{fname}")

# files needed for comparison
url = pd.Series(furls, index=None)
@@ -480,7 +521,7 @@

return final_urls

def aggregrate_files(self, urls, n_procs=1):
def aggregrate_files(self, urls, n_procs=1, request_timeout=10, request_retries=4):
import dask
import dask.dataframe as dd

Expand All @@ -493,7 +534,14 @@ def aggregrate_files(self, urls, n_procs=1):
# print(u)
# dfs.append(self.read_csv(u))

dfs = [dask.delayed(self.read_data_frame)(f) for f in urls.name]
def func(url):
return self.read_data_frame(
url,
request_timeout=request_timeout,
request_retries=request_retries,
)

dfs = [dask.delayed(func)(url) for url in urls.name]
dff = dd.from_delayed(dfs)
df = dff.compute(num_workers=n_procs)

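The build_urls change above now constructs one expected file URL per requested site and year instead of repeatedly overwriting a sites["fname"] column. A sketch of the resulting naming scheme; the base URL and station values are illustrative only (the real base URL is defined elsewhere in ish.py):

base_url = "https://www.ncei.noaa.gov/pub/data/noaa"  # assumed archive root, for illustration
usaf, wban, year = "722244", "00358", "2020"          # example station identifiers and year

fname = f"{usaf}-{wban}-{year}.gz"   # "{USAF}-{WBAN}-{YYYY}.gz", per the diff above
url = f"{base_url}/{year}/{fname}"   # one URL per site and year, appended to furls
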
16 changes: 8 additions & 8 deletions monetio/obs/ish_lite.py
@@ -167,14 +167,13 @@ def build_urls(self, dates=None, sites=None):
all_urls = pd.read_html(f"{url}/{year}/")[0]["Name"].iloc[2:-1].to_frame(name="name")
all_urls = f"{url}/{year}/" + all_urls

# Get the meta data
sites["fname"] = sites.usaf.astype(str) + "-" + sites.wban.astype(str) + "-"
for date in unique_years.strftime("%Y"):
sites["fname"] = (
sites.usaf.astype(str) + "-" + sites.wban.astype(str) + "-" + date + ".gz"
# Construct expected URLs based on sites and year(s) requested
for syear in unique_years.strftime("%Y"):
year_fnames = (
sites.usaf.astype(str) + "-" + sites.wban.astype(str) + "-" + syear + ".gz"
)
for fname in sites.fname.values:
furls.append(f"{url}/{date[0:4]}/{fname}")
for fname in year_fnames:
furls.append(f"{url}/{syear}/{fname}")

# Files needed for comparison
url = pd.Series(furls, index=None)
@@ -318,12 +317,13 @@ def add_data(
if resample and not df.empty:
print("Resampling to every " + window)
df = df.set_index("time").groupby("siteid").resample(window).mean().reset_index()
# TODO: mean(numeric_only=True)

# Add site metadata
df = pd.merge(df, dfloc, how="left", left_on="siteid", right_on="station_id").rename(
columns={"ctry": "country"}
)
return df.drop(["station_id", "fname"], axis=1)
return df.drop(["station_id"], axis=1)

def get_url_file_objs(self, fname):
"""Short summary.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -18,4 +18,5 @@ filterwarnings = [
"ignore:The NPY_CHAR type_num is deprecated. Please port your code to use NPY_STRING instead.:DeprecationWarning::",
"ignore:'cgi' is deprecated and slated for removal in Python 3.13:DeprecationWarning::",
"ignore:The default dtype for empty Series will be 'object' instead of 'float64' in a future version. Specify a dtype explicitly to silence this warning.:FutureWarning::",
"ignore:np.find_common_type is deprecated.:DeprecationWarning:pandas:",
]
16 changes: 16 additions & 0 deletions tests/test_ish.py
@@ -150,3 +150,19 @@ def test_ish_read_url_direct():
assert set(orig_names) - set(df.columns) == {"date", "htime", "latitude", "longitude"}

assert type(df.t_quality[0]) == str


def test_ish_small_timeout_fails():
dates = pd.date_range("2020-09-01", "2020-09-02")
site = "72224400358" # "College Park AP"

with pytest.raises(RuntimeError, match="^Failed to connect"):
ish.add_data(dates, site=site, request_timeout=1e-6, request_retries=0)


def test_ish_bad_retries_error():
dates = pd.date_range("2020-09-01", "2020-09-02")
site = "72224400358" # "College Park AP"

with pytest.raises(ValueError, match="^`request_retries` must be >= 0"):
ish.add_data(dates, site=site, request_retries=-1)

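To run only the two tests added here, one option is pytest's -k expression filter (assuming pytest is installed and that the keywords below don't also match other test names in the file; the timeout test exercises a real request against the NOAA server):

import pytest

# Equivalent to the command line: pytest tests/test_ish.py -k "timeout or retries"
pytest.main(["tests/test_ish.py", "-k", "timeout or retries"])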