From 3c31bfc26d398024a778332188d6bdab60c1445b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jannes=20H=C3=B6ke?=
Date: Fri, 11 Oct 2024 22:34:19 +0200
Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Filter=20too=20high/low=20data=20an?=
 =?UTF-8?q?d=20send=20a=20Sentry=20event=20when=20new=20data=20exceeding?=
 =?UTF-8?q?=20those=20limits=20is=20scraped?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 ddj_cloud/scrapers/talsperren/talsperren.py | 46 ++++++++++++++++++++-
 1 file changed, 44 insertions(+), 2 deletions(-)

diff --git a/ddj_cloud/scrapers/talsperren/talsperren.py b/ddj_cloud/scrapers/talsperren/talsperren.py
index 2f57ed9..c4c7e6d 100644
--- a/ddj_cloud/scrapers/talsperren/talsperren.py
+++ b/ddj_cloud/scrapers/talsperren/talsperren.py
@@ -14,19 +14,54 @@
 )
 
 from . import locator_maps
-from .common import Exporter, Federation, ReservoirMeta, to_parquet_bio
+from .common import Exporter, Federation, ReservoirMeta, ReservoirRecord, to_parquet_bio
 
 IGNORE_LIST = [
     "Rurtalsperre Gesamt",
 ]
 
+# Thresholds for fill percentage
+FILL_RATIO_THRESHOLD_LOW = 0.10  # 10%
+FILL_RATIO_THRESHOLD_HIGH = 1.05  # 105%
+
+
 
 def _cleanup_old_data(df: pd.DataFrame) -> pd.DataFrame:
     ### CLEANUP ###
     ...
     return df
 
 
+def _notify_about_bad_data(df: pd.DataFrame):
+    """
+    Send a notification to Sentry if the fill percentage is above or below the configured threshold.
+    """
+    df_bad = df.loc[
+        ((df["content_mio_m3"] / df["capacity_mio_m3"]) < FILL_RATIO_THRESHOLD_LOW)
+        | ((df["content_mio_m3"] / df["capacity_mio_m3"]) > FILL_RATIO_THRESHOLD_HIGH)
+    ]
+
+    affected_reservoirs = df_bad["name"].unique()
+    affected_federations = df_bad["federation_name"].unique()
+
+    if len(df_bad) > 0:
+        msg = f"Found {len(df_bad)} rows with fill percentages above or below the threshold. "
+        msg += f"Affected federations: {', '.join(affected_federations)}. "
+        msg += f"Affected reservoirs: {', '.join(affected_reservoirs)}."
+        print(msg)
+        sentry_sdk.capture_message(msg)
+
+
+def _filter_bad_data(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Drop rows whose fill percentage is not strictly between the configured thresholds.
+    """
+    return df.loc[
+        (df["fill_percent"] > (FILL_RATIO_THRESHOLD_LOW * 100))
+        & (df["fill_percent"] < (FILL_RATIO_THRESHOLD_HIGH * 100))
+    ]
+
+
 def _get_base_dataset():
     # Download existing data
     df_db = None
@@ -47,7 +82,7 @@ def _get_base_dataset():
     federations = [cls() for cls in federation_classes]
 
     # Get data from all federations
-    data = []
+    data: list[ReservoirRecord] = []
     for federation in federations:
         try:
             data.extend(federation.get_data(start=start))
@@ -59,6 +94,9 @@ def _get_base_dataset():
     # Parse into data frame
     df_new = pd.DataFrame(data)
 
+    # Notify about bad data
+    _notify_about_bad_data(df_new)
+
     # Cast ts_measured to datetime
     df_new["ts_measured"] = pd.to_datetime(df_new["ts_measured"], utc=True)
 
@@ -108,6 +146,10 @@ def _get_base_dataset():
         axis=1,
     )
 
+    # Filter bad data
+    df = df.pipe(_filter_bad_data)
+
+
     df.reset_index(drop=True, inplace=True)
 
     return df