✨ Add new daily and weekly exporters
jh0ker committed Nov 15, 2023
1 parent 240fbd2 commit 25e5101
Showing 2 changed files with 227 additions and 0 deletions.
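Both exporters subclass Exporter from ddj_cloud.scrapers.talsperren.common, which is not part of this diff. For orientation, a minimal sketch of the interface the two files appear to rely on (an assumption, not the actual implementation):

import pandas as pd


class Exporter:
    # Hypothetical sketch; the real base class in
    # ddj_cloud.scrapers.talsperren.common may differ.
    filename: str  # output name, presumably used by the scraper pipeline

    def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
        raise NotImplementedError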
112 changes: 112 additions & 0 deletions ddj_cloud/scrapers/talsperren/exporters/daily.py
@@ -0,0 +1,112 @@
import pandas as pd
from dateutil.relativedelta import relativedelta

from ddj_cloud.scrapers.talsperren.common import Exporter
from ddj_cloud.utils.date_and_time import local_today_midnight


class DailyExporter(Exporter):
filename = "daily"

def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
df_base.insert(0, "id", df_base["federation_name"] + "_" + df_base["name"])

# Drop all data older than three months (the export keeps one month; the extra margin avoids underfilled medians/means at the window edge)
df_base = df_base.loc[
df_base["ts_measured"] > local_today_midnight() - relativedelta(months=3)
]

# First, set the 'ts_measured' column as the DataFrame index
df_base.set_index("ts_measured", inplace=True)
df_base.index = df_base.index.tz_convert("Europe/Berlin") # type: ignore

# Group by 'id', then resample to daily frequency using the median
df_daily: pd.DataFrame = (
df_base.groupby(
["id"],
)
.resample("D")
.aggregate( # type: ignore
{
"content_mio_m3": "median",
"capacity_mio_m3": "median",
}
)
)

# Create a new MultiIndex with the full cross product of 'id' and 'ts_measured'
idx = df_daily.index
multi_idx = pd.MultiIndex.from_product(
[idx.get_level_values(level=0).unique(), idx.get_level_values(level=1).unique()],
names=["id", "ts_measured"],
)

# Reindex DataFrame and forward fill missing values from those of the same station
df_daily = df_daily.reindex(multi_idx)
df_daily = df_daily.groupby(level=0).ffill()

# Reconstruct the 'federation_name' column from the 'id' index level
# (the station 'name' is not needed in this export)
df_daily["federation_name"] = (
df_daily.index.get_level_values(level=0).str.split("_", n=1).str[0]
)

# Drop the 'id' level from the index and make 'ts_measured' the index again
df_daily.reset_index(level=0, drop=True, inplace=True)
df_daily.reset_index(inplace=True)
df_daily.set_index("ts_measured", inplace=True)

# Build a new dataframe with the total content and capacity per
# federation for each day, from which a fill percentage per
# federation can be derived
df_daily_fed = df_daily.groupby(["ts_measured", "federation_name"]).aggregate(
{
"content_mio_m3": sum_nan,
"capacity_mio_m3": sum_nan,
}
)

# Add "fill_percent" columns
df_daily_fed["fill_percent"] = (
df_daily_fed["content_mio_m3"] / df_daily_fed["capacity_mio_m3"] * 100
)

# Drop "content_mio_m3" and "capacity_mio_m3" columns
df_daily_fed.drop(columns=["content_mio_m3", "capacity_mio_m3"], inplace=True)

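# Pivot 'federation_name' from the row index into columns (one per federation)
# and drop the now-redundant 'fill_percent' level from the column index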
df_daily_fed = df_daily_fed.unstack()
df_daily_fed.columns = df_daily_fed.columns.droplevel(0)

# Add "Gesamt" column
df_weekly_gesamt = df_daily.groupby(["ts_measured"]).aggregate(
{
"content_mio_m3": sum_nan,
"capacity_mio_m3": sum_nan,
}
)
df_weekly_gesamt["fill_percent"] = (
df_weekly_gesamt["content_mio_m3"] / df_weekly_gesamt["capacity_mio_m3"] * 100
)
df_weekly_gesamt.drop(columns=["content_mio_m3", "capacity_mio_m3"], inplace=True)
df_weekly_gesamt.columns = ["Gesamt"]
df_daily_fed = df_daily_fed.join(df_weekly_gesamt, how="outer")

df_daily_fed.reset_index(inplace=True)

# Drop all data before one month ago cleanly
df_daily_fed = df_daily_fed.loc[
df_daily_fed["ts_measured"] > local_today_midnight() - relativedelta(months=1)
]

df_daily_fed.rename(columns={"ts_measured": "date"}, inplace=True)

# Convert datetime to ISO date string
df_daily_fed["date"] = df_daily_fed["date"].dt.strftime("%Y-%m-%d")

return df_daily_fed


def sum_nan(x):
# Sum that yields NaN (instead of 0) when all values are NaN, via min_count=1
return x.sum(min_count=1)
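The reindex-then-ffill step above is the core trick in both new exporters: resampling per group leaves gaps wherever a station has no reading in a period, so the frame is reindexed against the full cross product of ids and timestamps and forward-filled per station. A standalone toy sketch of that pattern (illustrative data only, not project code):

import pandas as pd

# Two stations; "B_y" has no reading on the second day
df = pd.DataFrame(
    {
        "id": ["A_x", "A_x", "B_y"],
        "ts_measured": pd.to_datetime(["2023-11-01", "2023-11-02", "2023-11-01"]),
        "content_mio_m3": [1.0, 1.2, 5.0],
    }
).set_index(["id", "ts_measured"])

# Full cross product of all ids and all timestamps
idx = pd.MultiIndex.from_product(
    [df.index.get_level_values(0).unique(), df.index.get_level_values(1).unique()],
    names=["id", "ts_measured"],
)

# The missing ("B_y", "2023-11-02") row is created as NaN, then forward-filled
# from B_y's own previous value -- never from another station's data
df = df.reindex(idx).groupby(level=0).ffill()
print(df)  # ("B_y", "2023-11-02") now carries 5.0

The sum_nan helper above guards the other end of the same problem: with min_count=1, a federation whose stations are all NaN in a period aggregates to NaN instead of a misleading 0.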
115 changes: 115 additions & 0 deletions ddj_cloud/scrapers/talsperren/exporters/weekly.py
@@ -0,0 +1,115 @@
import pandas as pd
from dateutil.relativedelta import relativedelta

from ddj_cloud.scrapers.talsperren.common import Exporter
from ddj_cloud.utils.date_and_time import local_today_midnight


class WeeklyExporter(Exporter):
filename = "weekly"

def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
df_base.insert(0, "id", df_base["federation_name"] + "_" + df_base["name"])

# Drop all data before one year ago (plus some extra so we don't underfill any medians/means)
df_base = df_base.loc[
df_base["ts_measured"] > local_today_midnight() - relativedelta(years=1, weeks=3)
]

# First, set the 'ts_measured' column as the DataFrame index
df_base.set_index("ts_measured", inplace=True)
df_base.index = df_base.index.tz_convert("Europe/Berlin") # type: ignore

# Group by 'id', then resample to weekly frequency using the median
df_weekly: pd.DataFrame = (
df_base.groupby(
["id"],
)
.resample("W")
.aggregate( # type: ignore
{
"content_mio_m3": "median",
"capacity_mio_m3": "median",
}
)
)

# Create a new MultiIndex with the full cross product of 'id' and 'ts_measured'
idx = df_weekly.index
multi_idx = pd.MultiIndex.from_product(
[idx.get_level_values(level=0).unique(), idx.get_level_values(level=1).unique()],
names=["id", "ts_measured"],
)

# Reindex DataFrame and forward fill missing values from those of the same station
df_weekly = df_weekly.reindex(multi_idx)
df_weekly = df_weekly.groupby(level=0).ffill()

# Reconstruct the 'federation_name' column from the 'id' index level
# (the station 'name' is not needed in this export)
df_weekly["federation_name"] = (
df_weekly.index.get_level_values(level=0).str.split("_", n=1).str[0]
)

# Drop the 'id' level from the index and make 'ts_measured' the index again
df_weekly.reset_index(level=0, drop=True, inplace=True)
df_weekly.reset_index(inplace=True)
df_weekly.set_index("ts_measured", inplace=True)

# Build a new dataframe with the total content and capacity per
# federation for each week, from which a fill percentage per
# federation can be derived
df_weekly_fed = df_weekly.groupby(["ts_measured", "federation_name"]).aggregate(
{
"content_mio_m3": sum_nan,
"capacity_mio_m3": sum_nan,
}
)

# Add "fill_percent" columns
df_weekly_fed["fill_percent"] = (
df_weekly_fed["content_mio_m3"] / df_weekly_fed["capacity_mio_m3"] * 100
)

# Drop "content_mio_m3" and "capacity_mio_m3" columns
df_weekly_fed.drop(columns=["content_mio_m3", "capacity_mio_m3"], inplace=True)

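# Pivot 'federation_name' from the row index into columns (one per federation)
# and drop the now-redundant 'fill_percent' level from the column index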
df_weekly_fed = df_weekly_fed.unstack()
df_weekly_fed.columns = df_weekly_fed.columns.droplevel(0)

# Add "Gesamt" column
df_weekly_gesamt = df_weekly.groupby(["ts_measured"]).aggregate(
{
"content_mio_m3": sum_nan,
"capacity_mio_m3": sum_nan,
}
)
df_weekly_gesamt["fill_percent"] = (
df_weekly_gesamt["content_mio_m3"] / df_weekly_gesamt["capacity_mio_m3"] * 100
)
df_weekly_gesamt.drop(columns=["content_mio_m3", "capacity_mio_m3"], inplace=True)
df_weekly_gesamt.columns = ["Gesamt"]
df_weekly_fed = df_weekly_fed.join(df_weekly_gesamt, how="outer")

df_weekly_fed.reset_index(inplace=True)

# Drop all data before one year ago cleanly
df_weekly_fed = df_weekly_fed.loc[
df_weekly_fed["ts_measured"] > local_today_midnight() - relativedelta(years=1)
]

df_weekly_fed.rename(columns={"ts_measured": "date"}, inplace=True)

# Use start of week as date instead of end of week
# df_weekly_fed["date"] = df_weekly_fed["date"] - to_offset("6D")

# Convert date to ISO Week format
df_weekly_fed["date"] = df_weekly_fed["date"].dt.strftime("%G-W%V")

return df_weekly_fed


def sum_nan(x):
# Sum that yields NaN (instead of 0) when all values are NaN, via min_count=1
return x.sum(min_count=1)
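A note on the final formatting step: %G-W%V is the ISO 8601 week-year and week number, which can differ from the calendar year %Y around New Year. A quick stdlib-only check (no project code involved):

from datetime import date

print(date(2023, 12, 31).strftime("%G-W%V"))  # 2023-W52 (Sunday of 2023's last ISO week)
print(date(2024, 12, 30).strftime("%G-W%V"))  # 2025-W01 (already ISO week 1 of 2025)
print(date(2024, 12, 30).strftime("%Y"))      # 2024 (calendar year disagrees)

Pairing %G with %V keeps week labels consistent; mixing %Y with %V would mislabel 2024-12-30 as 2024-W01.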
