Skip to content

Commit

Permalink
[BUG] Ensembles works incorrectly with HierarchicalPipeline (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-hse-repository authored Dec 8, 2023
1 parent 03de185 commit 9e577bb
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 23 deletions.
4 changes: 3 additions & 1 deletion etna/ensembles/direct_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ def _merge(self, forecasts: List[TSDataset]) -> TSDataset:
# TODO: Fix slicing with explicit passing the segments in issue #775
horizon, forecast = horizons[idx], forecasts[idx][:, segments, "target"]
forecast_df.iloc[:horizon] = forecast
forecast_dataset = TSDataset(df=forecast_df, freq=forecasts[0].freq)
forecast_dataset = TSDataset(
df=forecast_df, freq=forecasts[0].freq, hierarchical_structure=forecasts[0].hierarchical_structure
)
return forecast_dataset

def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset:
Expand Down
14 changes: 12 additions & 2 deletions etna/ensembles/stacking_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ def __init__(
self.joblib_params = joblib_params
super().__init__(horizon=self._get_horizon(pipelines=pipelines))

def _make_same_level(self, ts: TSDataset, forecasts: List[TSDataset]) -> TSDataset:
if ts.has_hierarchy():
if ts.current_df_level != forecasts[0].current_df_level:
ts = ts.get_level_dataset(forecasts[0].current_df_level) # type: ignore
return ts

def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set[str]]:
"""Return all the features from ``features_to_use`` which can be obtained from base models' forecasts."""
features_df = pd.concat([forecast.df for forecast in forecasts], axis=1)
Expand Down Expand Up @@ -132,7 +138,7 @@ def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set
def _backtest_pipeline(self, pipeline: BasePipeline, ts: TSDataset) -> TSDataset:
"""Get forecasts from backtest for given pipeline."""
forecasts = pipeline.get_historical_forecasts(ts=ts, n_folds=self.n_folds)
forecasts = TSDataset(df=forecasts, freq=ts.freq)
forecasts = TSDataset(df=forecasts, freq=ts.freq, hierarchical_structure=ts.hierarchical_structure)
return forecasts

def fit(self, ts: TSDataset, save_ts: bool = True) -> "StackingEnsemble":
Expand Down Expand Up @@ -174,6 +180,8 @@ def _make_features(
self, ts: TSDataset, forecasts: List[TSDataset], train: bool = False
) -> Tuple[pd.DataFrame, Optional[pd.Series]]:
"""Prepare features for the ``final_model``."""
ts = self._make_same_level(ts=ts, forecasts=forecasts)

# Stack targets from the forecasts
targets = [
forecast[:, :, "target"].rename({"target": f"regressor_target_{i}"}, level="feature", axis=1)
Expand Down Expand Up @@ -210,6 +218,8 @@ def _make_features(
return x, None

def _process_forecasts(self, ts: TSDataset, forecasts: List[TSDataset]) -> TSDataset:
ts = self._make_same_level(ts=ts, forecasts=forecasts)

x, _ = self._make_features(ts=ts, forecasts=forecasts, train=False)
y = self.final_model.predict(x)
num_segments = len(forecasts[0].segments)
Expand All @@ -225,7 +235,7 @@ def _process_forecasts(self, ts: TSDataset, forecasts: List[TSDataset]) -> TSDat
df = forecasts[0][:, :, "target"].copy()
df.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = np.NAN

result = TSDataset(df=df, freq=ts.freq, df_exog=df_exog)
result = TSDataset(df=df, freq=ts.freq, df_exog=df_exog, hierarchical_structure=ts.hierarchical_structure)
result.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = y
return result

Expand Down
4 changes: 3 additions & 1 deletion etna/ensembles/voting_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ def _vote(self, forecasts: List[TSDataset]) -> TSDataset:
forecast_df = sum(
[forecast[:, :, "target"] * weight for forecast, weight in zip(forecasts, self.processed_weights)]
)
forecast_dataset = TSDataset(df=forecast_df, freq=forecasts[0].freq)
forecast_dataset = TSDataset(
df=forecast_df, freq=forecasts[0].freq, hierarchical_structure=forecasts[0].hierarchical_structure
)
return forecast_dataset

def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset:
Expand Down
8 changes: 7 additions & 1 deletion etna/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,9 @@ def _validate_backtest_stride(n_folds: Union[int, List[FoldMask]], horizon: int,
return stride

@staticmethod
def _validate_backtest_dataset(ts: TSDataset, n_folds: int, horizon: int, stride: int):
def _validate_backtest_dataset(
ts: TSDataset, n_folds: int, horizon: int, stride: int
): # TODO: try to optimize, works really slow on datasets with large number of segments
"""Check all segments have enough timestamps to validate forecaster with given number of splits."""
min_required_length = horizon + (n_folds - 1) * stride
segments = set(ts.df.columns.get_level_values("segment"))
Expand Down Expand Up @@ -673,6 +675,10 @@ def _compute_metrics(
self, metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset
) -> Dict[str, Dict[str, float]]:
"""Compute metrics for given y_true, y_pred."""
if y_true.has_hierarchy():
if y_true.current_df_level != y_pred.current_df_level:
y_true = y_true.get_level_dataset(y_pred.current_df_level) # type: ignore

metrics_values: Dict[str, Dict[str, float]] = {}
for metric in metrics:
metrics_values[metric.name] = metric(y_true=y_true, y_pred=y_pred) # type: ignore
Expand Down
18 changes: 0 additions & 18 deletions etna/pipeline/hierarchical_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence

import pandas as pd

from etna.datasets.hierarchical_structure import HierarchicalStructure
from etna.datasets.tsdataset import TSDataset
from etna.metrics import Metric
from etna.models.base import ModelType
from etna.pipeline.pipeline import Pipeline
from etna.reconciliation.base import BaseReconciliator
Expand Down Expand Up @@ -278,21 +275,6 @@ def predict(
forecast_reconciled = self.reconciliator.reconcile(forecast)
return forecast_reconciled

def _compute_metrics(
self, metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset
) -> Dict[str, Dict[str, float]]:
"""Compute metrics for given y_true, y_pred."""
if y_true.current_df_level != self.reconciliator.target_level:
y_true = y_true.get_level_dataset(self.reconciliator.target_level)

if y_pred.current_df_level == self.reconciliator.source_level:
y_pred = self.reconciliator.reconcile(y_pred)

metrics_values: Dict[str, Dict[str, float]] = {}
for metric in metrics:
metrics_values[metric.name] = metric(y_true=y_true, y_pred=y_pred) # type: ignore
return metrics_values

def _forecast_prediction_interval(
self, ts: TSDataset, predictions: TSDataset, quantiles: Sequence[float], n_folds: int
) -> TSDataset:
Expand Down
20 changes: 20 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,20 @@ def product_level_df():
return df


@pytest.fixture
def product_level_df_long_history(periods=200):
n_segments = 4
df = pd.DataFrame(
{
"timestamp": list(pd.date_range(start="2000-01-01", periods=periods, freq="D")) * n_segments,
"segment": ["a"] * periods + ["b"] * periods + ["c"] * periods + ["d"] * periods,
"target": np.random.uniform(low=0, high=100, size=periods * n_segments),
}
)
df = TSDataset.to_dataset(df)
return df


@pytest.fixture
def product_level_df_w_nans():
df = pd.DataFrame(
Expand Down Expand Up @@ -644,6 +658,12 @@ def product_level_simple_hierarchical_ts(product_level_df, hierarchical_structur
return ts


@pytest.fixture
def product_level_simple_hierarchical_ts_long_history(product_level_df_long_history, hierarchical_structure):
ts = TSDataset(df=product_level_df_long_history, freq="D", hierarchical_structure=hierarchical_structure)
return ts


@pytest.fixture
def simple_no_hierarchy_ts(market_level_df):
ts = TSDataset(df=market_level_df, freq="D")
Expand Down
71 changes: 71 additions & 0 deletions tests/test_ensembles/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
from etna.models import CatBoostPerSegmentModel
from etna.models import NaiveModel
from etna.models import ProphetModel
from etna.pipeline import HierarchicalPipeline
from etna.pipeline import Pipeline
from etna.reconciliation import BottomUpReconciliator
from etna.reconciliation import TopDownReconciliator
from etna.transforms import DateFlagsTransform
from etna.transforms import LagTransform

Expand Down Expand Up @@ -45,6 +48,42 @@ def naive_pipeline() -> Pipeline:
return pipeline


@pytest.fixture
def naive_pipeline_top_down_market_14() -> Pipeline:
"""Generate pipeline with NaiveModel."""
pipeline = HierarchicalPipeline(
model=NaiveModel(14),
transforms=[],
horizon=14,
reconciliator=TopDownReconciliator(source_level="total", target_level="market", period=7, method="AHP"),
)
return pipeline


@pytest.fixture
def naive_pipeline_top_down_product_14() -> Pipeline:
"""Generate pipeline with NaiveModel."""
pipeline = HierarchicalPipeline(
model=NaiveModel(14),
transforms=[],
horizon=14,
reconciliator=TopDownReconciliator(source_level="total", target_level="product", period=7, method="AHP"),
)
return pipeline


@pytest.fixture
def naive_pipeline_bottom_up_market_14() -> Pipeline:
"""Generate pipeline with NaiveModel."""
pipeline = HierarchicalPipeline(
model=NaiveModel(14),
transforms=[],
horizon=14,
reconciliator=BottomUpReconciliator(source_level="product", target_level="market"),
)
return pipeline


@pytest.fixture
def naive_pipeline_1() -> Pipeline:
"""Generate pipeline with NaiveModel(1)."""
Expand All @@ -67,6 +106,22 @@ def voting_ensemble_pipeline(
return pipeline


@pytest.fixture
def voting_ensemble_hierarchical_pipeline(
naive_pipeline_top_down_market_14: HierarchicalPipeline, naive_pipeline_bottom_up_market_14: HierarchicalPipeline
) -> VotingEnsemble:
pipeline = VotingEnsemble(pipelines=[naive_pipeline_top_down_market_14, naive_pipeline_bottom_up_market_14])
return pipeline


@pytest.fixture
def voting_ensemble_mix_pipeline(
naive_pipeline: Pipeline, naive_pipeline_top_down_product_14: HierarchicalPipeline
) -> VotingEnsemble:
pipeline = VotingEnsemble(pipelines=[naive_pipeline, naive_pipeline_top_down_product_14])
return pipeline


@pytest.fixture
def voting_ensemble_naive(naive_pipeline_1: Pipeline, naive_pipeline_2: Pipeline) -> VotingEnsemble:
pipeline = VotingEnsemble(pipelines=[naive_pipeline_1, naive_pipeline_2])
Expand All @@ -81,6 +136,22 @@ def stacking_ensemble_pipeline(
return pipeline


@pytest.fixture
def stacking_ensemble_hierarchical_pipeline(
naive_pipeline_top_down_market_14: HierarchicalPipeline, naive_pipeline_bottom_up_market_14: HierarchicalPipeline
) -> StackingEnsemble:
pipeline = StackingEnsemble(pipelines=[naive_pipeline_top_down_market_14, naive_pipeline_bottom_up_market_14])
return pipeline


@pytest.fixture
def stacking_ensemble_mix_pipeline(
naive_pipeline: Pipeline, naive_pipeline_top_down_product_14: HierarchicalPipeline
) -> StackingEnsemble:
pipeline = StackingEnsemble(pipelines=[naive_pipeline, naive_pipeline_top_down_product_14])
return pipeline


@pytest.fixture
def naive_featured_pipeline_1() -> Pipeline:
"""Generate pipeline with NaiveModel(1)."""
Expand Down
80 changes: 80 additions & 0 deletions tests/test_ensembles/test_direct_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
from etna.datasets import TSDataset
from etna.datasets import generate_from_patterns_df
from etna.ensembles import DirectEnsemble
from etna.metrics import MAE
from etna.models import NaiveModel
from etna.pipeline import HierarchicalPipeline
from etna.pipeline import Pipeline
from etna.reconciliation import TopDownReconciliator
from tests.test_pipeline.utils import assert_pipeline_equals_loaded_original
from tests.test_pipeline.utils import assert_pipeline_forecast_raise_error_if_no_ts
from tests.test_pipeline.utils import assert_pipeline_forecasts_given_ts
Expand All @@ -26,6 +29,54 @@ def direct_ensemble_pipeline() -> DirectEnsemble:
return ensemble


@pytest.fixture
def naive_pipeline_top_down_market_7() -> Pipeline:
"""Generate pipeline with NaiveModel."""
pipeline = HierarchicalPipeline(
model=NaiveModel(),
transforms=[],
horizon=7,
reconciliator=TopDownReconciliator(source_level="total", target_level="market", period=14, method="AHP"),
)
return pipeline


@pytest.fixture
def naive_pipeline_top_down_product_7() -> Pipeline:
"""Generate pipeline with NaiveModel."""
pipeline = HierarchicalPipeline(
model=NaiveModel(),
transforms=[],
horizon=7,
reconciliator=TopDownReconciliator(source_level="total", target_level="product", period=14, method="AHP"),
)
return pipeline


@pytest.fixture
def direct_ensemble_hierarchical_pipeline(
naive_pipeline_top_down_market_7, naive_pipeline_bottom_up_market_14
) -> DirectEnsemble:
ensemble = DirectEnsemble(
pipelines=[
naive_pipeline_top_down_market_7,
naive_pipeline_bottom_up_market_14,
]
)
return ensemble


@pytest.fixture
def direct_ensemble_mix_pipeline(naive_pipeline_top_down_product_7) -> DirectEnsemble:
ensemble = DirectEnsemble(
pipelines=[
naive_pipeline_top_down_product_7,
Pipeline(model=NaiveModel(), transforms=[], horizon=14),
]
)
return ensemble


@pytest.fixture
def simple_ts_train():
df = generate_from_patterns_df(patterns=[[1, 3, 5], [2, 4, 6], [7, 9, 11]], periods=3, start_time="2000-01-01")
Expand Down Expand Up @@ -123,3 +174,32 @@ def test_predict_with_return_components_fails(example_tsds, direct_ensemble_pipe
def test_params_to_tune_not_implemented(direct_ensemble_pipeline):
with pytest.raises(NotImplementedError, match="DirectEnsemble doesn't support this method"):
_ = direct_ensemble_pipeline.params_to_tune()


@pytest.mark.parametrize("n_jobs", (1, 5))
def test_backtest(direct_ensemble_pipeline, example_tsds, n_jobs: int):
results = direct_ensemble_pipeline.backtest(ts=example_tsds, metrics=[MAE()], n_jobs=n_jobs, n_folds=3)
for df in results:
assert isinstance(df, pd.DataFrame)


@pytest.mark.parametrize("n_jobs", (1, 5))
def test_backtest_hierarchical_pipeline(
direct_ensemble_hierarchical_pipeline, product_level_simple_hierarchical_ts_long_history: TSDataset, n_jobs: int
):
results = direct_ensemble_hierarchical_pipeline.backtest(
ts=product_level_simple_hierarchical_ts_long_history, metrics=[MAE()], n_jobs=n_jobs, n_folds=3
)
for df in results:
assert isinstance(df, pd.DataFrame)


@pytest.mark.parametrize("n_jobs", (1, 5))
def test_backtest_mix_pipeline(
direct_ensemble_mix_pipeline, product_level_simple_hierarchical_ts_long_history: TSDataset, n_jobs: int
):
results = direct_ensemble_mix_pipeline.backtest(
ts=product_level_simple_hierarchical_ts_long_history, metrics=[MAE()], n_jobs=n_jobs, n_folds=3
)
for df in results:
assert isinstance(df, pd.DataFrame)
28 changes: 28 additions & 0 deletions tests/test_ensembles/test_stacking_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,34 @@ def test_backtest(stacking_ensemble_pipeline: StackingEnsemble, example_tsds: TS
assert isinstance(df, pd.DataFrame)


@pytest.mark.parametrize("n_jobs", (1, 5))
def test_backtest_hierarchical_pipeline(
stacking_ensemble_hierarchical_pipeline: StackingEnsemble,
product_level_simple_hierarchical_ts_long_history: TSDataset,
n_jobs: int,
):
"""Check that backtest works with StackingEnsemble of hierarchical pipelines."""
results = stacking_ensemble_hierarchical_pipeline.backtest(
ts=product_level_simple_hierarchical_ts_long_history, metrics=[MAE()], n_jobs=n_jobs, n_folds=3
)
for df in results:
assert isinstance(df, pd.DataFrame)


@pytest.mark.parametrize("n_jobs", (1, 5))
def test_backtest_mix_pipeline(
stacking_ensemble_mix_pipeline: StackingEnsemble,
product_level_simple_hierarchical_ts_long_history: TSDataset,
n_jobs: int,
):
"""Check that backtest works with StackingEnsemble of pipeline and hierarchical pipeline."""
results = stacking_ensemble_mix_pipeline.backtest(
ts=product_level_simple_hierarchical_ts_long_history, metrics=[MAE()], n_jobs=n_jobs, n_folds=3
)
for df in results:
assert isinstance(df, pd.DataFrame)


@pytest.mark.parametrize("n_jobs", (1, 5))
def test_get_historical_forecasts(stacking_ensemble_pipeline: StackingEnsemble, example_tsds: TSDataset, n_jobs: int):
"""Check that get_historical_forecasts works with StackingEnsemble."""
Expand Down
Loading

0 comments on commit 9e577bb

Please sign in to comment.