Skip to content

Commit

Permalink
Implement EmpericalPredictionIntervals (#173)
Browse files Browse the repository at this point in the history
* added empirical method

* added tests

* updated docs

* updated changelog

* review fixes

* fixed indentation
  • Loading branch information
brsnw250 authored Dec 5, 2023
1 parent c166e9c commit 03de185
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add `get_historical_forecasts` to pipelines for forecast estimation at each fold on the historical dataset ([#143](https://github.com/etna-team/etna/pull/143))
- `ConformalPredictionIntervals` method for prediction intervals estimation ([#152](https://github.com/etna-team/etna/pull/152))
- Add DeepARNativeModel ([#114](https://github.com/etna-team/etna/pull/114))
- `EmpiricalPredictionIntervals` method for prediction intervals estimation ([#173](https://github.com/etna-team/etna/pull/173))
-
-
-
-
Expand Down
1 change: 1 addition & 0 deletions docs/source/api_reference/experimental.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Prediction Intervals:
prediction_intervals.BasePredictionIntervals
prediction_intervals.NaiveVariancePredictionIntervals
prediction_intervals.ConformalPredictionIntervals
prediction_intervals.EmpiricalPredictionIntervals

Prediction Intervals utilities:

Expand Down
1 change: 1 addition & 0 deletions etna/experimental/prediction_intervals/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from etna.experimental.prediction_intervals.base import BasePredictionIntervals
from etna.experimental.prediction_intervals.conformal import ConformalPredictionIntervals
from etna.experimental.prediction_intervals.empirical import EmpiricalPredictionIntervals
from etna.experimental.prediction_intervals.naive_variance import NaiveVariancePredictionIntervals
91 changes: 91 additions & 0 deletions etna/experimental/prediction_intervals/empirical.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from typing import Sequence

import numpy as np
import pandas as pd

from etna.datasets import TSDataset
from etna.experimental.prediction_intervals import BasePredictionIntervals
from etna.experimental.prediction_intervals.utils import residuals_matrices
from etna.pipeline import BasePipeline


class EmpiricalPredictionIntervals(BasePredictionIntervals):
"""Estimate prediction intervals using values of historical residuals.
1. Compute matrix of residuals :math:`r_{it} = |\hat y_{it} - y_{it}|` using k-fold backtest, where :math:`i` is fold index.
2. Estimate quantiles levels, that satisfy the provided coverage, for the corresponding residuals distributions.
3. Estimate quantiles for each timestamp using computed residuals and levels.
`Reference implementation <https://www.sktime.net/en/stable/api_reference/auto_generated/sktime.forecasting.conformal.ConformalIntervals.html>`_.
"""

def __init__(self, pipeline: BasePipeline, coverage: float = 0.95, include_forecast: bool = True, stride: int = 1):
"""Initialize instance of ``EmpiricalPredictionIntervals`` with given parameters.
Parameters
----------
pipeline:
Base pipeline or ensemble for prediction intervals estimation.
coverage:
Interval coverage. In literature this value maybe referred as ``1 - alpha``.
include_forecast:
Ensure that the forecast lies within the prediction interval.
stride:
Number of points between folds.
"""
if not (0 <= coverage <= 1):
raise ValueError("Parameter `coverage` must be non-negative number not greater than 1!")

if stride <= 0:
raise ValueError("Parameter `stride` must be positive!")

self.coverage = coverage
self.include_forecast = include_forecast
self.stride = stride

super().__init__(pipeline=pipeline)

def _forecast_prediction_interval(
self, ts: TSDataset, predictions: TSDataset, quantiles: Sequence[float], n_folds: int
) -> TSDataset:
"""Estimate and store prediction intervals.
Parameters
----------
ts:
Dataset to forecast.
predictions:
Dataset with point predictions.
quantiles:
Levels of prediction distribution.
n_folds:
Number of folds to use in the backtest for prediction interval estimation.
Returns
-------
:
Dataset with predictions.
"""
residuals = residuals_matrices(pipeline=self, ts=ts, n_folds=n_folds, stride=self.stride)

prediction_target = predictions[:, :, "target"]

levels = 0.5 + np.array([-0.5, 0.5]) * self.coverage
lower_quantile, upper_quantile = np.quantile(residuals, levels, axis=0)

# cutoffs to keep prediction inside interval
if self.include_forecast:
upper_quantile = np.maximum(upper_quantile, 0)
lower_quantile = np.minimum(lower_quantile, 0)

upper_border = prediction_target + upper_quantile
lower_border = prediction_target + lower_quantile

upper_border.rename({"target": "target_upper"}, inplace=True, axis=1)
lower_border.rename({"target": "target_lower"}, inplace=True, axis=1)

intervals_df = pd.concat([lower_border, upper_border], axis=1)
predictions.add_prediction_intervals(prediction_intervals_df=intervals_df)
return predictions
146 changes: 146 additions & 0 deletions tests/test_experimental/test_prediction_intervals/test_empirical.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
from unittest.mock import MagicMock

import numpy as np
import pandas as pd
import pytest

from etna.ensembles import DirectEnsemble
from etna.ensembles import StackingEnsemble
from etna.ensembles import VotingEnsemble
from etna.experimental.prediction_intervals import EmpiricalPredictionIntervals
from etna.models import NaiveModel
from etna.pipeline import AutoRegressivePipeline
from etna.pipeline import HierarchicalPipeline
from etna.pipeline import Pipeline
from etna.reconciliation import BottomUpReconciliator
from tests.test_experimental.test_prediction_intervals.common import get_arima_pipeline
from tests.test_experimental.test_prediction_intervals.common import get_catboost_pipeline
from tests.test_experimental.test_prediction_intervals.common import get_naive_pipeline
from tests.test_experimental.test_prediction_intervals.common import get_naive_pipeline_with_transforms
from tests.test_experimental.test_prediction_intervals.common import run_base_pipeline_compat_check


@pytest.mark.parametrize("stride", (-1, 0))
def test_invalid_stride_parameter_error(stride):
with pytest.raises(ValueError, match="Parameter `stride` must be positive!"):
EmpiricalPredictionIntervals(pipeline=Pipeline(model=NaiveModel()), stride=stride)


@pytest.mark.parametrize("coverage", (-3, -1))
def test_invalid_coverage_parameter_error(coverage):
with pytest.raises(ValueError, match="Parameter `coverage` must be non-negative"):
EmpiricalPredictionIntervals(pipeline=Pipeline(model=NaiveModel()), coverage=coverage)


@pytest.mark.parametrize("pipeline_name", ("naive_pipeline", "naive_pipeline_with_transforms"))
def test_pipeline_fit_forecast_without_intervals(example_tsds, pipeline_name, request):
pipeline = request.getfixturevalue(pipeline_name)

intervals_pipeline = EmpiricalPredictionIntervals(pipeline=pipeline)

intervals_pipeline.fit(ts=example_tsds)

intervals_pipeline_pred = intervals_pipeline.forecast(prediction_interval=False)
pipeline_pred = pipeline.forecast(prediction_interval=False)

pd.testing.assert_frame_equal(intervals_pipeline_pred.df, pipeline_pred.df)


@pytest.mark.parametrize("stride", (2, 5, 10))
@pytest.mark.parametrize("expected_columns", ({"target", "target_lower", "target_upper"},))
def test_valid_strides(example_tsds, expected_columns, stride):
intervals_pipeline = EmpiricalPredictionIntervals(pipeline=Pipeline(model=NaiveModel(), horizon=5), stride=stride)
run_base_pipeline_compat_check(
ts=example_tsds, intervals_pipeline=intervals_pipeline, expected_columns=expected_columns
)


@pytest.mark.parametrize(
"expected_columns",
({"target", "target_lower", "target_upper"},),
)
@pytest.mark.parametrize(
"pipeline",
(
get_naive_pipeline(horizon=1),
get_naive_pipeline_with_transforms(horizon=1),
AutoRegressivePipeline(model=NaiveModel(), horizon=1),
HierarchicalPipeline(
model=NaiveModel(),
horizon=1,
reconciliator=BottomUpReconciliator(target_level="market", source_level="product"),
),
),
)
def test_pipelines_forecast_intervals_exist(product_level_constant_hierarchical_ts, pipeline, expected_columns):
intervals_pipeline = EmpiricalPredictionIntervals(pipeline=pipeline)
run_base_pipeline_compat_check(
ts=product_level_constant_hierarchical_ts,
intervals_pipeline=intervals_pipeline,
expected_columns=expected_columns,
)


@pytest.mark.parametrize("pipeline", (get_arima_pipeline(horizon=5),))
def test_forecast_prediction_intervals_is_used(example_tsds, pipeline):
intervals_pipeline = EmpiricalPredictionIntervals(pipeline=pipeline)
intervals_pipeline._forecast_prediction_interval = MagicMock()

intervals_pipeline.fit(ts=example_tsds)
intervals_pipeline.forecast(prediction_interval=True)
intervals_pipeline._forecast_prediction_interval.assert_called()


@pytest.mark.parametrize(
"pipeline",
(
get_naive_pipeline(horizon=5),
get_naive_pipeline_with_transforms(horizon=5),
AutoRegressivePipeline(model=NaiveModel(), horizon=5),
get_catboost_pipeline(horizon=5),
get_arima_pipeline(horizon=5),
),
)
def test_pipelines_forecast_intervals_valid(example_tsds, pipeline):
intervals_pipeline = EmpiricalPredictionIntervals(pipeline=pipeline, include_forecast=True)
intervals_pipeline.fit(ts=example_tsds)

prediction = intervals_pipeline.forecast(prediction_interval=True)
assert np.all(prediction[:, :, "target_lower"].values <= prediction[:, :, "target"].values)
assert np.all(prediction[:, :, "target"].values <= prediction[:, :, "target_upper"].values)


@pytest.mark.parametrize(
"expected_columns",
({"target", "target_lower", "target_upper"},),
)
@pytest.mark.parametrize(
"ensemble",
(
DirectEnsemble(pipelines=[get_naive_pipeline(horizon=1), get_naive_pipeline_with_transforms(horizon=2)]),
VotingEnsemble(pipelines=[get_naive_pipeline(horizon=1), get_naive_pipeline_with_transforms(horizon=1)]),
StackingEnsemble(pipelines=[get_naive_pipeline(horizon=1), get_naive_pipeline_with_transforms(horizon=1)]),
),
)
def test_ensembles_forecast_intervals_exist(example_tsds, ensemble, expected_columns):
intervals_pipeline = EmpiricalPredictionIntervals(pipeline=ensemble)
run_base_pipeline_compat_check(
ts=example_tsds, intervals_pipeline=intervals_pipeline, expected_columns=expected_columns
)


@pytest.mark.parametrize(
"ensemble",
(
DirectEnsemble(pipelines=[get_naive_pipeline(horizon=5), get_naive_pipeline_with_transforms(horizon=6)]),
VotingEnsemble(pipelines=[get_naive_pipeline(horizon=5), get_naive_pipeline_with_transforms(horizon=5)]),
StackingEnsemble(pipelines=[get_naive_pipeline(horizon=5), get_naive_pipeline_with_transforms(horizon=5)]),
),
)
def test_ensembles_forecast_intervals_valid(example_tsds, ensemble):
intervals_pipeline = EmpiricalPredictionIntervals(pipeline=ensemble, include_forecast=True)
intervals_pipeline.fit(ts=example_tsds)

prediction = intervals_pipeline.forecast(prediction_interval=True)
assert np.all(prediction[:, :, "target_lower"].values <= prediction[:, :, "target"].values)
assert np.all(prediction[:, :, "target"].values <= prediction[:, :, "target_upper"].values)

0 comments on commit 03de185

Please sign in to comment.