forked from tinkoff-ai/etna
-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement
BasePredictionIntervals
(#86)
* added implementation * added tests * updated documentation * updated `fit` signature * updated changelog * changed tests * moved intervals to experimental * updated documentation * fixed tests * removed duplications * reworked `params_to_tune` * reworked tests * updated changelog * updated test * reformatted tests
- Loading branch information
Showing
8 changed files
with
505 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from etna.experimental.prediction_intervals.base import BasePredictionIntervals |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
import pathlib | ||
from abc import abstractmethod | ||
from typing import Dict | ||
from typing import Optional | ||
from typing import Sequence | ||
|
||
import pandas as pd | ||
|
||
from etna.datasets import TSDataset | ||
from etna.distributions import BaseDistribution | ||
from etna.pipeline.base import BasePipeline | ||
|
||
|
||
class BasePredictionIntervals(BasePipeline): | ||
"""Base class for prediction intervals methods. | ||
This class implements a wrapper interface for pipelines and ensembles that provides the ability to | ||
estimate prediction intervals. | ||
To implement a particular method, one must inherit from this class and provide an implementation for the | ||
abstract method ``_forecast_prediction_interval``. This method should estimate and store prediction | ||
intervals for out-of-sample forecasts. | ||
In-sample prediction is not supported by default and will raise a corresponding error while attempting to do so. | ||
This functionality could be implemented if needed by overriding ``_predict`` method. This method is responsible | ||
for building an in-sample point forecast and adding prediction intervals. | ||
""" | ||
|
||
def __init__(self, pipeline: BasePipeline): | ||
"""Initialize instance of ``BasePredictionIntervals`` with given parameters. | ||
Parameters | ||
---------- | ||
pipeline: | ||
Base pipeline or ensemble for prediction intervals estimation. | ||
""" | ||
ts = pipeline.ts | ||
self.pipeline = pipeline | ||
super().__init__(pipeline.horizon) | ||
self.pipeline.ts = ts | ||
|
||
def fit(self, ts: TSDataset, save_ts: bool = True) -> "BasePredictionIntervals": | ||
"""Fit the pipeline or ensemble of pipelines. | ||
Fit and apply given transforms to the data, then fit the model on the transformed data. | ||
Parameters | ||
---------- | ||
ts: | ||
Dataset with timeseries data. | ||
save_ts: | ||
Whether to save ``ts`` in the pipeline during ``fit``. | ||
Returns | ||
------- | ||
: | ||
Fitted instance. | ||
""" | ||
self.pipeline.fit(ts=ts, save_ts=save_ts) | ||
return self | ||
|
||
@property | ||
def ts(self) -> Optional[TSDataset]: | ||
"""Access internal pipeline dataset.""" | ||
return self.pipeline.ts | ||
|
||
@ts.setter | ||
def ts(self, ts: Optional[TSDataset]): | ||
"""Set internal pipeline dataset.""" | ||
self.pipeline.ts = ts | ||
|
||
def _predict( | ||
self, | ||
ts: TSDataset, | ||
start_timestamp: Optional[pd.Timestamp], | ||
end_timestamp: Optional[pd.Timestamp], | ||
prediction_interval: bool, | ||
quantiles: Sequence[float], | ||
return_components: bool, | ||
) -> TSDataset: | ||
"""Make in-sample predictions on dataset in a given range. | ||
This method is not implemented by default. A custom implementation could be added by overriding if needed. | ||
Parameters | ||
---------- | ||
ts: | ||
Dataset to make predictions on. | ||
start_timestamp: | ||
First timestamp of prediction range to return, should be >= than first timestamp in ``ts``; | ||
expected that beginning of each segment <= ``start_timestamp``; | ||
if isn't set the first timestamp where each segment began is taken. | ||
end_timestamp: | ||
Last timestamp of prediction range to return; if isn't set the last timestamp of ``ts`` is taken. | ||
Expected that value is less or equal to the last timestamp in ``ts``. | ||
prediction_interval: | ||
If ``True`` returns prediction interval. | ||
quantiles: | ||
Levels of prediction distribution. By default 2.5% and 97.5% taken to form a 95% prediction interval. | ||
return_components: | ||
If ``True`` additionally returns forecast components. | ||
Returns | ||
------- | ||
: | ||
Dataset with predictions in ``[start_timestamp, end_timestamp]`` range. | ||
""" | ||
raise NotImplementedError( | ||
"In-sample sample prediction is not supported! See documentation on how it could be implemented." | ||
) | ||
|
||
def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset: | ||
"""Make point forecasts using base pipeline or ensemble.""" | ||
return self.pipeline._forecast(ts=ts, return_components=return_components) | ||
|
||
def save(self, path: pathlib.Path): | ||
"""Implement in SavePredictionIntervalsMixin.""" | ||
pass | ||
|
||
@classmethod | ||
def load(cls, path: pathlib.Path): | ||
"""Implement in SavePredictionIntervalsMixin.""" | ||
pass | ||
|
||
def forecast( | ||
self, | ||
ts: Optional[TSDataset] = None, | ||
prediction_interval: bool = False, | ||
quantiles: Sequence[float] = (0.025, 0.975), | ||
n_folds: int = 3, | ||
return_components: bool = False, | ||
) -> TSDataset: | ||
"""Make a forecast of the next points of a dataset. | ||
The result of forecasting starts from the last point of ``ts``, not including it. | ||
Parameters | ||
---------- | ||
ts: | ||
Dataset to forecast. | ||
prediction_interval: | ||
If True returns prediction interval for forecast. | ||
quantiles: | ||
Levels of prediction distribution. By default 2.5% and 97.5% taken to form a 95% prediction interval. | ||
If method don't use or estimate quantiles this parameter will be ignored. | ||
n_folds: | ||
Number of folds to use in the backtest for prediction interval estimation. | ||
return_components: | ||
If True additionally returns forecast components. | ||
Returns | ||
------- | ||
: | ||
Dataset with predictions. | ||
""" | ||
predictions = super().forecast( | ||
ts=ts, | ||
prediction_interval=prediction_interval, | ||
quantiles=quantiles, | ||
n_folds=n_folds, | ||
return_components=return_components, | ||
) | ||
return predictions | ||
|
||
def params_to_tune(self) -> Dict[str, BaseDistribution]: | ||
"""Get hyperparameter grid of the base pipeline to tune. | ||
Returns | ||
------- | ||
: | ||
Grid with hyperparameters. | ||
""" | ||
pipeline_params = self.pipeline.params_to_tune() | ||
pipeline_params = {f"pipeline.{key}": value for key, value in pipeline_params.items()} | ||
return pipeline_params | ||
|
||
@abstractmethod | ||
def _forecast_prediction_interval( | ||
self, ts: TSDataset, predictions: TSDataset, quantiles: Sequence[float], n_folds: int | ||
) -> TSDataset: | ||
"""Estimate and store prediction intervals. | ||
Parameters | ||
---------- | ||
ts: | ||
Dataset to forecast. | ||
predictions: | ||
Dataset with point predictions. | ||
quantiles: | ||
Levels of prediction distribution. | ||
n_folds: | ||
Number of folds to use in the backtest for prediction interval estimation. | ||
Returns | ||
------- | ||
: | ||
Dataset with predictions. | ||
""" | ||
pass |
Empty file.
51 changes: 51 additions & 0 deletions
51
tests/test_experimental/test_prediction_intervals/common.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
from typing import Dict | ||
from typing import Sequence | ||
|
||
import pandas as pd | ||
|
||
from etna.datasets import TSDataset | ||
from etna.distributions import BaseDistribution | ||
from etna.distributions import FloatDistribution | ||
from etna.experimental.prediction_intervals import BasePredictionIntervals | ||
from etna.models import NaiveModel | ||
from etna.pipeline import BasePipeline | ||
from etna.pipeline import Pipeline | ||
from etna.transforms import AddConstTransform | ||
from etna.transforms import DateFlagsTransform | ||
|
||
|
||
def get_naive_pipeline(horizon): | ||
return Pipeline(model=NaiveModel(), transforms=[], horizon=horizon) | ||
|
||
|
||
def get_naive_pipeline_with_transforms(horizon): | ||
transforms = [AddConstTransform(in_column="target", value=1e6), DateFlagsTransform()] | ||
return Pipeline(model=NaiveModel(), transforms=transforms, horizon=horizon) | ||
|
||
|
||
class DummyPredictionIntervals(BasePredictionIntervals): | ||
"""Dummy class for testing.""" | ||
|
||
def __init__(self, pipeline: BasePipeline, width: float = 0.0): | ||
self.width = width | ||
super().__init__(pipeline=pipeline) | ||
|
||
def _forecast_prediction_interval( | ||
self, ts: TSDataset, predictions: TSDataset, quantiles: Sequence[float], n_folds: int | ||
) -> TSDataset: | ||
"""Set intervals borders as point forecast.""" | ||
borders = [] | ||
for segment in ts.segments: | ||
target_df = (predictions[:, segment, "target"]).to_frame() | ||
borders.append(target_df.rename({"target": f"target_lower"}, axis=1) - self.width / 2) | ||
borders.append(target_df.rename({"target": f"target_upper"}, axis=1) + self.width / 2) | ||
|
||
# directly store borders in ts.df | ||
predictions.df = pd.concat([predictions.df] + borders, axis=1).sort_index(axis=1, level=(0, 1)) | ||
|
||
return predictions | ||
|
||
def params_to_tune(self) -> Dict[str, BaseDistribution]: | ||
params = super().params_to_tune() | ||
params["width"] = FloatDistribution(low=-5.0, high=5.0) | ||
return params |
Oops, something went wrong.