use ufp for utilsforecast.processing
jmoralez committed Nov 23, 2023
1 parent f7b58b3 commit da82798
Showing 4 changed files with 49 additions and 83 deletions.
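
The change is mechanical: instead of importing individual helpers from utilsforecast.processing, each module now imports the package once under the ufp alias and qualifies every call site. A minimal sketch of the pattern, assuming utilsforecast is installed; the series ids and horizon below are placeholders for illustration, not values from the commit:

    # before (removed): helpers imported by name
    # from utilsforecast.processing import repeat, offset_times, sort

    # after (added): one namespaced import; call sites gain a ufp. prefix
    import pandas as pd
    import utilsforecast.processing as ufp

    uids = pd.Series(['id_0', 'id_1'])  # placeholder series ids
    h = 3                               # placeholder forecast horizon
    # mirrors the fcsts_df construction: one row per id per horizon step
    expanded = ufp.repeat(uids, h)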
45 changes: 17 additions & 28 deletions nbs/core.ipynb
@@ -64,20 +64,9 @@
"import numpy as np\n",
"import pandas as pd\n",
"import torch\n",
"import utilsforecast.processing as ufp\n",
"from utilsforecast.compat import DataFrame, Series, pl_DataFrame, pl_Series\n",
"from utilsforecast.grouped_array import GroupedArray\n",
"from utilsforecast.processing import (\n",
" assign_columns,\n",
" counts_by_id,\n",
" cv_times,\n",
" horizontal_concat,\n",
" is_none,\n",
" join,\n",
" offset_times,\n",
" repeat,\n",
" sort,\n",
" time_ranges,\n",
")\n",
"from utilsforecast.target_transforms import (\n",
" BaseTargetTransform,\n",
" LocalBoxCox,\n",
@@ -134,14 +123,14 @@
" df_constructor = pd.DataFrame\n",
" out = df_constructor(\n",
" {\n",
" 'unique_id': repeat(uids, h * windows_per_serie),\n",
" 'unique_id': ufp.repeat(uids, h * windows_per_serie),\n",
" 'ds': ds,\n",
" 'cutoff': cutoffs,\n",
" }\n",
" )\n",
" # the first cutoff is before the first train date\n",
" actual_cutoffs = offset_times(out['cutoff'], freq, -1)\n",
" out = assign_columns(out, 'cutoff', actual_cutoffs)\n",
" actual_cutoffs = ufp.offset_times(out['cutoff'], freq, -1)\n",
" out = ufp.assign_columns(out, 'cutoff', actual_cutoffs)\n",
" return out"
]
},
@@ -461,11 +450,11 @@
" df_constructor = pl_DataFrame\n",
" else:\n",
" df_constructor = pd.DataFrame\n",
" starts = offset_times(last_dates, self.freq, 1)\n",
" starts = ufp.offset_times(last_dates, self.freq, 1)\n",
" fcsts_df = df_constructor(\n",
" {\n",
" 'unique_id': repeat(self.uids, self.h),\n",
" 'ds': time_ranges(starts, freq=self.freq, periods=self.h),\n",
" 'unique_id': ufp.repeat(self.uids, self.h),\n",
" 'ds': ufp.time_ranges(starts, freq=self.freq, periods=self.h),\n",
" }\n",
" )\n",
"\n",
@@ -474,7 +463,7 @@
" futr_dataset = dataset.align(fcsts_df)\n",
" else:\n",
" futr_orig_rows = futr_df.shape[0]\n",
" futr_df = join(futr_df, fcsts_df, on=['unique_id', 'ds'])\n",
" futr_df = ufp.join(futr_df, fcsts_df, on=['unique_id', 'ds'])\n",
" base_err_msg = f'`futr_df` must have one row per id and ds in the forecasting horizon ({self.h}).'\n",
" if futr_df.shape[0] < fcsts_df.shape[0]:\n",
" raise ValueError(base_err_msg)\n",
@@ -483,7 +472,7 @@
" warnings.warn(\n",
" f'Dropped {dropped_rows:,} unused rows from `futr_df`. ' + base_err_msg\n",
" )\n",
" if any(is_none(futr_df[col]).any() for col in needed_futr_exog):\n",
" if any(ufp.is_none(futr_df[col]).any() for col in needed_futr_exog):\n",
" raise ValueError('Found null values in `futr_df`')\n",
" futr_dataset = dataset.align(futr_df)\n",
" self._scalers_transform(futr_dataset)\n",
@@ -509,7 +498,7 @@
" fcsts = pl_DataFrame(fcsts, schema=cols)\n",
" else:\n",
" fcsts = pd.DataFrame(fcsts, columns=cols)\n",
" fcsts_df = horizontal_concat([fcsts_df, fcsts])\n",
" fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts])\n",
"\n",
" return fcsts_df\n",
" \n",
@@ -602,7 +591,7 @@
" if self.dataset.min_size < (val_size+test_size):\n",
" warnings.warn('Validation and test sets are larger than the shorter time-series.')\n",
"\n",
" fcsts_df = cv_times(\n",
" fcsts_df = ufp.cv_times(\n",
" times=self.ds,\n",
" uids=self.uids,\n",
" indptr=self.dataset.indptr,\n",
@@ -611,7 +600,7 @@
" step_size=step_size,\n",
" )\n",
" # the cv_times is sorted by window and then id\n",
" fcsts_df = sort(fcsts_df, ['unique_id', 'cutoff', 'ds'])\n",
" fcsts_df = ufp.sort(fcsts_df, ['unique_id', 'cutoff', 'ds'])\n",
"\n",
" col_idx = 0\n",
" fcsts = np.full((self.dataset.n_groups * h * n_windows, len(cols)),\n",
@@ -638,7 +627,7 @@
" fcsts = pl_DataFrame(fcsts, schema=cols)\n",
" else:\n",
" fcsts = pd.DataFrame(fcsts, columns=cols)\n",
" fcsts_df = horizontal_concat([fcsts_df, fcsts])\n",
" fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts])\n",
"\n",
" # Add original input df's y to forecasts DataFrame \n",
" return join(fcsts_df, df, how='left', on=['unique_id', 'ds'])\n",
@@ -722,7 +711,7 @@
"\n",
" # original y\n",
" original_y = {\n",
" 'unique_id': repeat(self.uids, np.diff(self.dataset.indptr)),\n",
" 'unique_id': ufp.repeat(self.uids, np.diff(self.dataset.indptr)),\n",
" 'ds': self.ds,\n",
" 'y': self.dataset.temporal[:, 0].numpy(),\n",
" }\n",
@@ -734,12 +723,12 @@
" else:\n",
" fcsts = pd.DataFrame(fcsts, columns=cols)\n",
" Y_df = pd.DataFrame(original_y).reset_index(drop=True)\n",
" fcsts_df = horizontal_concat([fcsts_df, fcsts])\n",
" fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts])\n",
"\n",
" # Add original input df's y to forecasts DataFrame\n",
" fcsts_df = join(fcsts_df, Y_df, how='left', on=['unique_id', 'ds'])\n",
" fcsts_df = ufp.join(fcsts_df, Y_df, how='left', on=['unique_id', 'ds'])\n",
" if self.scalers_:\n",
" sizes = counts_by_id(fcsts_df, 'unique_id')['counts'].to_numpy()\n",
" sizes = ufp.counts_by_id(fcsts_df, 'unique_id')['counts'].to_numpy()\n",
" indptr = np.append(0, sizes.cumsum())\n",
" invert_cols = cols + ['y']\n",
" fcsts_df[invert_cols] = self._scalers_target_inverse_transform(\n",
22 changes: 8 additions & 14 deletions nbs/tsdataset.ipynb
@@ -59,15 +59,9 @@
"import pandas as pd\n",
"import pytorch_lightning as pl\n",
"import torch\n",
"import utilsforecast.processing as ufp\n",
"from torch.utils.data import Dataset, DataLoader\n",
"from utilsforecast.compat import DataFrame, pl_Series\n",
"from utilsforecast.processing import (\n",
" assign_columns,\n",
" copy_if_pandas,\n",
" process_df,\n",
" sort,\n",
" to_numpy,\n",
")"
"from utilsforecast.compat import DataFrame, pl_Series"
]
},
{
@@ -210,15 +204,15 @@
"\n",
" def align(self, df: DataFrame) -> 'TimeSeriesDataset':\n",
" # Protect consistency\n",
" df = copy_if_pandas(df, deep=False)\n",
" df = ufp.copy_if_pandas(df, deep=False)\n",
"\n",
" # Add Nones to missing columns (without available_mask)\n",
" temporal_cols = self.temporal_cols.copy()\n",
" for col in temporal_cols:\n",
" if col not in df.columns:\n",
" df = assign_columns(df, col, np.nan)\n",
" df = ufp.assign_columns(df, col, np.nan)\n",
" if col == 'available_mask':\n",
" df = assign_columns(df, col, 1.0)\n",
" df = ufp.assign_columns(df, col, 1.0)\n",
" \n",
" # Sort columns to match self.temporal_cols (without available_mask)\n",
" df = df[ ['unique_id','ds'] + temporal_cols.tolist() ]\n",
@@ -320,9 +314,9 @@
" DeprecationWarning,\n",
" )\n",
" if sort_df:\n",
" static_df = sort(static_df, by='unique_id')\n",
" static_df = ufp.sort(static_df, by='unique_id')\n",
"\n",
" ids, times, data, indptr, sort_idxs = process_df(df, 'unique_id', 'ds', 'y')\n",
" ids, times, data, indptr, sort_idxs = ufp.process_df(df, 'unique_id', 'ds', 'y')\n",
" # processor sets y as the first column\n",
" temporal_cols = pd.Index(['y'] + [c for c in df.columns if c not in ('unique_id', 'ds', 'y')]) \n",
" temporal = data.astype(np.float32, copy=False)\n",
@@ -344,7 +338,7 @@
" # Static features\n",
" if static_df is not None:\n",
" static_cols = static_df.columns.drop('unique_id')\n",
" static = to_numpy(static_df[static_cols])\n",
" static = ufp.to_numpy(static_df[static_cols])\n",
" else:\n",
" static = None\n",
" static_cols = None\n",
45 changes: 17 additions & 28 deletions neuralforecast/core.py
@@ -14,20 +14,9 @@
import numpy as np
import pandas as pd
import torch
+import utilsforecast.processing as ufp
from utilsforecast.compat import DataFrame, Series, pl_DataFrame, pl_Series
from utilsforecast.grouped_array import GroupedArray
-from utilsforecast.processing import (
-    assign_columns,
-    counts_by_id,
-    cv_times,
-    horizontal_concat,
-    is_none,
-    join,
-    offset_times,
-    repeat,
-    sort,
-    time_ranges,
-)
from utilsforecast.target_transforms import (
BaseTargetTransform,
LocalBoxCox,
@@ -89,14 +78,14 @@ def _insample_times(
df_constructor = pd.DataFrame
out = df_constructor(
{
"unique_id": repeat(uids, h * windows_per_serie),
"unique_id": ufp.repeat(uids, h * windows_per_serie),
"ds": ds,
"cutoff": cutoffs,
}
)
# the first cutoff is before the first train date
-    actual_cutoffs = offset_times(out["cutoff"], freq, -1)
-    out = assign_columns(out, "cutoff", actual_cutoffs)
+    actual_cutoffs = ufp.offset_times(out["cutoff"], freq, -1)
+    out = ufp.assign_columns(out, "cutoff", actual_cutoffs)
return out

# %% ../nbs/core.ipynb 7
@@ -388,11 +377,11 @@ def predict(
df_constructor = pl_DataFrame
else:
df_constructor = pd.DataFrame
-        starts = offset_times(last_dates, self.freq, 1)
+        starts = ufp.offset_times(last_dates, self.freq, 1)
fcsts_df = df_constructor(
{
"unique_id": repeat(self.uids, self.h),
"ds": time_ranges(starts, freq=self.freq, periods=self.h),
"unique_id": ufp.repeat(self.uids, self.h),
"ds": ufp.time_ranges(starts, freq=self.freq, periods=self.h),
}
)

@@ -401,7 +390,7 @@ def predict(
futr_dataset = dataset.align(fcsts_df)
else:
futr_orig_rows = futr_df.shape[0]
-            futr_df = join(futr_df, fcsts_df, on=["unique_id", "ds"])
+            futr_df = ufp.join(futr_df, fcsts_df, on=["unique_id", "ds"])
base_err_msg = f"`futr_df` must have one row per id and ds in the forecasting horizon ({self.h})."
if futr_df.shape[0] < fcsts_df.shape[0]:
raise ValueError(base_err_msg)
@@ -411,7 +400,7 @@ def predict(
f"Dropped {dropped_rows:,} unused rows from `futr_df`. "
+ base_err_msg
)
-            if any(is_none(futr_df[col]).any() for col in needed_futr_exog):
+            if any(ufp.is_none(futr_df[col]).any() for col in needed_futr_exog):
raise ValueError("Found null values in `futr_df`")
futr_dataset = dataset.align(futr_df)
self._scalers_transform(futr_dataset)
@@ -437,7 +426,7 @@ def predict(
fcsts = pl_DataFrame(fcsts, schema=cols)
else:
fcsts = pd.DataFrame(fcsts, columns=cols)
-        fcsts_df = horizontal_concat([fcsts_df, fcsts])
+        fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts])

return fcsts_df

@@ -535,7 +524,7 @@ def cross_validation(
"Validation and test sets are larger than the shorter time-series."
)

-        fcsts_df = cv_times(
+        fcsts_df = ufp.cv_times(
times=self.ds,
uids=self.uids,
indptr=self.dataset.indptr,
@@ -544,7 +533,7 @@ def cross_validation(
step_size=step_size,
)
# the cv_times is sorted by window and then id
-        fcsts_df = sort(fcsts_df, ["unique_id", "cutoff", "ds"])
+        fcsts_df = ufp.sort(fcsts_df, ["unique_id", "cutoff", "ds"])

col_idx = 0
fcsts = np.full(
@@ -574,7 +563,7 @@ def cross_validation(
fcsts = pl_DataFrame(fcsts, schema=cols)
else:
fcsts = pd.DataFrame(fcsts, columns=cols)
-        fcsts_df = horizontal_concat([fcsts_df, fcsts])
+        fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts])

# Add original input df's y to forecasts DataFrame
return join(fcsts_df, df, how="left", on=["unique_id", "ds"])
@@ -666,7 +655,7 @@ def predict_insample(self, step_size: int = 1):

# original y
original_y = {
"unique_id": repeat(self.uids, np.diff(self.dataset.indptr)),
"unique_id": ufp.repeat(self.uids, np.diff(self.dataset.indptr)),
"ds": self.ds,
"y": self.dataset.temporal[:, 0].numpy(),
}
@@ -678,12 +667,12 @@ def predict_insample(self, step_size: int = 1):
else:
fcsts = pd.DataFrame(fcsts, columns=cols)
Y_df = pd.DataFrame(original_y).reset_index(drop=True)
-        fcsts_df = horizontal_concat([fcsts_df, fcsts])
+        fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts])

# Add original input df's y to forecasts DataFrame
-        fcsts_df = join(fcsts_df, Y_df, how="left", on=["unique_id", "ds"])
+        fcsts_df = ufp.join(fcsts_df, Y_df, how="left", on=["unique_id", "ds"])
if self.scalers_:
-            sizes = counts_by_id(fcsts_df, "unique_id")["counts"].to_numpy()
+            sizes = ufp.counts_by_id(fcsts_df, "unique_id")["counts"].to_numpy()
indptr = np.append(0, sizes.cumsum())
invert_cols = cols + ["y"]
fcsts_df[invert_cols] = self._scalers_target_inverse_transform(
20 changes: 7 additions & 13 deletions neuralforecast/tsdataset.py
@@ -11,15 +11,9 @@
import pandas as pd
import pytorch_lightning as pl
import torch
+import utilsforecast.processing as ufp
from torch.utils.data import Dataset, DataLoader
from utilsforecast.compat import DataFrame, pl_Series
-from utilsforecast.processing import (
-    assign_columns,
-    copy_if_pandas,
-    process_df,
-    sort,
-    to_numpy,
-)

# %% ../nbs/tsdataset.ipynb 5
class TimeSeriesLoader(DataLoader):
@@ -146,15 +140,15 @@ def __eq__(self, other):

def align(self, df: DataFrame) -> "TimeSeriesDataset":
# Protect consistency
-        df = copy_if_pandas(df, deep=False)
+        df = ufp.copy_if_pandas(df, deep=False)

# Add Nones to missing columns (without available_mask)
temporal_cols = self.temporal_cols.copy()
for col in temporal_cols:
if col not in df.columns:
-                df = assign_columns(df, col, np.nan)
+                df = ufp.assign_columns(df, col, np.nan)
if col == "available_mask":
-                df = assign_columns(df, col, 1.0)
+                df = ufp.assign_columns(df, col, 1.0)

# Sort columns to match self.temporal_cols (without available_mask)
df = df[["unique_id", "ds"] + temporal_cols.tolist()]
@@ -272,9 +266,9 @@ def from_df(df, static_df=None, sort_df=False):
DeprecationWarning,
)
if sort_df:
-            static_df = sort(static_df, by="unique_id")
+            static_df = ufp.sort(static_df, by="unique_id")

-        ids, times, data, indptr, sort_idxs = process_df(df, "unique_id", "ds", "y")
+        ids, times, data, indptr, sort_idxs = ufp.process_df(df, "unique_id", "ds", "y")
# processor sets y as the first column
temporal_cols = pd.Index(
["y"] + [c for c in df.columns if c not in ("unique_id", "ds", "y")]
@@ -298,7 +292,7 @@ def from_df(df, static_df=None, sort_df=False):
# Static features
if static_df is not None:
static_cols = static_df.columns.drop("unique_id")
-            static = to_numpy(static_df[static_cols])
+            static = ufp.to_numpy(static_df[static_cols])
else:
static = None
static_cols = None
