Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
SampleWeighter,
Scaler,
Selector,
Shifter,
)
from openstef_models.transforms.postprocessing import ConfidenceIntervalApplicator, QuantileSorter
from openstef_models.transforms.time_domain import (
Expand Down Expand Up @@ -206,6 +207,11 @@ class ForecastingWorkflowConfig(BaseConfig): # PredictionJob
)

# Feature engineering
shifter: Shifter = Field(
default=Shifter(),
description="Optional transform to shift features to align averaging intervals. "
"Select which features to shift via the selection parameter.",
)
rolling_aggregate_features: list[AggregationFunction] = Field(
default=[],
description="If not None, rolling aggregate(s) of load will be used as features in the model.",
Expand Down Expand Up @@ -311,6 +317,7 @@ def create_forecasting_workflow(
),
CompletenessChecker(completeness_threshold=config.completeness_threshold),
]
feature_aligners = [config.shifter] if config.shifter.selection != FeatureSelection.NONE else []
feature_adders = [
LagsAdder(
history_available=config.predict_history,
Expand Down Expand Up @@ -361,6 +368,7 @@ def create_forecasting_workflow(
if config.model == "xgboost":
preprocessing = [
*checks,
*feature_aligners,
*feature_adders,
HolidayFeatureAdder(country_code=config.location.country_code),
DatetimeFeaturesAdder(onehot_encode=False),
Expand All @@ -382,6 +390,7 @@ def create_forecasting_workflow(
elif config.model == "lgbmlinear":
preprocessing = [
*checks,
*feature_aligners,
*feature_adders,
HolidayFeatureAdder(country_code=config.location.country_code),
DatetimeFeaturesAdder(onehot_encode=False),
Expand All @@ -396,6 +405,7 @@ def create_forecasting_workflow(
elif config.model == "lgbm":
preprocessing = [
*checks,
*feature_aligners,
*feature_adders,
HolidayFeatureAdder(country_code=config.location.country_code),
DatetimeFeaturesAdder(onehot_encode=False),
Expand All @@ -410,6 +420,7 @@ def create_forecasting_workflow(
elif config.model == "gblinear":
preprocessing = [
*checks,
*feature_aligners,
*feature_adders,
*feature_standardizers,
Imputer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from openstef_models.transforms.general.sample_weighter import SampleWeightConfig, SampleWeighter
from openstef_models.transforms.general.scaler import Scaler
from openstef_models.transforms.general.selector import Selector
from openstef_models.transforms.general.shifter import Shifter

__all__ = [
"Clipper",
Expand All @@ -31,4 +32,5 @@
"SampleWeighter",
"Scaler",
"Selector",
"Shifter",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <[email protected]>
#
# SPDX-License-Identifier: MPL-2.0

"""Transform for shifting features to align averaging intervals.

This module provides functionality to shift time series features that are averaged
over a different interval than the target variable, correcting the phase misalignment
by shifting and linearly interpolating back onto the original time grid.
"""

from datetime import timedelta
from typing import Any, cast, override

import numpy as np
import pandas as pd
from pydantic import Field, PrivateAttr

from openstef_core.base_model import BaseConfig
from openstef_core.datasets import TimeSeriesDataset
from openstef_core.transforms import TimeSeriesTransform
from openstef_models.utils.feature_selection import FeatureSelection


class Shifter(BaseConfig, TimeSeriesTransform):
"""Transform that shifts features to align their averaging interval with the target.

When source features are averaged over a different interval than the target variable,
their timestamps represent a different center point in time. This transform corrects
the phase misalignment by shifting the source features and linearly interpolating
back onto the original time grid.

The shift is computed as::

shift = source_averaging_period / 2 - target_averaging_period / 2

Timestamps are assumed to be at the end of the averaging interval.
For example, a timestamp of 12:00 with a 60-minute averaging period represents
the average over [11:00, 12:00], centered at 11:30. For instantaneous features
or target, use an averaging period of zero.

Example: Aligning hourly radiation with 15-minute load
Hourly radiation (source_averaging_period=60 min) has its center 30 min
before the timestamp, while 15-minute load (target_averaging_period=15 min)
has its center 7.5 min before the timestamp. The required backward shift
for radiation is 22.5 minutes.

>>> import pandas as pd
>>> from datetime import timedelta
>>> from openstef_core.datasets import TimeSeriesDataset
>>> from openstef_models.transforms.general import Shifter
>>> from openstef_models.utils.feature_selection import FeatureSelection
>>>
>>> # Hourly radiation interpolated onto a 15-minute grid
>>> index = pd.date_range('2025-01-01', periods=8, freq='15min')
>>> data = pd.DataFrame({
... 'load': range(8),
... 'radiation': [200, 220, 240, 260, 280, 300, 320, 340],
... }, index=index)
>>> dataset = TimeSeriesDataset(data, timedelta(minutes=15))
>>>
>>> shifter = Shifter(
... selection=FeatureSelection(include=['radiation']),
... source_averaging_period=timedelta(minutes=60),
... target_averaging_period=timedelta(minutes=15),
... fill_edges=True,
... )
>>> result = shifter.transform(dataset)
>>> result.data['radiation'].tolist()
[230.0, 250.0, 270.0, 290.0, 310.0, 330.0, 340.0, 340.0]
"""

selection: FeatureSelection = Field(
default=FeatureSelection.NONE,
description="Features to shift.",
)
source_averaging_period: timedelta = Field(
default=timedelta(minutes=60),
description="Averaging period of the source features.",
)
target_averaging_period: timedelta = Field(
default=timedelta(minutes=15),
description="Averaging period of the target variable.",
)
fill_edges: bool = Field(
default=False,
description=(
"Whether to fill NaN at the edges introduced by the shift "
"with the original (un-shifted) boundary value of each feature."
),
)

_shift: timedelta = PrivateAttr()

@override
def model_post_init(self, context: Any) -> None:
self._shift = self.source_averaging_period / 2 - self.target_averaging_period / 2

@override
def transform(self, data: TimeSeriesDataset) -> TimeSeriesDataset:
if self._shift == timedelta(0):
return data

features = self.selection.resolve(data.feature_names)
transformed_data = data.data.copy()

original_index = cast(pd.DatetimeIndex, data.data.index)
shifted_index = original_index - self._shift
combined_index = cast(pd.DatetimeIndex, original_index.union(shifted_index))

feature_data = transformed_data[features]

# Place values on the shifted time axis, interpolate back onto the original grid
shifted_df = feature_data.set_axis(shifted_index) # pyright: ignore[reportUnknownMemberType]
realigned = shifted_df.reindex(combined_index).interpolate(method="time").reindex(original_index)

# Restore pre-existing NaN at their shifted positions (nearest-neighbor mapping)
nan_mask_shifted = feature_data.isna().set_axis(shifted_index) # pyright: ignore[reportUnknownMemberType]
realigned[nan_mask_shifted.reindex(original_index, method="nearest")] = np.nan

# Handle timestamps outside the range covered by the shifted data
outside_mask = (original_index < shifted_index.min()) | (original_index > shifted_index.max()) # pyright: ignore[reportUnknownMemberType]
if self.fill_edges:
edge = feature_data.iloc[[-1]] if self._shift > timedelta(0) else feature_data.iloc[[0]]
realigned.loc[outside_mask] = edge.to_numpy()
else:
realigned.loc[outside_mask] = np.nan

transformed_data[features] = realigned

return data.copy_with(data=transformed_data, is_sorted=True)

@override
def features_added(self) -> list[str]:
return []
174 changes: 174 additions & 0 deletions packages/openstef-models/tests/unit/transforms/general/test_shifter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <[email protected]>
#
# SPDX-License-Identifier: MPL-2.0

from datetime import timedelta

import numpy as np
import pandas as pd
import pytest

from openstef_core.datasets import TimeSeriesDataset
from openstef_models.transforms.general import Shifter
from openstef_models.utils.feature_selection import FeatureSelection


@pytest.fixture
def sample_dataset() -> TimeSeriesDataset:
"""Sample dataset on a 15-minute grid."""
return TimeSeriesDataset(
data=pd.DataFrame(
{
"load": list(range(8)),
"radiation": [200.0, 220.0, 240.0, 260.0, 280.0, 300.0, 320.0, 340.0],
},
index=pd.date_range("2025-01-01", periods=8, freq="15min"),
),
sample_interval=timedelta(minutes=15),
)


@pytest.mark.parametrize(
("source_averaging_period", "target_averaging_period", "expected_radiation"),
[
pytest.param(
timedelta(minutes=60),
timedelta(minutes=15),
[230.0, 250.0, 270.0, 290.0, 310.0, 330.0, np.nan, np.nan],
id="60min_to_15min",
),
pytest.param(
timedelta(minutes=30),
timedelta(minutes=15),
[210.0, 230.0, 250.0, 270.0, 290.0, 310.0, 330.0, np.nan],
id="30min_to_15min",
),
pytest.param(
timedelta(minutes=60),
timedelta(0),
[240.0, 260.0, 280.0, 300.0, 320.0, 340.0, np.nan, np.nan],
id="60min_to_instantaneous",
),
],
)
def test_shifter__shift_and_interpolate(
sample_dataset: TimeSeriesDataset,
source_averaging_period: timedelta,
target_averaging_period: timedelta,
expected_radiation: list[float],
):
"""Test that features are shifted and interpolated correctly for different intervals."""
# Arrange
shifter = Shifter(
selection=FeatureSelection(include={"radiation"}),
source_averaging_period=source_averaging_period,
target_averaging_period=target_averaging_period,
)

# Act
result = shifter.transform(sample_dataset)

# Assert
expected = pd.Series(expected_radiation, index=sample_dataset.index, name="radiation")
pd.testing.assert_series_equal(result.data["radiation"], expected)
# Unselected feature should be unchanged
assert result.data["load"].tolist() == list(range(8))


def test_shifter__no_shift_when_intervals_equal(sample_dataset: TimeSeriesDataset):
"""Test that the same dataset object is returned when no shift is needed."""
# Arrange
shifter = Shifter(
selection=FeatureSelection(include={"radiation"}),
source_averaging_period=timedelta(minutes=15),
target_averaging_period=timedelta(minutes=15),
)

# Act
result = shifter.transform(sample_dataset)

# Assert
assert result is sample_dataset


@pytest.mark.parametrize(
("fill_edges", "expected_trailing"),
[
pytest.param(False, [np.nan, np.nan], id="no_fill_leaves_nan"),
pytest.param(True, [340.0, 340.0], id="fill_uses_last_original_value"),
],
)
def test_shifter__fill_edges(
sample_dataset: TimeSeriesDataset,
fill_edges: bool,
expected_trailing: list[float],
):
"""Test edge handling with fill_edges."""
# Arrange
shifter = Shifter(
selection=FeatureSelection(include={"radiation"}),
fill_edges=fill_edges,
)

# Act
result = shifter.transform(sample_dataset)

# Assert — first 6 values are always the shifted+interpolated result
assert result.data["radiation"].iloc[:6].tolist() == [230.0, 250.0, 270.0, 290.0, 310.0, 330.0]
# Trailing 2 values depend on fill_edges
expected = pd.Series(expected_trailing, name="radiation", index=sample_dataset.index[-2:])
pd.testing.assert_series_equal(result.data["radiation"].iloc[-2:], expected)


def test_shifter__fill_edges_leading_nan(sample_dataset: TimeSeriesDataset):
"""Test fill_edges handles leading NaN when source interval is smaller than target."""
# Arrange — negative shift (source < target) produces leading NaN
shifter = Shifter(
selection=FeatureSelection(include={"radiation"}),
source_averaging_period=timedelta(minutes=15),
target_averaging_period=timedelta(minutes=60),
fill_edges=True,
)

# Act
result = shifter.transform(sample_dataset)

# Assert — leading NaN should be filled with the first original value (200.0)
expected = pd.Series(
[200.0, 200.0, 210.0, 230.0, 250.0, 270.0, 290.0, 310.0],
index=sample_dataset.index,
name="radiation",
)
pd.testing.assert_series_equal(result.data["radiation"], expected)


def test_shifter__preserves_preexisting_nan():
"""Test that pre-existing NaN values are shifted rather than imputed."""
# Arrange
data = pd.DataFrame(
{
"load": list(range(8)),
"radiation": [200.0, 220.0, np.nan, 260.0, 280.0, np.nan, np.nan, 340.0],
},
index=pd.date_range("2025-01-01", periods=8, freq="15min"),
)
dataset = TimeSeriesDataset(data, timedelta(minutes=15))
shifter = Shifter(
selection=FeatureSelection(include={"radiation"}),
source_averaging_period=timedelta(minutes=60),
target_averaging_period=timedelta(minutes=15),
fill_edges=True,
)

# Act
result = shifter.transform(dataset)

# Assert
# Each grid point inherits the NaN status of its nearest shifted data point.
# For equidistant cases, pandas nearest picks the later (right) neighbor.
expected = pd.Series(
[np.nan, 250.0, 270.0, np.nan, np.nan, 330.0, 340.0, 340.0],
index=dataset.index,
name="radiation",
)
pd.testing.assert_series_equal(result.data["radiation"], expected)
Loading