Skip to content

Commit

Permalink
Merge pull request #15 from tgrandje/bugfix/check_loops_results
Browse files Browse the repository at this point in the history
Bugfix/check loops results
  • Loading branch information
tgrandje authored Oct 17, 2024
2 parents ac25ad1 + e4f98b5 commit f64be27
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 98 deletions.
2 changes: 1 addition & 1 deletion cl_hubeau/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
"SIZE": 1000, # Default size for each API's result
"RATE_LIMITER": 10, # queries per second
"TQDM_LEAVE": None, # keep tqdm progressbar after completion
"THREADS": 10, # Max number of threads to permform one request to an endpoint
"THREADS": 10, # Max number of threads to perform a request to an endpoint
}
76 changes: 30 additions & 46 deletions cl_hubeau/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from requests.exceptions import JSONDecodeError
from requests_cache import CacheMixin
from requests_ratelimiter import LimiterMixin
from tqdm import tqdm
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

from cl_hubeau.constants import DIR_CACHE, CACHE_NAME, RATELIMITER_NAME
from cl_hubeau import _config
Expand All @@ -30,7 +31,6 @@ def map_func(
threads: int,
func: Callable,
iterables: list,
disable: bool = False,
) -> list:
"""
Map a function against an iterable of arguments.
Expand All @@ -52,8 +52,6 @@ def map_func(
Function to map
iterables : list
Collection of arguments for func
disable : bool, optional
Set to True do disable the tqdm progressbar. The default is False
Returns
-------
Expand All @@ -62,36 +60,26 @@ def map_func(
"""

total = len(iterables)
results = []
with tqdm(
desc="querying",
total=total,
leave=_config["TQDM_LEAVE"],
disable=disable,
position=tqdm._get_free_pos(),
) as pbar:

if threads > 1:
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
".*Connection pool is full, discarding connection.*",
)
with pebble.ThreadPool(threads) as pool:
future = pool.map(func, iterables)
iterator = future.result()
while True:
try:
results += next(iterator)
except StopIteration:
break

pbar.update()
else:
for x in iterables:
results += func(x)
pbar.update()

if threads > 1:
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
".*Connection pool is full, discarding connection.*",
)
with pebble.ThreadPool(threads) as pool:
future = pool.map(func, iterables)
iterator = future.result()
while True:
try:
results += next(iterator)
except StopIteration:
break

else:
for x in iterables:
results += func(x)

return results

Expand Down Expand Up @@ -181,6 +169,13 @@ def __init__(
}
)

retry = Retry(
10, backoff_factor=1, status_forcelist=[500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
self.mount("http://", adapter)
self.mount("https://", adapter)

@staticmethod
def list_to_str_param(
x: list,
Expand Down Expand Up @@ -382,7 +377,6 @@ def func(params):
cursor = parse_qs(urlparse(next_url).query)["cursor"][0]
except KeyError:
yield result
pbar.update()
try:
new_params = deepcopy(params)
new_params["cursor"] = cursor
Expand All @@ -391,24 +385,14 @@ def func(params):
except UnboundLocalError:
pass

# Deactivate progress bar if less pages than available threads
disable = count_pages <= threads

if page == "page":
# if integer cursor ("page" param), use multithreading to gather
# data faster
results = map_func(threads, func, iterables, disable)
results = map_func(threads, func, iterables)
else:
# if hashed cursor ("cursor" param), use recursive function to
# gather all results
with tqdm(
desc="querying",
total=count_pages,
leave=_config["TQDM_LEAVE"],
disable=disable,
position=tqdm._get_free_pos(),
) as pbar:
results = [y for x in func(params) for y in x]
results = [y for x in func(params) for y in x]

if "format" in params and params["format"] == "geojson":
if results:
Expand Down
18 changes: 13 additions & 5 deletions cl_hubeau/utils/prepare_loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@


def prepare_kwargs_loops(
key_start: str, key_end: str, kwargs: dict, start_auto_determination: bool
key_start: str,
key_end: str,
kwargs: dict,
start_auto_determination: bool,
split_months: int = 6,
) -> dict:
"""
Prepare a list of kwargs dictionaries used to run a temporal loop.
Expand All @@ -27,6 +31,8 @@ def prepare_kwargs_loops(
kwargs passed to a higher level function.
start_auto_determination : bool
Whether the dates were automatically set by the algorithm.
split_months : int
Number of consecutive months to split the data into
Returns
-------
Expand All @@ -39,10 +45,12 @@ def prepare_kwargs_loops(
"""
start = datetime.strptime(kwargs.pop(key_start), "%Y-%m-%d").date()
end = datetime.strptime(kwargs.pop(key_end), "%Y-%m-%d").date()
ranges = pd.date_range(start, end=end, freq="6ME")
dates = pd.Series(ranges).to_frame("max")
dates["min"] = dates["max"].shift(1) + timedelta(days=1)
dates = dates.dropna().loc[:, ["min", "max"]]

ranges = pd.date_range(start, end=end, freq=f"{split_months}MS")
dates = pd.Series(ranges).to_frame("min")
dates["max"] = dates["min"].shift(-1) - timedelta(days=1)
dates.at[dates.index.max(), "max"] = pd.Timestamp(end)

for d in "min", "max":
dates[d] = dates[d].dt.strftime("%Y-%m-%d")
dates = dates.reset_index(drop=True)
Expand Down
62 changes: 39 additions & 23 deletions cl_hubeau/watercourses_flow/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import date
import geopandas as gpd
import pandas as pd
from tqdm import tqdm
from datetime import date, datetime
from itertools import product

from cl_hubeau.watercourses_flow.watercourses_flow_scraper import (
WatercoursesFlowSession,
)
from cl_hubeau import _config
from cl_hubeau.utils import get_departements, prepare_kwargs_loops
from cl_hubeau.utils import (
get_departements,
prepare_kwargs_loops,
)


def get_all_stations(**kwargs) -> gpd.GeoDataFrame:
Expand All @@ -18,9 +22,9 @@ def get_all_stations(**kwargs) -> gpd.GeoDataFrame:
Parameters
----------
**kwargs :
kwargs passed to WatercoursesFlowSession.get_stations (hence mostly intended
for hub'eau API's arguments). Do not use `format` or `code_departement`
as they are set by the current function.
kwargs passed to WatercoursesFlowSession.get_stations (hence mostly
intended for hub'eau API's arguments). Do not use `format` or
`code_departement` as they are set by the current function.
Returns
-------
Expand All @@ -33,7 +37,9 @@ def get_all_stations(**kwargs) -> gpd.GeoDataFrame:

deps = get_departements()
results = [
session.get_stations(code_departement=dep, format="geojson", **kwargs)
session.get_stations(
code_departement=dep, format="geojson", **kwargs
)
for dep in tqdm(
deps,
desc="querying dep/dep",
Expand All @@ -58,18 +64,16 @@ def get_all_observations(**kwargs) -> gpd.GeoDataFrame:
Parameters
----------
**kwargs :
kwargs passed to WatercoursesFlowSession.get_observations (hence mostly intended
for hub'eau API's arguments). Do not use `format` or `code_departement`
as they are set by the current function.
kwargs passed to WatercoursesFlowSession.get_observations (hence mostly
intended for hub'eau API's arguments). Do not use `format` as this is
set by the current function.
Returns
-------
results : gpd.GeoDataFrame
GeoDataFrame of observations
"""

deps = get_departements()

# Set a loop for yearly querying as dataset are big
start_auto_determination = False
if "date_observation_min" not in kwargs:
Expand All @@ -78,9 +82,15 @@ def get_all_observations(**kwargs) -> gpd.GeoDataFrame:
if "date_observation_max" not in kwargs:
kwargs["date_observation_max"] = date.today().strftime("%Y-%m-%d")

# deps = get_departements()

# ranges = pd.date_range(
# start=datetime.strptime(kwargs.pop("date_observation_min"), "%Y-%m-%d").date(),
# end=datetime.strptime(kwargs.pop("date_observation_max"), "%Y-%m-%d").date(),
# start=datetime.strptime(
# kwargs.pop("date_observation_min"), "%Y-%m-%d"
# ).date(),
# end=datetime.strptime(
# kwargs.pop("date_observation_max"), "%Y-%m-%d"
# ).date(),
# )
# dates = pd.Series(ranges).to_frame("date")
# dates["year"] = dates["date"].dt.year
Expand All @@ -91,7 +101,7 @@ def get_all_observations(**kwargs) -> gpd.GeoDataFrame:
# dates = pd.concat(
# [
# dates,
# pd.DataFrame([{"min": "1900-01-01", "max": "2015-12-31"}]),
# pd.DataFrame([{"min": "1900-01-01", "max": "1959-12-31"}]),
# ],
# ignore_index=False,
# ).sort_index()
Expand All @@ -110,19 +120,22 @@ def get_all_observations(**kwargs) -> gpd.GeoDataFrame:
# )
# for chunk, (date_min, date_max) in tqdm(
# args,
# desc="querying station/station and year/year",
# desc="querying dep/dep and year/year",
# leave=_config["TQDM_LEAVE"],
# position=tqdm._get_free_pos(),
# )
# ]

desc = "querying year/year" + (" & dep/dep" if "code_departement" in kwargs else "")
desc = "querying 4months/4months" + (
" & dep/dep" if "code_departement" in kwargs else ""
)

kwargs_loop = prepare_kwargs_loops(
"date_observation_min",
"date_observation_max",
kwargs,
start_auto_determination,
split_months=4,
)

with WatercoursesFlowSession() as session:
Expand All @@ -143,6 +156,7 @@ def get_all_observations(**kwargs) -> gpd.GeoDataFrame:

results = [x.dropna(axis=1, how="all") for x in results if not x.empty]
results = pd.concat(results, ignore_index=True)
results = results.drop_duplicates().reset_index()
return results


Expand All @@ -153,9 +167,9 @@ def get_all_campagnes(**kwargs) -> gpd.GeoDataFrame:
Parameters
----------
**kwargs :
kwargs passed to WatercoursesFlowSession.get_campagnes (hence mostly intended
for hub'eau API's arguments). Do not use `code_departement`
as they are set by the current function.
kwargs passed to WatercoursesFlowSession.get_campagnes (hence mostly
intended for hub'eau API's arguments). Do not use `code_departement`
as this is set by the current function.
Returns
-------
Expand All @@ -178,12 +192,14 @@ def get_all_campagnes(**kwargs) -> gpd.GeoDataFrame:
position=tqdm._get_free_pos(),
)
]
results = [x.dropna(axis=1, how="all") for x in results if not x.empty]
results = [
x.dropna(axis=1, how="all") for x in results if not x.empty
]
results = gpd.pd.concat(results, ignore_index=True)
return results


# if __name__ == "__main__":
# # print(get_all_stations())
# # print(get_all_observations())
# print(get_all_campagnes())
# df1 = get_all_observations()
# # print(get_all_campagnes())
15 changes: 9 additions & 6 deletions cl_hubeau/watercourses_flow/watercourses_flow_scraper.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 13 10:57:00 2024
low level class to collect data from the watercourses-flow API from hub'eau
"""

import pandas as pd

from cl_hubeau.session import BaseHubeauSession


Expand All @@ -19,7 +18,6 @@ def __init__(self, *args, **kwargs):

super().__init__(version="1.0.0", *args, **kwargs)

# TODO où trouve-t-on cette taille dans la doc ?
# Set default size for API queries, based on hub'eau piezo's doc
self.size = 1000

Expand All @@ -45,7 +43,9 @@ def get_stations(self, **kwargs):
pass

try:
params["bbox"] = self.list_to_str_param(kwargs.pop("bbox"), None, 4)
params["bbox"] = self.list_to_str_param(
kwargs.pop("bbox"), None, 4
)
except KeyError:
pass

Expand Down Expand Up @@ -135,7 +135,9 @@ def get_observations(self, **kwargs):
pass

try:
params["bbox"] = self.list_to_str_param(kwargs.pop("bbox"), None, 4)
params["bbox"] = self.list_to_str_param(
kwargs.pop("bbox"), None, 4
)
except KeyError:
pass

Expand Down Expand Up @@ -278,7 +280,8 @@ def get_campagnes(self, **kwargs):
params["libelle_type_campagne"] = variable.capitalize()
else:
raise ValueError(
"libelle_type_campagne must be among ('Usuelle', 'Complémentaire'), "
"libelle_type_campagne must be among "
"('Usuelle', 'Complémentaire'), "
f"found sort='{variable}' instead"
)
except KeyError:
Expand Down
Loading

0 comments on commit f64be27

Please sign in to comment.