From c4a850298648c764cbc32ad8992de55c0eb8490f Mon Sep 17 00:00:00 2001 From: Jasper Zschiegner Date: Fri, 26 May 2023 14:48:06 +0200 Subject: [PATCH 1/2] Add HierarchicalDataset, remove HierarchicalTimeSeries. --- src/gluonts/dataset/hierarchical.py | 215 ++---------------- test/dataset/test_hierarchical.py | 154 ------------- .../test_train_prediction_with_hts.py | 82 +++---- 3 files changed, 44 insertions(+), 407 deletions(-) delete mode 100644 test/dataset/test_hierarchical.py diff --git a/src/gluonts/dataset/hierarchical.py b/src/gluonts/dataset/hierarchical.py index 008228f3bc..551bd1ae95 100644 --- a/src/gluonts/dataset/hierarchical.py +++ b/src/gluonts/dataset/hierarchical.py @@ -11,214 +11,25 @@ # express or implied. See the License for the specific language governing # permissions and limitations under the License. - -# Standard library imports +from dataclasses import dataclass from typing import Optional -# Third-party imports import numpy as np -import pandas as pd - -# First-party imports -from gluonts.dataset.pandas import PandasDataset - - -class HierarchicalTimeSeries: - r""" - Class for representing hierarchical time series. - - The hierarchy is represented by the standard aggregation matrix `S`. - The time series at the bottom (leaf) level of the hierarchy - (`ts_at_bottom_level`) are assumed to be given by the columns of - a single pandas dataframe. - - The ordering of columns of `ts_at_bottom_level` should be consistent - with the ordering of the columns of `S`. - - Parameters - ---------- - ts_at_bottom_level - A single pandas dataframe whose columns are the time series - corresponding to the leaves of the hierarchy. - S - Summation or aggregation matrix whose ordering should be consistent - with the ordering of the columns of `ts_at_all_levels`. - In particular, the bottom `k x k` sub-matrix should be identity matrix, - where `k` is the number of leaves of the hierarchy. - """ - - def __init__( - self, - ts_at_bottom_level: pd.DataFrame, - S: np.ndarray, - ): - assert isinstance(ts_at_bottom_level.index, pd.PeriodIndex), ( - "Index of `ts_at_bottom_level` must be an instance of " - "`pd.PeriodIndex`." - ) - - self._freq = ts_at_bottom_level.index.freqstr - - self._S = S - self.ts_at_bottom_level = ts_at_bottom_level - - self.ts_aggregated = HierarchicalTimeSeries.aggregate_ts( - ts_at_bottom_level=self.ts_at_bottom_level, - S=self._S, - ) - - self._ts_at_all_levels = pd.concat( - [self.ts_aggregated, self.ts_at_bottom_level], - axis=1, - ) - self._ts_at_all_levels.columns = list(range(self.num_ts)) - - @property - def freq(self): - return self._freq - - @property - def ts_at_all_levels(self): - return self._ts_at_all_levels - - @property - def S(self): - return self._S - - @property - def num_ts(self): - return self._S.shape[0] - - @property - def num_bottom_ts(self): - return self._S.shape[1] - - @staticmethod - def aggregate_ts( - ts_at_bottom_level: pd.DataFrame, - S: np.ndarray, - ) -> pd.DataFrame: - """ - Constructs aggregated time series according to the - summation/aggregation matrix `S`. - - Parameters - ---------- - ts_at_bottom_level - A single pandas dataframe whose columns are the time series - corresponding to the leaves of the hierarchy. - S - Summation or aggregation matrix whose ordering should be consistent - with the ordering of the columns of `ts_at_all_levels`. - In particular, the bottom `k x k` sub-matrix should be an identity - matrix, where `k` is the number of leaves of the hierarchy. - - Returns - ------- - A pandas dataframe consisting of aggregated time series - (at all non-leaf levels). - """ - num_ts, num_bottom_ts = S.shape - num_agg_ts = num_ts - num_bottom_ts - - assert ts_at_bottom_level.shape[1] == num_bottom_ts, ( - "Number of columns of the aggregation matrix `S` and " - "the dataframe `ts_at_bottom_level` should be same." - f"But shape of `S`: {S.shape} and shape of `ts_at_bottom_level`: " - f"{ts_at_bottom_level.shape}." - ) - - # Last `num_bottom_ts` rows contain the identity matrix. - assert (S[num_agg_ts:, :] == np.eye(num_bottom_ts)).all(), ( - f"The last {num_bottom_ts} rows of aggregation matrix `S`" - f" should contain Identity matrix." - ) - - # First `num_agg_ts` rows contain the aggregation information. - S_sum = S[:num_agg_ts, :] - - # Construct aggregated time series. - ts_aggregated = pd.concat( - { - f"agg_ts_{i}": ts_at_bottom_level.apply( - lambda row: np.dot(row, agg), - axis=1, - ) - for i, agg in enumerate(S_sum) - }, - axis=1, - ) - ts_aggregated.set_index(ts_at_bottom_level.index, inplace=True) - - return ts_aggregated - - def to_dataset( - self, - feat_dynamic_real: Optional[pd.DataFrame] = None, - ): - """ - Convert the hierarchical time series into - `gluonts.dataset.PandasDataset`. - - Note: Currently only dynamic real features are used by the hierarchical - model. However, the model internally creates a categorical feature - to distinguish between different time series of the hierarchy. - - Parameters - ---------- - feat_dynamic_real - A pandas dataframe containing dynamic features as columns. - Note that features of any (or all) time series in the hierarchy - can be passed here, since all time series are considered together - as a single multivariate time series. - Returns - ------- - PandasDataset - An instance of `PandasDataset`. - """ - future_length = 0 +from gluonts.dataset import Dataset - if feat_dynamic_real is not None: - assert ( - self.ts_at_all_levels.index[0] == feat_dynamic_real.index[0] - ), ( - "The staring time point of dynamic features should match " - "with that of the hierarchical time series. " - f"Start of `feat_dynamic_real`: " - f"{feat_dynamic_real.index[0]} and " - f"the start of hierarchical time series: " - f"{self.ts_at_all_levels.index[0]}." - ) - assert feat_dynamic_real.index.intersection( - self.ts_at_all_levels.index - ).equals(self.ts_at_all_levels.index), ( - "Dynamic features should be provided for all time " - "points where the target is defined. " - f"Index of `feat_dynamic_real`: {feat_dynamic_real.index}, \n" - f"Index of `ts_at_all_levels` of `hts`: " - f"{self.ts_at_all_levels.index}. \n " - "Check if the periods of these indices also match. \n" - ) +@dataclass +class HierarchicalDataset: + data: Dataset + S: np.ndarray - feat_dynamic_real.columns = [ - f"feat_dynamic_real_{col}" for col in feat_dynamic_real.columns - ] - future_length = len(feat_dynamic_real.index) - len( - self.ts_at_all_levels.index - ) - else: - feat_dynamic_real = pd.DataFrame() + def __iter__(self): + for entry in self.data: + entry = entry.copy() + entry["target"] = self.S @ np.array(entry["target"]) - pandas_ds = PandasDataset( - dataframes=pd.concat( - [self.ts_at_all_levels, feat_dynamic_real], - axis=1, - ), - target=list(self.ts_at_all_levels.columns), - feat_dynamic_real=list(feat_dynamic_real.columns), - future_length=future_length, - ) + yield entry - return pandas_ds + def __len__(self) -> int: + return len(self.data) diff --git a/test/dataset/test_hierarchical.py b/test/dataset/test_hierarchical.py deleted file mode 100644 index 0d74a9654f..0000000000 --- a/test/dataset/test_hierarchical.py +++ /dev/null @@ -1,154 +0,0 @@ -# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"). -# You may not use this file except in compliance with the License. -# A copy of the License is located at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing -# permissions and limitations under the License. - - -import numpy as np -import pandas as pd -from toolz.itertoolz import first - -import pytest -from gluonts.dataset.hierarchical import HierarchicalTimeSeries - - -PERIODS = 24 -FREQ = "H" - - -def random_ts(num_ts: int, periods: int, freq: str): - index = pd.period_range(start="22-03-2020", periods=periods, freq=freq) - - return pd.concat( - [ - pd.Series(data=np.random.random(size=len(index)), index=index) - for _ in range(num_ts) - ], - axis=1, - ) - - -def test_three_level_hierarchy(): - # Simple three-level hierarchy containing 4 leaf nodes. - S = np.vstack(([[1, 1, 1, 1], [1, 1, 0, 0], [0, 0, 1, 1]], np.eye(4))) - - num_ts, num_bottom_ts = S.shape - ts_at_bottom_level = random_ts( - num_ts=num_bottom_ts, - periods=PERIODS, - freq=FREQ, - ) - - hts = HierarchicalTimeSeries(ts_at_bottom_level=ts_at_bottom_level, S=S) - - ts_at_all_levels = hts.ts_at_all_levels - assert (ts_at_all_levels.index == ts_at_bottom_level.index).all(), ( - "The index of dataframe `ts_at_all_levels` does not match " - "with that of `ts_at_bottom_level`:\n" - f"Index of `ts_at_bottom_level`: {ts_at_bottom_level.index}, \n " - f"Index of `ts_at_all_levels`: {ts_at_all_levels.index}." - ) - - assert ts_at_all_levels.shape == (PERIODS, num_ts), ( - "Hierarchical time series do not have the right shape. " - f"Expected: {(PERIODS, num_ts)}, " - f"Obtained: {ts_at_bottom_level.shape}!" - ) - - root_level = hts.ts_at_all_levels.iloc[:, 0] - root_level_expected = ts_at_bottom_level.sum(axis=1) - np.testing.assert_array_almost_equal( - root_level.values, - root_level_expected.values, - err_msg="Values of the time series at the root" - "level are not correctly computed.", - ) - - level_1 = hts.ts_at_all_levels.iloc[:, 1:3] - level_1_expected = pd.concat( - [ - ts_at_bottom_level.iloc[:, :2].sum(axis=1), - ts_at_bottom_level.iloc[:, 2:].sum(axis=1), - ], - axis=1, - ) - np.testing.assert_array_almost_equal( - level_1.values, - level_1_expected.values, - err_msg="Values of the time series at the first" - "aggregated level (after the root) are not " - "correctly computed.", - ) - - leaf_level = hts.ts_at_all_levels.iloc[:, 3:] - np.testing.assert_array_almost_equal( - ts_at_bottom_level.values, - leaf_level.values, - err_msg="Values of the time series at the bottom " - "level do not agree with the given inputs.", - ) - - -def get_random_hts(S: np.ndarray, periods: int, freq: str): - num_ts, num_bottom_ts = S.shape - ts_at_bottom_level = random_ts( - num_ts=num_bottom_ts, - periods=periods, - freq=freq, - ) - - hts = HierarchicalTimeSeries(ts_at_bottom_level=ts_at_bottom_level, S=S) - return hts - - -@pytest.mark.parametrize("mode", ["train", "inference", "fail"]) -def test_hts_to_dataset(mode: str): - S = np.vstack(([[1, 1, 1, 1], [1, 1, 0, 0], [0, 0, 1, 1]], np.eye(4))) - hts = get_random_hts(S=S, periods=PERIODS, freq=FREQ) - - num_bottom_ts = S.shape[1] - num_features = 10 - num_future_time_steps = { - "train": 0, - "inference": PERIODS // 2, - "fail": PERIODS // 2, - }[mode] - - features_df = random_ts( - num_ts=num_features, - periods=PERIODS + num_future_time_steps, - freq=FREQ, - ) - - if mode == "fail": - # Create a misalignment with the index of target time series. - features_df.index = features_df.index.shift(periods=-1) - - with pytest.raises(Exception): - ds = hts.to_dataset(feat_dynamic_real=features_df) - else: - ds = hts.to_dataset(feat_dynamic_real=features_df) - entry = first(ds) - - assert entry["start"] == features_df.index[0] - - if mode == "train": - entry["target"].shape == (num_bottom_ts, PERIODS) - entry["feat_dynamic_real"].shape == (num_features, PERIODS) - else: - entry["target"].shape == ( - num_bottom_ts, - PERIODS + num_future_time_steps, - ) - entry["feat_dynamic_real"].shape == ( - num_features, - PERIODS + num_future_time_steps, - ) diff --git a/test/mx/model/deepvar_hierarchical/test_train_prediction_with_hts.py b/test/mx/model/deepvar_hierarchical/test_train_prediction_with_hts.py index 22fc735217..8e9d538314 100644 --- a/test/mx/model/deepvar_hierarchical/test_train_prediction_with_hts.py +++ b/test/mx/model/deepvar_hierarchical/test_train_prediction_with_hts.py @@ -21,7 +21,7 @@ import pytest # First-party imports -from gluonts.dataset.hierarchical import HierarchicalTimeSeries +from gluonts.dataset.hierarchical import HierarchicalDataset from gluonts.mx.model.deepvar_hierarchical import DeepVARHierarchicalEstimator from gluonts.mx.trainer import Trainer @@ -29,69 +29,49 @@ NUM_BOTTOM_TS = 4 FREQ = "H" PERIODS = 168 * 2 -S = np.vstack(([[1, 1, 1, 1], [1, 1, 0, 0], [0, 0, 1, 1]], np.eye(4))) -PREDICTION_LENGTH = 24 - - -def random_ts(num_ts: int, periods: int, freq: str): - index = pd.period_range(start="22-03-2020", periods=periods, freq=freq) - - return pd.concat( - [ - pd.Series(data=np.random.random(size=len(index)), index=index) - for _ in range(num_ts) - ], - axis=1, +S = np.vstack( + ( + [1, 1, 1, 1], + [1, 1, 0, 0], + [0, 0, 1, 1], + np.identity(4), ) +) +PREDICTION_LENGTH = 24 @pytest.mark.parametrize( - "features_df", - [ - None, - random_ts( - num_ts=S.shape[0], periods=PERIODS + PREDICTION_LENGTH, freq=FREQ - ), - ], + "use_feat_dynamic_real", + [True, False], ) -def test_train_prediction(features_df: Optional[pd.DataFrame]): - if features_df is not None: - use_feat_dynamic_real = True - features_df_train = features_df.iloc[:-PREDICTION_LENGTH, :] - else: - use_feat_dynamic_real = False - features_df_train = None - - # HTS - ts_at_bottom_level = random_ts( - num_ts=NUM_BOTTOM_TS, - periods=PERIODS, - freq="H", - ) - hts = HierarchicalTimeSeries( - ts_at_bottom_level=ts_at_bottom_level, - S=S, - ) - - dataset = hts.to_dataset(feat_dynamic_real=features_df_train) - +def test_train_prediction(use_feat_dynamic_real: Optional[pd.DataFrame]): + entry = { + "start": pd.Period("22-03-2020"), + "target": np.random.random(size=(NUM_BOTTOM_TS, PERIODS)), + } + if use_feat_dynamic_real: + entry["feat_dynamic_real"] = np.random.random(size=(3, PERIODS)) + + dataset = HierarchicalDataset([entry], S=S) estimator = DeepVARHierarchicalEstimator( - freq=hts.freq, + freq=FREQ, prediction_length=PREDICTION_LENGTH, trainer=Trainer(epochs=1, num_batches_per_epoch=1, hybridize=False), - S=hts.S, + S=S, use_feat_dynamic_real=use_feat_dynamic_real, ) - predictor = estimator.train(dataset) - predictor_input = hts.to_dataset(feat_dynamic_real=features_df) - forecasts = list(predictor.predict(predictor_input)) + if use_feat_dynamic_real: + entry["feat_dynamic_real"] = np.random.random( + size=(3, PERIODS + PREDICTION_LENGTH) + ) + + dataset = HierarchicalDataset([entry], S=S) + forecasts = list(predictor.predict(dataset)) assert len(forecasts) == len(dataset) assert all( - [ - forecast.samples.shape == (100, PREDICTION_LENGTH, hts.num_ts) - for forecast in forecasts - ] + forecast.samples.shape == (100, PREDICTION_LENGTH, len(S)) + for forecast in forecasts ) From 90a2766a42d29eed103941b85246d28a037f19c2 Mon Sep 17 00:00:00 2001 From: Jasper Zschiegner Date: Fri, 26 May 2023 15:15:18 +0200 Subject: [PATCH 2/2] Fixup. --- src/gluonts/dataset/hierarchical.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/gluonts/dataset/hierarchical.py b/src/gluonts/dataset/hierarchical.py index 551bd1ae95..53c53326b7 100644 --- a/src/gluonts/dataset/hierarchical.py +++ b/src/gluonts/dataset/hierarchical.py @@ -12,7 +12,6 @@ # permissions and limitations under the License. from dataclasses import dataclass -from typing import Optional import numpy as np