diff --git a/darts/dataprocessing/transformers/__init__.py b/darts/dataprocessing/transformers/__init__.py
index e8efdfc919..5080760af3 100644
--- a/darts/dataprocessing/transformers/__init__.py
+++ b/darts/dataprocessing/transformers/__init__.py
@@ -9,6 +9,7 @@
 from .fittable_data_transformer import FittableDataTransformer
 from .invertible_data_transformer import InvertibleDataTransformer
 from .mappers import InvertibleMapper, Mapper
+from .midas import MIDAS
 from .missing_values_filler import MissingValuesFiller
 from .reconciliation import (
     BottomUpReconciliator,
diff --git a/darts/dataprocessing/transformers/midas.py b/darts/dataprocessing/transformers/midas.py
new file mode 100644
index 0000000000..4e83220210
--- /dev/null
+++ b/darts/dataprocessing/transformers/midas.py
@@ -0,0 +1,241 @@
+"""
+Mixed-data sampling (MIDAS) Transformer
+---------------------------------------
+"""
+from typing import Any, Mapping
+
+import numpy as np
+import pandas as pd
+from pandas import DatetimeIndex
+
+from darts import TimeSeries
+from darts.dataprocessing.transformers import BaseDataTransformer
+from darts.logging import get_logger, raise_log
+
+logger = get_logger(__name__)
+
+
+class MIDAS(BaseDataTransformer):
+    def __init__(
+        self,
+        rule: str,
+        strip: bool = True,
+        name: str = "MIDASTransformer",
+        n_jobs: int = 1,
+        verbose: bool = False,
+    ):
+        """Mixed-data sampling transformer.
+
+        A transformer that converts higher frequency time series to lower frequency using mixed-data sampling; see
+        [1]_ for further details. This allows higher frequency covariates to be used whilst forecasting a lower
+        frequency target series. For example, using monthly inputs to forecast a quarterly target.
+
+        Notes
+        -----
+        The high input frequency should always relate in the same rate to the low target frequency. For
+        example, there are always three months in a quarter. However, the number of days in a month varies per
+        month. So in the latter case a MIDAS transformation does not work and the transformer will raise an error.
+
+        Parameters
+        ----------
+        rule
+            The offset string or object representing target conversion. Passed on to the rule parameter in
+            pandas.DataFrame.resample and therefore it is equivalent to it.
+        strip
+            Whether to strip -remove the NaNs from the start and the end of- the transformed series.
+        name
+            Name of the transformer; forwarded to the parent class constructor.
+        n_jobs
+            Forwarded to the parent class constructor.
+        verbose
+            Forwarded to the parent class constructor.
+
+        Examples
+        --------
+        >>> from darts.datasets import AirPassengersDataset
+        >>> from darts.dataprocessing.transformers import MIDAS
+        >>> monthly_series = AirPassengersDataset().load()
+        >>> midas = MIDAS(rule="QS")
+        >>> quarterly_series = midas.transform(monthly_series)
+        >>> print(quarterly_series.head())
+        <TimeSeries (DataArray) (Month: 5, component: 3, sample: 1)>
+        array([[[112.],
+                [118.],
+                [132.]],
+
+               [[129.],
+                [121.],
+                [135.]],
+
+               [[148.],
+                [148.],
+                [136.]],
+
+               [[119.],
+                [104.],
+                [118.]],
+
+               [[115.],
+                [126.],
+                [141.]]])
+        Coordinates:
+          * Month      (Month) datetime64[ns] 1949-01-01 1949-04-01 ... 1950-01-01
+          * component  (component) object '#Passengers_0' ... '#Passengers_2'
+        Dimensions without coordinates: sample
+        Attributes:
+            static_covariates:  None
+            hierarchy:          None
+
+        References
+        ----------
+        .. [1] https://en.wikipedia.org/wiki/Mixed-data_sampling
+        """
+        self._rule = rule
+        self._strip = strip
+        super().__init__(name, n_jobs, verbose)
+
+    @staticmethod
+    def ts_transform(series: TimeSeries, params: Mapping[str, Any]) -> TimeSeries:
+        """
+        Transforms series from high to low frequency using a mixed-data sampling approach. Uses and relies on
+        pandas.DataFrame.resample.
+
+        Steps:
+            (1) Transform series to pd.DataFrame and get frequency string for PeriodIndex
+            (2) Downsample series and then upsample it again
+            (3) Replace input series by upsampled, NaN-padded series if it's not 'full'
+            (4) Transform every column of the high frequency series into multiple columns for the low frequency series
+            (5) Transform the low frequency series back into a TimeSeries
+
+        Parameters
+        ----------
+        series
+            The high frequency series to transform.
+        params
+            Transformer parameters; `_rule` and `_strip` are read from `params["fixed"]`.
+
+        Returns
+        -------
+        TimeSeries
+            The low frequency series; NaNs at its start and end are stripped if `strip` is set.
+
+        Raises
+        ------
+        ValueError
+            If the conversion does not go from a high to a low frequency, or if the high frequency is not an
+            exact multiple of the low frequency.
+        """
+        rule, strip = params["fixed"]["_rule"], params["fixed"]["_strip"]
+        high_freq_datetime = series.freq_str
+
+        # TimeSeries to pd.DataFrame; the frame is only read below, so no defensive copies are needed
+        series_df = series.pd_dataframe()
+
+        # get high frequency string that's suitable for PeriodIndex
+        high_freq_period = series_df.index.to_period().freqstr
+
+        # downsample
+        low_freq_series_df = series_df.resample(rule).last()
+        low_index_datetime = low_freq_series_df.index
+
+        # upsample again to get full range of high freq periods for every low freq period
+        low_freq_series_df.index = low_index_datetime.to_period()
+        high_freq_series_df = low_freq_series_df.resample(high_freq_period).last()
+
+        # make sure the extension of the index matches the original index
+        if "End" in str(series.freq):
+            args_to_timestamp = {"freq": high_freq_period}
+        else:
+            args_to_timestamp = {"how": "start"}
+        high_index_datetime = high_freq_series_df.index.to_timestamp(
+            **args_to_timestamp
+        )
+
+        # check if user requested a transform from a high to a low frequency (otherwise raise an error)
+        _assert_high_to_low_freq(
+            high_freq_series_df=high_freq_series_df,
+            low_freq_series_df=low_freq_series_df,
+            rule=rule,
+            high_freq=high_freq_datetime,
+        )
+
+        # if necessary, expand the original series with NaN rows so every low freq period is 'full'
+        if len(high_index_datetime) > series_df.shape[0]:
+            expanded_df = pd.DataFrame(
+                np.nan, index=high_index_datetime, columns=series_df.columns
+            )
+            expanded_df.loc[series_df.index, :] = series_df.values
+            series_df = expanded_df
+
+        # make multiple low frequency columns out of the high frequency column(s)
+        midas_df = _create_midas_df(
+            series_df=series_df,
+            low_index_datetime=low_index_datetime,
+        )
+
+        # back to TimeSeries
+        midas_ts = TimeSeries.from_dataframe(midas_df)
+        if strip:
+            midas_ts = midas_ts.strip()
+
+        return midas_ts
+
+
+def _assert_high_to_low_freq(
+    high_freq_series_df: pd.DataFrame,
+    low_freq_series_df: pd.DataFrame,
+    rule: str,
+    high_freq: str,
+) -> None:
+    """
+    Asserts that the lower frequency series really has a lower frequency than the assumed higher frequency series.
+
+    Raises a ValueError if the downsampled frame does not have fewer rows than the re-upsampled frame.
+    """
+    if low_freq_series_df.shape[0] >= high_freq_series_df.shape[0]:
+        raise_log(
+            ValueError(
+                f"The target conversion should go from a high to a "
+                f"low frequency, instead the targeted frequency is "
+                f"{rule}, while the original frequency is {high_freq}."
+            ),
+            logger,
+        )
+
+
+def _create_midas_df(
+    series_df: pd.DataFrame,
+    low_index_datetime: DatetimeIndex,
+) -> pd.DataFrame:
+    """
+    Function for actually creating the lower frequency dataframe out of a higher frequency dataframe.
+
+    Every input column `col` is spread over `multiple` output columns `col_0`, ..., `col_{multiple - 1}`, where
+    `multiple` is the number of high frequency steps per low frequency step.
+    """
+    # calculate the multiple; integer arithmetic avoids float rounding
+    n_high = series_df.shape[0]
+    n_low = len(low_index_datetime)
+    multiple, remainder = divmod(n_high, n_low)
+
+    if remainder:
+        raise_log(
+            ValueError(
+                "The frequency of the high frequency input series should be an exact multiple of the targeted "
+                "low frequency output. For example, you could go from a monthly series to a quarterly series."
+            ),
+            logger,
+        )
+
+    col_names = list(series_df.columns)
+    midas_lst = []
+
+    # for every column we now create 'multiple' columns
+    # by going through a column and picking every one in 'multiple' values
+    for f in range(multiple):
+        series_tmp_df = series_df.iloc[f::multiple, :].copy()
+        series_tmp_df.index = low_index_datetime
+        series_tmp_df.columns = [f"{col_name}_{f}" for col_name in col_names]
+        midas_lst.append(series_tmp_df)
+
+    return pd.concat(midas_lst, axis=1)
diff --git a/darts/tests/dataprocessing/transformers/test_midas.py b/darts/tests/dataprocessing/transformers/test_midas.py
new file mode 100644
index 0000000000..e6d1d4f20d
--- /dev/null
+++ b/darts/tests/dataprocessing/transformers/test_midas.py
@@ -0,0 +1,129 @@
+import unittest
+
+import numpy as np
+import pandas as pd
+
+from darts import TimeSeries
+from darts.dataprocessing.transformers import MIDAS
+
+
+class MIDASTestCase(unittest.TestCase):
+    monthly_values = np.arange(1, 10)
+    monthly_times = pd.date_range(start="01-2020", periods=9, freq="M")
+    monthly_ts = TimeSeries.from_times_and_values(
+        times=monthly_times, values=monthly_values, columns=["values"]
+    )
+
+    monthly_not_complete_ts = monthly_ts[2:-1]
+
+    quarterly_values = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
+    quarterly_times = pd.date_range(start="01-2020", periods=3, freq="QS")
+    quarterly_ts = TimeSeries.from_times_and_values(
+        times=quarterly_times,
+        values=quarterly_values,
+        columns=["values_0", "values_1", "values_2"],
+    )
+
+    quarterly_end_times = pd.date_range(start="01-2020", periods=3, freq="Q")
+    quarterly_with_quarter_end_index_ts = TimeSeries.from_times_and_values(
+        times=quarterly_end_times,
+        values=quarterly_values,
+        columns=["values_0", "values_1", "values_2"],
+    )
+
+    quarterly_not_complete_values = np.array(
+        [[np.nan, np.nan, 3], [4, 5, 6], [7, 8, np.nan]]
+    )
+    quarterly_not_complete_ts = TimeSeries.from_times_and_values(
+        times=quarterly_times,
+        values=quarterly_not_complete_values,
+        columns=["values_0", "values_1", "values_2"],
+    )
+
+    daily_times = pd.date_range(start="01-2020", end="09-30-2020", freq="D")
+    daily_values = np.arange(1, len(daily_times) + 1)
+    daily_ts = TimeSeries.from_times_and_values(
+        times=daily_times, values=daily_values, columns=["values"]
+    )
+
+    second_times = pd.date_range(start="01-2020", periods=120, freq="S")
+    second_values = np.arange(1, len(second_times) + 1)
+    second_ts = TimeSeries.from_times_and_values(
+        times=second_times, values=second_values, columns=["values"]
+    )
+
+    minute_times = pd.date_range(start="01-2020", periods=2, freq="T")
+    minute_values = np.arange(1, 121).reshape(2, 60)
+    minute_ts = TimeSeries.from_times_and_values(
+        times=minute_times,
+        values=minute_values,
+        columns=[f"values_{i}" for i in range(60)],
+    )
+
+    def test_complete_monthly_to_quarterly(self):
+        """
+        Tests if monthly series is transformed into a quarterly series in the expected way.
+        """
+        # 'complete' monthly series
+        midas_1 = MIDAS(rule="QS")
+        quarterly_midas_ts = midas_1.transform(self.monthly_ts)
+        self.assertEqual(
+            quarterly_midas_ts,
+            self.quarterly_ts,
+            "Monthly TimeSeries is not correctly transformed "
+            "into a quarterly TimeSeries.",
+        )
+
+        # 'complete' monthly series, quarter end index
+        midas_2 = MIDAS(rule="Q")
+        quarterly_midas_ts = midas_2.transform(self.monthly_ts)
+        self.assertEqual(
+            quarterly_midas_ts,
+            self.quarterly_with_quarter_end_index_ts,
+            "Monthly TimeSeries is not correctly transformed "
+            "into a quarterly TimeSeries. Specifically, when the rule requires a QuarterEnd index.",
+        )
+
+    def test_not_complete_monthly_to_quarterly(self):
+        """
+        Tests if a not 'complete' monthly series is transformed into a quarterly series in the expected way.
+        """
+        # not 'complete' monthly series
+        midas = MIDAS(rule="QS", strip=False)
+        quarterly_midas_not_complete_ts = midas.transform(self.monthly_not_complete_ts)
+        self.assertEqual(
+            quarterly_midas_not_complete_ts,
+            self.quarterly_not_complete_ts,
+            "Monthly TimeSeries is not "
+            "correctly transformed when"
+            " it is not 'complete'.",
+        )
+
+    def test_error_when_from_low_to_high(self):
+        """
+        Tests if the transformer raises an error when the user asks for a transform in the wrong direction.
+        """
+        # wrong direction / low to high freq
+        midas_1 = MIDAS(rule="M")
+        self.assertRaises(ValueError, midas_1.transform, self.quarterly_ts)
+
+        # transform to same index requested
+        midas_2 = MIDAS(rule="Q")
+        self.assertRaises(ValueError, midas_2.transform, self.quarterly_ts)
+
+    def test_error_when_frequency_not_suitable_for_midas(self):
+        """
+        MIDAS can only be performed when the high frequency is the same and the exact multiple of the low frequency.
+        For example, there are always exactly three months in a quarter, but the number of days in a month differs.
+        So the monthly to quarterly transformation is possible, while the daily to monthly MIDAS transform is
+        impossible.
+        """
+        midas = MIDAS(rule="M")
+        self.assertRaises(ValueError, midas.transform, self.daily_ts)
+
+    def test_from_second_to_minute(self):
+        """
+        Test to see if other frequencies transforms like second to minute work as well.
+        """
+        midas = MIDAS(rule="T")
+        self.assertEqual(midas.transform(self.second_ts), self.minute_ts)