Merge pull request #7 from wdr-data/main
Prod merge: Talsperren scraper, general updates & maintenance
jh0ker authored Dec 18, 2023
2 parents 184367f + 53421b1 commit 7c61263
Showing 32 changed files with 3,969 additions and 954 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/deploy.yml
@@ -11,8 +11,8 @@ jobs:
     strategy:
       matrix:
         node-version: ["18.x"]
-        python-version: ["3.10"]
-        pipenv-version: ["2023.7.23"]
+        python-version: ["3.11"]
+        pipenv-version: ["2023.10.24"]

     steps:
       # Checkout repository
9 changes: 4 additions & 5 deletions Pipfile
@@ -16,21 +16,20 @@
 requests = "~=2.28"
 boto3 = "~=1.24"
 google-cloud-bigquery = "~=3.2"
 google-api-python-client = "~=2.54"
 db-dtypes = "~=1.0"
 sentry-sdk = "~=1.8"
 certifi = "*"
 tzdata = "*"
 datawrapper = "~=0.5.3"
+fastparquet = "2023.10.1"

 [dev-packages]
 click = "~=8.1"
 black = "*"
-copier = "~=6.1"
-copier-templates-extensions = "~=0.2"
-pydantic = "<2"
+copier = "~=9.0"
+copier-templates-extensions = "~=0.3"
 python-slugify = "~=6.1"
 pipenv = "*"
 colorama = "*"

 [requires]
-python_version = "3.10"
+python_version = "3.11"
1,319 changes: 754 additions & 565 deletions Pipfile.lock

Large diffs are not rendered by default.

@@ -123,7 +123,9 @@ def run():
     dt_earliest = df["acq_datetime"].min()
     dt_latest = df["acq_datetime"].max()

-    if dt_latest.date() != dt_earliest.date():
+    if len(df) == 0:
+        dt_range_str = "in den letzten 24-48 Stunden"
+    elif dt_latest.date() != dt_earliest.date():
         dt_earliest_str = dt_earliest.astimezone(TZ_BERLIN).strftime("%d.%m.%Y um %H:%M")
         dt_latest_str = dt_latest.astimezone(TZ_BERLIN).strftime("%d.%m.%Y um %H:%M")
         dt_range_str = f"vom {dt_earliest_str} bis zum {dt_latest_str}"
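For context, a minimal sketch of the failure mode this new guard avoids (illustrative only, not part of the commit; assumes standard pandas semantics):

import pandas as pd

df = pd.DataFrame({"acq_datetime": pd.to_datetime([])})
dt_latest = df["acq_datetime"].max()  # NaT on an empty frame
# NaT.strftime(...) raises "NaTType does not support strftime", so the
# new len(df) == 0 branch falls back to a generic time-range string.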
@@ -118,7 +118,9 @@ def run():
         languages=["en"],
     ).strftime("%d.%m.%Y um %H:%M")

-    if dt_latest.date() != dt_earliest.date():
+    if len(df) == 0:
+        dt_range_str = "in den letzten 24-48 Stunden"
+    elif dt_latest.date() != dt_earliest.date():
         dt_earliest_str = dt_earliest.astimezone(TZ_BERLIN).strftime("%d.%m.%Y um %H:%M")
         dt_latest_str = dt_latest.astimezone(TZ_BERLIN).strftime("%d.%m.%Y um %H:%M")
         dt_range_str = f"vom {dt_earliest_str} bis zum {dt_latest_str}"
@@ -123,7 +123,9 @@ def run():
     dt_earliest = df["acq_datetime"].min()
     dt_latest = df["acq_datetime"].max()

-    if dt_latest.date() != dt_earliest.date():
+    if len(df) == 0:
+        dt_range_str = "in den letzten 24-48 Stunden"
+    elif dt_latest.date() != dt_earliest.date():
         dt_earliest_str = dt_earliest.astimezone(TZ_BERLIN).strftime("%d.%m.%Y um %H:%M")
         dt_latest_str = dt_latest.astimezone(TZ_BERLIN).strftime("%d.%m.%Y um %H:%M")
         dt_range_str = f"vom {dt_earliest_str} bis zum {dt_latest_str}"
13 changes: 13 additions & 0 deletions ddj_cloud/scrapers/talsperren/.copier-answers.yml
@@ -0,0 +1,13 @@
# Changes here will be overwritten by Copier; NEVER EDIT MANUALLY
_src_path: X:\work\ddj\wdr-ddj-cloud\scraper_template
contact_email: [email protected]
contact_name: Jannes Höke
description: Sammelt die Füllstände verschiedener Talsperren in NRW.
display_name: Talsperren
ephemeral_storage: '512'
interval: hourly
interval_custom: null
memory_size: '512'
module_name: talsperren
preset: pandas

5 changes: 5 additions & 0 deletions ddj_cloud/scrapers/talsperren/README.md
@@ -0,0 +1,5 @@
# Talsperren

**Contact:** Jannes Höke ([email protected])

Sammelt die Füllstände verschiedener Talsperren in NRW.
8 changes: 8 additions & 0 deletions ddj_cloud/scrapers/talsperren/__init__.py
@@ -0,0 +1,8 @@
# Load lxml because for some reason bs4 doesn't find it otherwise?
from lxml import etree

# Ensure that federation subclasses are loaded
from . import federations

# Ensure that exporter subclasses are loaded
from . import exporters
85 changes: 85 additions & 0 deletions ddj_cloud/scrapers/talsperren/common.py
@@ -0,0 +1,85 @@
from dataclasses import dataclass
import datetime as dt
from io import BytesIO
from typing import Callable, Generator, Iterable, Optional, TypeVar, Protocol, TypedDict
from zoneinfo import ZoneInfo
import pandas as pd

import sentry_sdk

TZ_UTC = ZoneInfo("UTC")
TZ_BERLIN = ZoneInfo("Europe/Berlin")


@dataclass
class ReservoirRecord:
federation_name: str
name: str
ts_measured: dt.datetime
capacity_mio_m3: float
content_mio_m3: float


class ReservoirMeta(TypedDict):
capacity_mio_m3: float
lat: float
lon: float


class Federation(Protocol):
name: str

reservoirs: dict[str, ReservoirMeta]

def __init__(self) -> None:
...

def get_data(
self,
*,
start: Optional[dt.datetime] = None,
end: Optional[dt.datetime] = None,
) -> Iterable[ReservoirRecord]:
...


T1 = TypeVar("T1")
T2 = TypeVar("T2")


def apply_guarded(
func: Callable[[T2], Optional[T1]],
data: Iterable[T2],
) -> Generator[T1, None, None]:
for item in data:
try:
result = func(item)
if result is not None:
yield result
except Exception as e:
print("Skipping due to error:")
print(e)
sentry_sdk.capture_exception(e)
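
A hypothetical usage sketch for apply_guarded (parse_row and raw_rows are illustrative names, not from this commit):

def parse_row(row: dict) -> Optional[ReservoirRecord]:
    ...  # parse one upstream row; may raise on malformed input

# Bad rows are logged and reported to Sentry instead of aborting the run.
records = list(apply_guarded(parse_row, raw_rows))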


def to_parquet_bio(df: pd.DataFrame, **kwargs) -> BytesIO:
    data: BytesIO = BytesIO()

    # fastparquet closes the file object it writes to, which would discard
    # the BytesIO buffer; no-op close() for the duration of the write so
    # the buffer survives and can be returned.
    orig_close = data.close
    data.close = lambda: None
    try:
        df.to_parquet(data, engine="fastparquet", **kwargs)
    finally:
        data.close = orig_close

    return data


class Exporter(Protocol):
filename: str

def __init__(self) -> None:
...

def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
...
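
For orientation, a minimal sketch of what an implementation of the Federation protocol looks like (names and values are invented; the real implementations live in the federations package):

import datetime as dt

class ExampleFederation:
    name = "Beispielverband"

    reservoirs: dict[str, ReservoirMeta] = {
        "Beispieltalsperre": {"capacity_mio_m3": 10.0, "lat": 51.0, "lon": 7.0},
    }

    def get_data(
        self,
        *,
        start: Optional[dt.datetime] = None,
        end: Optional[dt.datetime] = None,
    ) -> Iterable[ReservoirRecord]:
        # A real implementation would fetch and parse the federation's data here.
        yield ReservoirRecord(
            federation_name=self.name,
            name="Beispieltalsperre",
            ts_measured=dt.datetime.now(TZ_UTC),
            capacity_mio_m3=10.0,
            content_mio_m3=7.5,
        )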
16 changes: 16 additions & 0 deletions ddj_cloud/scrapers/talsperren/exporters/__init__.py
@@ -0,0 +1,16 @@
from pathlib import Path
import importlib

current_dir = Path(__file__).parent

# Get a list of all Python files in the current directory
module_files = current_dir.glob("*.py")

# Import each submodule dynamically to ensure that the federation
# subclasses are loaded
for module_file in module_files:
if module_file.name.startswith("__"):
continue # Skip special files, including self

module_name = module_file.name[:-3] # Remove the ".py" extension
importlib.import_module("." + module_name, package=__name__)
112 changes: 112 additions & 0 deletions ddj_cloud/scrapers/talsperren/exporters/daily.py
@@ -0,0 +1,112 @@
import pandas as pd
from dateutil.relativedelta import relativedelta

from ddj_cloud.scrapers.talsperren.common import Exporter
from ddj_cloud.utils.date_and_time import local_today_midnight


class DailyExporter(Exporter):
filename = "daily"

def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
df_base.insert(0, "id", df_base["federation_name"] + "_" + df_base["name"])

# Drop all data before one month ago (plus some extra so we don't underfill any medians/means)
df_base = df_base.loc[
df_base["ts_measured"] > local_today_midnight() - relativedelta(months=3)
]

# First, set the 'ts_measured' column as the DataFrame index
df_base.set_index("ts_measured", inplace=True)
df_base.index = df_base.index.tz_convert("Europe/Berlin") # type: ignore

# Group by 'federation_name' and 'name', then resample to daily frequency using median
df_daily: pd.DataFrame = (
df_base.groupby(
["id"],
)
.resample("D")
.aggregate( # type: ignore
{
"content_mio_m3": "median",
"capacity_mio_m3": "median",
}
)
)

# Create a new MultiIndex with all permutations of 'id' and 'ts_measured'
idx = df_daily.index
multi_idx = pd.MultiIndex.from_product(
[idx.get_level_values(level=0).unique(), idx.get_level_values(level=1).unique()],
names=["id", "ts_measured"],
)

# Reindex DataFrame and forward fill missing values from those of the same station
df_daily = df_daily.reindex(multi_idx)
df_daily = df_daily.groupby(level=0).ffill()

# Reconstruct the 'federation_name' (and 'name') columns
# df_weekly["federation_name"], df_weekly["name"] = (
# df_weekly.index.get_level_values(level=0).str.split("_", 1).str
# )
df_daily["federation_name"] = (
df_daily.index.get_level_values(level=0).str.split("_", n=1).str[0]
)

# Drop the 'id' column from the index
df_daily.reset_index(level=0, drop=True, inplace=True)
df_daily.reset_index(inplace=True)
df_daily.set_index("ts_measured", inplace=True)

# Create a new dataframe with columns for each "federation_name"
# with the mean fill_percent for each week across all reservoirs
# of that federation
df_daily_fed = df_daily.groupby(["ts_measured", "federation_name"]).aggregate(
{
"content_mio_m3": sum_nan,
"capacity_mio_m3": sum_nan,
}
)

# Add "fill_percent" columns
df_daily_fed["fill_percent"] = (
df_daily_fed["content_mio_m3"] / df_daily_fed["capacity_mio_m3"] * 100
)

# Drop "content_mio_m3" and "capacity_mio_m3" columns
df_daily_fed.drop(columns=["content_mio_m3", "capacity_mio_m3"], inplace=True)

df_daily_fed = df_daily_fed.unstack()
df_daily_fed.columns = df_daily_fed.columns.droplevel(0)

# Add "Gesamt" column
df_weekly_gesamt = df_daily.groupby(["ts_measured"]).aggregate(
{
"content_mio_m3": sum_nan,
"capacity_mio_m3": sum_nan,
}
)
df_weekly_gesamt["fill_percent"] = (
df_weekly_gesamt["content_mio_m3"] / df_weekly_gesamt["capacity_mio_m3"] * 100
)
df_weekly_gesamt.drop(columns=["content_mio_m3", "capacity_mio_m3"], inplace=True)
df_weekly_gesamt.columns = ["Gesamt"]
df_daily_fed = df_daily_fed.join(df_weekly_gesamt, how="outer")

df_daily_fed.reset_index(inplace=True)

# Drop all data before one month ago cleanly
df_daily_fed = df_daily_fed.loc[
df_daily_fed["ts_measured"] > local_today_midnight() - relativedelta(months=1)
]

df_daily_fed.rename(columns={"ts_measured": "date"}, inplace=True)

# Convert datetime to iso date string
df_daily_fed["date"] = df_daily_fed["date"].dt.strftime("%Y-%m-%d")

return df_daily_fed


def sum_nan(x):
    # min_count=1 makes an all-NaN group sum to NaN instead of pandas'
    # default of 0, so missing data doesn't masquerade as an empty reservoir.
    return x.sum(min_count=1)
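
For intuition, a toy version of the resample-then-reindex-then-ffill pattern used above (invented data; assumes standard pandas behavior, not part of the commit):

import pandas as pd

idx = pd.to_datetime(["2023-12-01 06:00", "2023-12-03 18:00"]).tz_localize("Europe/Berlin")
df = pd.DataFrame({"id": ["a_x", "a_x"], "content_mio_m3": [1.0, 3.0]}, index=idx)

# Resample to daily medians; Dec 2 gets a NaN bin.
daily = df.groupby("id").resample("D").aggregate({"content_mio_m3": "median"})

# Align every station to the same set of days, then forward-fill
# per station so gaps carry the last known value.
full = pd.MultiIndex.from_product(
    [daily.index.get_level_values(0).unique(), daily.index.get_level_values(1).unique()],
    names=["id", "ts_measured"],
)
daily = daily.reindex(full).groupby(level=0).ffill()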