Add support for datasets that can't fit in memory (#1049)
jasminerienecker authored Jul 19, 2024
1 parent 5c39e99 commit 15ed497
Showing 7 changed files with 719 additions and 155 deletions.
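
A minimal usage sketch of what this commit enables, pieced together from the test added in nbs/core.ipynb below: `fit` now also accepts a list of parquet file directories (in the test below, one per series), so the full dataset never has to be loaded into memory, and after a file-based fit `predict` needs the series of interest passed in explicitly. The temporary directory and the duplicated id column mirror the test; the exact model settings and paths are illustrative.

import tempfile
from pathlib import Path

from neuralforecast import NeuralForecast
from neuralforecast.models import NHITS
from neuralforecast.utils import AirPassengersPanel

df = AirPassengersPanel.copy()
# the partition column is not stored inside the written files, so keep a copy to use as id_col
df['id'] = df['unique_id']

nf = NeuralForecast(models=[NHITS(h=12, input_size=12, max_steps=10)], freq='M')

with tempfile.TemporaryDirectory() as tmpdir:
    # writing with partition_cols produces one sub-directory of parquet files per series
    df.to_parquet(tmpdir, partition_cols=['unique_id'], index=False)
    directories = sorted(str(p) for p in Path(tmpdir).iterdir())
    nf.fit(df=directories, id_col='id')

# after a file-based fit, predict requires an explicit df for the series being forecast
airline1 = df[df['unique_id'] == 'Airline1'].drop(columns='unique_id')
preds = nf.predict(df=airline1)
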
5 changes: 2 additions & 3 deletions nbs/common.base_model.ipynb
@@ -43,10 +43,9 @@
"import torch.nn as nn\n",
"import pytorch_lightning as pl\n",
"from pytorch_lightning.callbacks.early_stopping import EarlyStopping\n",
"\n",
"from neuralforecast.tsdataset import (\n",
" TimeSeriesDataModule,\n",
" TimeSeriesDataset,\n",
" BaseTimeSeriesDataset,\n",
" _DistributedTimeSeriesDataModule,\n",
")\n",
"from neuralforecast.losses.pytorch import IQLoss"
@@ -354,7 +353,7 @@
"\n",
" self.val_size = val_size\n",
" self.test_size = test_size\n",
" is_local = isinstance(dataset, TimeSeriesDataset)\n",
" is_local = isinstance(dataset, BaseTimeSeriesDataset)\n",
" if is_local:\n",
" datamodule_constructor = TimeSeriesDataModule\n",
" else:\n",
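
The hunk above, condensed: the in-memory TimeSeriesDataset and the new file-backed dataset are both expected to derive from BaseTimeSeriesDataset, so a single isinstance check keeps them on the local TimeSeriesDataModule while anything else falls through to the distributed datamodule. A rough sketch of that dispatch follows; the helper name is made up and the else branch is an assumption, since it is cut off in the hunk above.

from neuralforecast.tsdataset import (
    BaseTimeSeriesDataset,
    TimeSeriesDataModule,
    _DistributedTimeSeriesDataModule,
)

def select_datamodule_constructor(dataset):
    # local datasets (in-memory or backed by local parquet files) keep the
    # existing TimeSeriesDataModule training path
    if isinstance(dataset, BaseTimeSeriesDataset):
        return TimeSeriesDataModule
    # anything else (e.g. the _FilesDataset used for distributed training) is
    # assumed to use the distributed datamodule
    return _DistributedTimeSeriesDataModule
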
128 changes: 109 additions & 19 deletions nbs/core.ipynb
@@ -35,7 +35,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "2508f7a9-1433-4ad8-8f2f-0078c6ed6c3c",
"id": "515672ca",
"metadata": {},
"outputs": [],
"source": [
@@ -47,7 +47,8 @@
"import s3fs\n",
"from fastcore.test import test_eq, test_fail\n",
"from nbdev.showdoc import show_doc\n",
"from neuralforecast.utils import generate_series"
"from neuralforecast.utils import generate_series\n",
"from pathlib import Path"
]
},
{
@@ -63,7 +64,7 @@
"import warnings\n",
"from copy import deepcopy\n",
"from itertools import chain\n",
"from typing import Any, Dict, List, Optional, Union\n",
"from typing import Any, Dict, List, Optional, Sequence, Union\n",
"\n",
"import fsspec\n",
"import numpy as np\n",
@@ -83,7 +84,7 @@
"\n",
"from neuralforecast.common._base_model import DistributedConfig\n",
"from neuralforecast.compat import SparkDataFrame\n",
"from neuralforecast.tsdataset import _FilesDataset, TimeSeriesDataset\n",
"from neuralforecast.tsdataset import _FilesDataset, TimeSeriesDataset, LocalFilesTimeSeriesDataset\n",
"from neuralforecast.models import (\n",
" GRU, LSTM, RNN, TCN, DeepAR, DilatedRNN,\n",
" MLP, NHITS, NBEATS, NBEATSx, DLinear, NLinear,\n",
@@ -455,9 +456,42 @@
" target_col=target_col,\n",
" min_size=df.groupBy(id_col).count().agg({\"count\": \"min\"}).first()[0],\n",
" )\n",
" \n",
" def _prepare_fit_for_local_files(\n",
" self, \n",
" files_list: Sequence[str], \n",
" static_df: Optional[DataFrame], \n",
" sort_df: bool, \n",
" id_col: str, \n",
" time_col: str, \n",
" target_col: str\n",
" ):\n",
" if self.local_scaler_type is not None:\n",
" raise ValueError(\n",
" \"Historic scaling isn't supported when the dataset is split between files. \"\n",
" \"Please open an issue if this would be valuable to you.\"\n",
" )\n",
" \n",
" self.id_col = id_col\n",
" self.time_col = time_col\n",
" self.target_col = target_col \n",
" self.scalers_ = {} \n",
" self.sort_df = sort_df \n",
"\n",
" exogs = self._get_needed_exog() \n",
" return LocalFilesTimeSeriesDataset.from_data_directories(\n",
" directories=files_list,\n",
" static_df=static_df,\n",
" sort_df=sort_df,\n",
" exogs=exogs,\n",
" id_col=id_col,\n",
" time_col=time_col,\n",
" target_col=target_col,\n",
" )\n",
"\n",
"\n",
" def fit(self,\n",
" df: Optional[Union[DataFrame, SparkDataFrame]] = None,\n",
" df: Optional[Union[DataFrame, SparkDataFrame, Sequence[str]]] = None,\n",
" static_df: Optional[Union[DataFrame, SparkDataFrame]] = None,\n",
" val_size: Optional[int] = 0,\n",
" sort_df: bool = True,\n",
@@ -475,7 +509,7 @@
"\n",
" Parameters\n",
" ----------\n",
" df : pandas, polars or spark DataFrame, optional (default=None)\n",
" df : pandas, polars or spark DataFrame, or a list of parquet files containing the series, optional (default=None)\n",
" DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.\n",
" If None, a previously stored dataset is required.\n",
" static_df : pandas, polars or spark DataFrame, optional (default=None)\n",
@@ -539,12 +573,25 @@
" target_col=target_col,\n",
" distributed_config=distributed_config,\n",
" )\n",
" elif isinstance(df, Sequence):\n",
" if not all(isinstance(val, str) for val in df):\n",
" raise ValueError(\"All entries in the list of files must be of type string\") \n",
" self.dataset = self._prepare_fit_for_local_files(\n",
" files_list=df,\n",
" static_df=static_df,\n",
" sort_df=sort_df,\n",
" id_col=id_col,\n",
" time_col=time_col,\n",
" target_col=target_col,\n",
" )\n",
" self.uids = self.dataset.indices\n",
" self.last_dates = self.dataset.last_times\n",
" elif df is None:\n",
" if verbose:\n",
" print(\"Using stored dataset.\")\n",
" else:\n",
" raise ValueError(\n",
" f\"`df` must be a pandas, polars or spark DataFrame or `None`, got: {type(df)}\"\n",
" f\"`df` must be a pandas, polars or spark DataFrame, or a list of parquet files containing the series, or `None`, got: {type(df)}\"\n",
" )\n",
"\n",
" if val_size is not None:\n",
@@ -614,7 +661,12 @@
"\n",
" def _get_needed_futr_exog(self):\n",
" return set(chain.from_iterable(getattr(m, 'futr_exog_list', []) for m in self.models))\n",
"\n",
" \n",
" def _get_needed_exog(self):\n",
" futr_exog = self._get_needed_futr_exog()\n",
" hist_exog = set(chain.from_iterable(getattr(m, 'hist_exog_list', []) for m in self.models))\n",
" return futr_exog | hist_exog\n",
" \n",
" def _get_model_names(self) -> List[str]:\n",
" names: List[str] = []\n",
" count_names = {'model': 0}\n",
@@ -792,14 +844,20 @@
" # distributed df or NeuralForecast instance was trained with a distributed input and no df is provided\n",
" # we assume the user wants to perform distributed inference as well\n",
" is_files_dataset = isinstance(getattr(self, 'dataset', None), _FilesDataset)\n",
" is_dataset_local_files = isinstance(getattr(self, 'dataset', None), LocalFilesTimeSeriesDataset)\n",
" if isinstance(df, SparkDataFrame) or (df is None and is_files_dataset):\n",
" return self._predict_distributed(\n",
" df=df,\n",
" static_df=static_df,\n",
" futr_df=futr_df,\n",
" engine=engine,\n",
" )\n",
"\n",
" \n",
" if is_dataset_local_files and df is None:\n",
" raise ValueError(\n",
" \"When the model has been trained on a dataset that is split between multiple files, you must pass in a specific dataframe for prediciton.\"\n",
" )\n",
" \n",
" # Process new dataset but does not store it.\n",
" if df is not None:\n",
" validate_freq(df[self.time_col], self.freq)\n",
@@ -1572,6 +1630,8 @@
"outputs": [],
"source": [
"#| hide\n",
"import tempfile\n",
"\n",
"import matplotlib.pyplot as plt\n",
"import pytorch_lightning as pl\n",
"\n",
@@ -2135,6 +2195,45 @@
"ax.grid()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "85180620",
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"# test training with an iterative dataset produces the same results as directly passing in the dataset as a pandas dataframe\n",
"\n",
"models = [NHITS(h=12, input_size=12, max_steps=10, futr_exog_list=['trend'], random_seed=1)]\n",
"nf = NeuralForecast(models=models, freq='M')\n",
"\n",
"# fit+predict with pandas dataframe\n",
"nf.fit(df=AirPassengersPanel_train, use_init_models=True)\n",
"pred_dataframe = nf.predict(futr_df=AirPassengersPanel_test)\n",
"\n",
"# fit+predict with data directory\n",
"AirPassengersPanel_train['id'] = AirPassengersPanel_train['unique_id']\n",
"AirPassengersPanel_test['id'] = AirPassengersPanel_test['unique_id']\n",
"\n",
"with tempfile.TemporaryDirectory() as tmpdir:\n",
" AirPassengersPanel_train.to_parquet(tmpdir, partition_cols=['unique_id'], index=False)\n",
" data_directory = sorted([str(path) for path in Path(tmpdir).iterdir()])\n",
" nf.fit(df=data_directory, use_init_models=True, id_col='id')\n",
"\n",
"pred_df = AirPassengersPanel_train[AirPassengersPanel_train['unique_id'] == 'Airline2'].drop(columns='unique_id')\n",
"futr_df = AirPassengersPanel_test[AirPassengersPanel_test['unique_id'] == 'Airline2'].drop(columns='unique_id')\n",
"\n",
"pred_iterative = nf.predict(df=pred_df, futr_df=futr_df)\n",
"pred_airline2 = pred_dataframe[pred_dataframe['unique_id'] == 'Airline2'].reset_index(drop=True)\n",
"test_eq(pred_iterative['ds'], pred_airline2['ds'])\n",
"np.testing.assert_allclose(pred_iterative['NHITS'], pred_airline2['NHITS'], rtol=0, atol=1)\n",
"\n",
"# remove id columns to not impact future tests\n",
"AirPassengersPanel_train = AirPassengersPanel_train.drop(columns='id')\n",
"AirPassengersPanel_test = AirPassengersPanel_test.drop(columns='id')"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -3033,17 +3132,8 @@
" with warnings.catch_warnings(record=True) as issued_warnings:\n",
" warnings.simplefilter('always', UserWarning)\n",
" nf.fit(AirPassengersPanel_train)\n",
" assert any(\"ignoring lr_scheduler_kwargs as the lr_scheduler is not specified\" in str(w.message) for w in issued_warnings)\n",
"\n"
" assert any(\"ignoring lr_scheduler_kwargs as the lr_scheduler is not specified\" in str(w.message) for w in issued_warnings)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "652c530c-d0e6-4806-a999-bc9b812e5472",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {