Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
SampleWeighter,
Scaler,
Selector,
Shifter,
)
from openstef_models.transforms.postprocessing import ConfidenceIntervalApplicator, QuantileSorter
from openstef_models.transforms.time_domain import (
Expand Down Expand Up @@ -206,6 +207,11 @@ class ForecastingWorkflowConfig(BaseConfig): # PredictionJob
)

# Feature engineering
shifters: list[Shifter] = Field(
default=[],
description="List of feature shifts to align aggregation intervals. "
"Each Shifter can target different features with different aggregation periods.",
)
rolling_aggregate_features: list[AggregationFunction] = Field(
default=[],
description="If not None, rolling aggregate(s) of load will be used as features in the model.",
Expand Down Expand Up @@ -311,6 +317,7 @@ def create_forecasting_workflow(
),
CompletenessChecker(completeness_threshold=config.completeness_threshold),
]
feature_aligners = list(config.shifters)
feature_adders = [
LagsAdder(
history_available=config.predict_history,
Expand Down Expand Up @@ -361,6 +368,7 @@ def create_forecasting_workflow(
if config.model == "xgboost":
preprocessing = [
*checks,
*feature_aligners,
*feature_adders,
HolidayFeatureAdder(country_code=config.location.country_code),
DatetimeFeaturesAdder(onehot_encode=False),
Expand All @@ -382,6 +390,7 @@ def create_forecasting_workflow(
elif config.model == "lgbmlinear":
preprocessing = [
*checks,
*feature_aligners,
*feature_adders,
HolidayFeatureAdder(country_code=config.location.country_code),
DatetimeFeaturesAdder(onehot_encode=False),
Expand All @@ -396,6 +405,7 @@ def create_forecasting_workflow(
elif config.model == "lgbm":
preprocessing = [
*checks,
*feature_aligners,
*feature_adders,
HolidayFeatureAdder(country_code=config.location.country_code),
DatetimeFeaturesAdder(onehot_encode=False),
Expand All @@ -410,6 +420,7 @@ def create_forecasting_workflow(
elif config.model == "gblinear":
preprocessing = [
*checks,
*feature_aligners,
*feature_adders,
*feature_standardizers,
Imputer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from openstef_models.transforms.general.sample_weighter import SampleWeightConfig, SampleWeighter
from openstef_models.transforms.general.scaler import Scaler
from openstef_models.transforms.general.selector import Selector
from openstef_models.transforms.general.shifter import Shifter

__all__ = [
"Clipper",
Expand All @@ -31,4 +32,5 @@
"SampleWeighter",
"Scaler",
"Selector",
"Shifter",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <[email protected]>
#
# SPDX-License-Identifier: MPL-2.0

"""Transform for shifting features to align aggregation intervals.

This module provides functionality to shift time series features that are aggregated
over a different interval than the target variable, correcting the phase misalignment
by shifting and linearly interpolating back onto the original time grid.
"""

from datetime import timedelta
from typing import Any, override

import numpy as np
from pydantic import Field, PrivateAttr

from openstef_core.base_model import BaseConfig
from openstef_core.datasets import TimeSeriesDataset
from openstef_core.transforms import TimeSeriesTransform
from openstef_models.utils.feature_selection import FeatureSelection


class Shifter(BaseConfig, TimeSeriesTransform):
"""Transform that shifts features to align their aggregation interval with the target.

When source features are aggregated over a different interval than the target variable,
their timestamps represent a different center point in time. This transform corrects
the phase misalignment by shifting the source features and linearly interpolating
back onto the original time grid.

The shift is computed as::

shift = source_aggregation_period / 2 - target_aggregation_period / 2

Timestamps are assumed to be at the end of the aggregation interval.
For example, a timestamp of 12:00 with a 60-minute aggregation period represents
the average over [11:00, 12:00], centered at 11:30. For instantaneous features
or target, use an aggregation period of zero.

Example: Aligning hourly radiation with 15-minute load
Hourly radiation (source_aggregation_period=60 min) has its center 30 min
before the timestamp, while 15-minute load (target_aggregation_period=15 min)
has its center 7.5 min before the timestamp. The required backward shift
for radiation is 22.5 minutes.

>>> import pandas as pd
>>> from datetime import timedelta
>>> from openstef_core.datasets import TimeSeriesDataset
>>> from openstef_models.transforms.general import Shifter
>>> from openstef_models.utils.feature_selection import FeatureSelection
>>>
>>> # Hourly radiation interpolated onto a 15-minute grid
>>> index = pd.date_range('2025-01-01', periods=8, freq='15min')
>>> data = pd.DataFrame({
... 'load': range(8),
... 'radiation': [200, 220, 240, 260, 280, 300, 320, 340],
... }, index=index)
>>> dataset = TimeSeriesDataset(data, timedelta(minutes=15))
>>>
>>> shifter = Shifter(
... selection=FeatureSelection(include=['radiation']),
... source_aggregation_period=timedelta(minutes=60),
... target_aggregation_period=timedelta(minutes=15),
... fill_edges=True,
... )
>>> result = shifter.transform(dataset)
>>> result.data['radiation'].tolist()
[230.0, 250.0, 270.0, 290.0, 310.0, 330.0, 340.0, 340.0]
"""

selection: FeatureSelection = Field(
default=FeatureSelection.NONE,
description="Features to shift.",
)
source_aggregation_period: timedelta = Field(
default=timedelta(minutes=60),
description="Aggregation period of the source features.",
)
target_aggregation_period: timedelta = Field(
default=timedelta(minutes=15),
description="Aggregation period of the target variable.",
)
fill_edges: bool = Field(
default=False,
description=(
"Whether to fill NaN at the edges introduced by the shift "
"with the original (un-shifted) boundary value of each feature."
),
)

_shift: timedelta = PrivateAttr()

@override
def model_post_init(self, context: Any) -> None:
self._shift = self.source_aggregation_period / 2 - self.target_aggregation_period / 2

@override
def transform(self, data: TimeSeriesDataset) -> TimeSeriesDataset:
if self._shift == timedelta(0):
return data

features = self.selection.resolve(data.feature_names)
transformed_data = data.data.copy()

original_index = data.index
shifted_index = original_index - self._shift
combined_index = original_index.union(shifted_index)

feature_data = transformed_data[features]

# Place values on the shifted time axis, interpolate back onto the original grid
shifted_df = feature_data.set_axis(shifted_index) # pyright: ignore[reportUnknownMemberType]
combined_df = shifted_df.reindex(combined_index)

limit_area = None if self.fill_edges else "inside"
realigned = combined_df.interpolate(method="time", limit_direction="both", limit_area=limit_area)
realigned = realigned.reindex(original_index)

# Restore pre-existing NaN at their shifted positions (nearest-neighbor mapping)
nan_mask_shifted = feature_data.isna().set_axis(shifted_index) # pyright: ignore[reportUnknownMemberType]
realigned[nan_mask_shifted.reindex(original_index, method="nearest")] = np.nan

transformed_data[features] = realigned

return data.copy_with(data=transformed_data, is_sorted=True)

@override
def features_added(self) -> list[str]:
return []
174 changes: 174 additions & 0 deletions packages/openstef-models/tests/unit/transforms/general/test_shifter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <[email protected]>
#
# SPDX-License-Identifier: MPL-2.0

from datetime import timedelta

import numpy as np
import pandas as pd
import pytest

from openstef_core.datasets import TimeSeriesDataset
from openstef_models.transforms.general import Shifter
from openstef_models.utils.feature_selection import FeatureSelection


@pytest.fixture
def sample_dataset() -> TimeSeriesDataset:
"""Sample dataset on a 15-minute grid."""
return TimeSeriesDataset(
data=pd.DataFrame(
{
"load": list(range(8)),
"radiation": [200.0, 220.0, 240.0, 260.0, 280.0, 300.0, 320.0, 340.0],
},
index=pd.date_range("2025-01-01", periods=8, freq="15min"),
),
sample_interval=timedelta(minutes=15),
)


@pytest.mark.parametrize(
("source_aggregation_period", "target_aggregation_period", "expected_radiation"),
[
pytest.param(
timedelta(minutes=60),
timedelta(minutes=15),
[230.0, 250.0, 270.0, 290.0, 310.0, 330.0, np.nan, np.nan],
id="60min_to_15min",
),
pytest.param(
timedelta(minutes=30),
timedelta(minutes=15),
[210.0, 230.0, 250.0, 270.0, 290.0, 310.0, 330.0, np.nan],
id="30min_to_15min",
),
pytest.param(
timedelta(minutes=60),
timedelta(0),
[240.0, 260.0, 280.0, 300.0, 320.0, 340.0, np.nan, np.nan],
id="60min_to_instantaneous",
),
],
)
def test_shifter__shift_and_interpolate(
sample_dataset: TimeSeriesDataset,
source_aggregation_period: timedelta,
target_aggregation_period: timedelta,
expected_radiation: list[float],
):
"""Test that features are shifted and interpolated correctly for different intervals."""
# Arrange
shifter = Shifter(
selection=FeatureSelection(include={"radiation"}),
source_aggregation_period=source_aggregation_period,
target_aggregation_period=target_aggregation_period,
)

# Act
result = shifter.transform(sample_dataset)

# Assert
expected = pd.Series(expected_radiation, index=sample_dataset.index, name="radiation")
pd.testing.assert_series_equal(result.data["radiation"], expected)
# Unselected feature should be unchanged
assert result.data["load"].tolist() == list(range(8))


def test_shifter__no_shift_when_intervals_equal(sample_dataset: TimeSeriesDataset):
"""Test that the same dataset object is returned when no shift is needed."""
# Arrange
shifter = Shifter(
selection=FeatureSelection(include={"radiation"}),
source_aggregation_period=timedelta(minutes=15),
target_aggregation_period=timedelta(minutes=15),
)

# Act
result = shifter.transform(sample_dataset)

# Assert
assert result is sample_dataset


@pytest.mark.parametrize(
("fill_edges", "expected_trailing"),
[
pytest.param(False, [np.nan, np.nan], id="no_fill_leaves_nan"),
pytest.param(True, [340.0, 340.0], id="fill_uses_last_original_value"),
],
)
def test_shifter__fill_edges(
sample_dataset: TimeSeriesDataset,
fill_edges: bool,
expected_trailing: list[float],
):
"""Test edge handling with fill_edges."""
# Arrange
shifter = Shifter(
selection=FeatureSelection(include={"radiation"}),
fill_edges=fill_edges,
)

# Act
result = shifter.transform(sample_dataset)

# Assert — first 6 values are always the shifted+interpolated result
assert result.data["radiation"].iloc[:6].tolist() == [230.0, 250.0, 270.0, 290.0, 310.0, 330.0]
# Trailing 2 values depend on fill_edges
expected = pd.Series(expected_trailing, name="radiation", index=sample_dataset.index[-2:])
pd.testing.assert_series_equal(result.data["radiation"].iloc[-2:], expected)


def test_shifter__fill_edges_leading_nan(sample_dataset: TimeSeriesDataset):
"""Test fill_edges handles leading NaN when source interval is smaller than target."""
# Arrange — negative shift (source < target) produces leading NaN
shifter = Shifter(
selection=FeatureSelection(include={"radiation"}),
source_aggregation_period=timedelta(minutes=15),
target_aggregation_period=timedelta(minutes=60),
fill_edges=True,
)

# Act
result = shifter.transform(sample_dataset)

# Assert — leading NaN should be filled with the first original value (200.0)
expected = pd.Series(
[200.0, 200.0, 210.0, 230.0, 250.0, 270.0, 290.0, 310.0],
index=sample_dataset.index,
name="radiation",
)
pd.testing.assert_series_equal(result.data["radiation"], expected)


def test_shifter__preserves_preexisting_nan():
"""Test that pre-existing NaN values are shifted rather than imputed."""
# Arrange
data = pd.DataFrame(
{
"load": list(range(8)),
"radiation": [200.0, 220.0, np.nan, 260.0, 280.0, np.nan, np.nan, 340.0],
},
index=pd.date_range("2025-01-01", periods=8, freq="15min"),
)
dataset = TimeSeriesDataset(data, timedelta(minutes=15))
shifter = Shifter(
selection=FeatureSelection(include={"radiation"}),
source_aggregation_period=timedelta(minutes=60),
target_aggregation_period=timedelta(minutes=15),
fill_edges=True,
)

# Act
result = shifter.transform(dataset)

# Assert
# Each grid point inherits the NaN status of its nearest shifted data point.
# For equidistant cases, pandas nearest picks the later (right) neighbor.
expected = pd.Series(
[np.nan, 250.0, 270.0, np.nan, np.nan, 330.0, 340.0, 340.0],
index=dataset.index,
name="radiation",
)
pd.testing.assert_series_equal(result.data["radiation"], expected)
Loading