Skip to content

Commit

Permalink
🔀 Merge branch 'main' into production
Browse files Browse the repository at this point in the history
  • Loading branch information
jh0ker committed Oct 13, 2024
2 parents 73a2775 + 8dd7ab5 commit 9b15d79
Showing 1 changed file with 44 additions and 2 deletions.
46 changes: 44 additions & 2 deletions ddj_cloud/scrapers/talsperren/talsperren.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,54 @@
)

from . import locator_maps
from .common import Exporter, Federation, ReservoirMeta, to_parquet_bio
from .common import Exporter, Federation, ReservoirMeta, ReservoirRecord, to_parquet_bio

IGNORE_LIST = [
"Rurtalsperre Gesamt",
]


# Thresholds for fill percentage
FILL_RATIO_THRESHOLD_LOW = 0.10 # 10%
FILL_RATIO_THRESHOLD_HIGH = 1.05 # 105%


def _cleanup_old_data(df: pd.DataFrame) -> pd.DataFrame:
### CLEANUP ###
...
return df


def _notify_about_bad_data(df: pd.DataFrame):
"""
Send a notification to Sentry if the fill percentage is above or below the configured threshold.
"""
df_bad = df.loc[
((df["content_mio_m3"] / df["capacity_mio_m3"]) < FILL_RATIO_THRESHOLD_LOW)
| ((df["content_mio_m3"] / df["capacity_mio_m3"]) > FILL_RATIO_THRESHOLD_HIGH)
]

affected_reservoirs = df_bad["name"].unique()
affected_federations = df_bad["federation_name"].unique()

if len(df_bad) > 0:
msg = f"Found {len(df_bad)} rows with fill percentages above or below the threshold. "
msg += f"Affected federations: {', '.join(affected_federations)}. "
msg += f"Affected reservoirs: {', '.join(affected_reservoirs)}."
print(msg)
sentry_sdk.capture_message(msg)


def _filter_bad_data(df: pd.DataFrame) -> pd.DataFrame:
"""
Remove rows where the fill percentage is above or below the configured threshold.
"""
return df.loc[
(df["fill_percent"] > (FILL_RATIO_THRESHOLD_LOW * 100))
& (df["fill_percent"] < (FILL_RATIO_THRESHOLD_HIGH * 100))
]


def _get_base_dataset():
# Download existing data
df_db = None
Expand All @@ -47,7 +82,7 @@ def _get_base_dataset():
federations = [cls() for cls in federation_classes]

# Get data from all federations
data = []
data: list[ReservoirRecord] = []
for federation in federations:
try:
data.extend(federation.get_data(start=start))
Expand All @@ -59,6 +94,9 @@ def _get_base_dataset():
# Parse into data frame
df_new = pd.DataFrame(data)

# Notify about bad data
_notify_about_bad_data(df_new)

# Cast ts_measured to datetime
df_new["ts_measured"] = pd.to_datetime(df_new["ts_measured"], utc=True)

Expand Down Expand Up @@ -108,6 +146,10 @@ def _get_base_dataset():
axis=1,
)

# Filter bad data
df = df.pipe(_filter_bad_data)

df.reset_index(drop=True, inplace=True)
return df


Expand Down

0 comments on commit 9b15d79

Please sign in to comment.