diff --git a/examples/benchmarks/custom_benchmark/README.md b/examples/benchmarks/custom_benchmark/README.md new file mode 100644 index 000000000..54db5a672 --- /dev/null +++ b/examples/benchmarks/custom_benchmark/README.md @@ -0,0 +1,142 @@ + + +# Custom Benchmark Example + +End-to-end examples for running and customizing OpenSTEF **BEAM** (Backtesting, Evaluation, Analysis, Metrics) benchmarks. + +## What is BEAM? + +BEAM replays historical data day by day, trains your model, makes forecasts, and scores them -- all without data leakage. It works with any model that implements the `BacktestForecasterMixin` interface. + +## Files + +| File | What it does | +|---|---| +| `example_baseline.py` | **Start here.** A minimal forecaster that predicts the median of recent history. Shows the `BacktestForecasterMixin` interface (`config`, `quantiles`, `fit`, `predict`). | +| `example_benchmark.py` | Defines a custom benchmark: target provider (where data lives), metrics, and pipeline assembly. Extends `SimpleTargetProvider` directly -- adapt this when you have your own data layout. | +| `run_liander2024_benchmark.py` | Runs the example baseline + GBLinear on the built-in **Liander 2024** dataset (auto-downloaded from HuggingFace). Good starting point if you just want to try things out. | +| `run_benchmark.py` | Same as above but uses the custom benchmark pipeline from `example_benchmark.py`. | +| `evaluate_existing_forecasts.py` | **Bring your own forecasts.** Points the pipeline at pre-existing prediction parquets and runs only evaluation + analysis (no backtesting). | +| `compare_liander2024_results.py` | Compare results from multiple runs on the **Liander 2024** dataset. Auto-detects which targets are available in all runs. | +| `compare_custom_results.py` | Compare results from multiple runs on the **custom** benchmark. Same auto-detection as above. | + +## Quick Start + +```bash +# 1. 
Clone the repo +git clone git@github.com:OpenSTEF/openstef.git -b "release/v4.0.0" +cd openstef + +# 2. Install all packages (requires uv: https://docs.astral.sh/uv/) +uv sync --all-extras --all-groups --all-packages +``` + +### Run the Liander 2024 benchmark + +Uses the built-in Liander 2024 dataset (auto-downloaded from HuggingFace). Runs the example baseline and GBLinear on all target categories. + +```bash +uv run python -m examples.benchmarks.custom_benchmark.run_liander2024_benchmark +``` + +### Run the custom benchmark + +Uses the custom target provider from `example_benchmark.py` with your own pipeline config. Runs on `solar_park` targets by default. + +```bash +uv run python -m examples.benchmarks.custom_benchmark.run_benchmark +``` + +### Evaluate pre-existing forecasts (no backtesting) + +If you already have predictions from your own model or external system, you can skip backtesting entirely. Place your forecast parquets in the expected directory layout and run only evaluation + analysis. + +#### Required directory layout + +``` +benchmark_results/MyForecasts/ +└── backtest/ + └── / # e.g. "solar_park" + └── / # e.g. "Within 15 kilometers of Opmeer_normalized" + └── predictions.parquet +``` + +`group_name` and `target_name` must match the values from your targets YAML. You can list them: + +```bash +uv run python -c " +from examples.benchmarks.custom_benchmark.example_benchmark import create_custom_benchmark_runner +for t in create_custom_benchmark_runner().target_provider.get_targets(['solar_park']): + print(t.group_name, '/', t.name) +" +``` + +#### Required parquet format + +Each `predictions.parquet` must have: + +| Column | Type | Description | +|---|---|---| +| *(index)* `timestamp` | `DatetimeIndex` | When each prediction is valid for. 15-min intervals, tz-naive UTC. | +| `available_at` | `datetime64` | When the prediction was generated (enables D-1 / lead-time filtering). | +| `quantile_P05` | `float` | 5th percentile prediction. 
| +| `quantile_P50` | `float` | Median prediction (**required**). | +| `quantile_P95` | `float` | 95th percentile prediction. | +| ... | `float` | One column per quantile, named with `Quantile(x).format()`. | + +Example rows: + +``` +timestamp (index) available_at quantile_P05 quantile_P50 quantile_P95 +2023-01-15 12:00:00 2023-01-14 06:00:00 0.5 1.2 2.0 +2023-01-15 12:15:00 2023-01-14 06:00:00 0.6 1.3 2.1 +``` + +#### Run + +```bash +uv run python -m examples.benchmarks.custom_benchmark.evaluate_existing_forecasts +``` + +See `evaluate_existing_forecasts.py` for the full script. + +Results are written to `./benchmark_results/`. Each model gets its own subfolder with backtest predictions, evaluation scores, and analysis plots. + +### Compare results across runs + +After running at least two models, generate side-by-side comparison plots (global, per-group, per-target). The scripts automatically detect which targets are available in all runs. + +```bash +# Compare on the Liander 2024 dataset +uv run python -m examples.benchmarks.custom_benchmark.compare_liander2024_results + +# Compare on the custom benchmark +uv run python -m examples.benchmarks.custom_benchmark.compare_custom_results +``` + +Comparison output (HTML plots) is saved to `./benchmark_results_comparison/`. + +## Creating Your Own + +### 1. Write a forecaster + +Copy `example_baseline.py` and implement two methods: + +- **`fit(data)`** -- called periodically with recent history. Train your model here. +- **`predict(data)`** -- called every few hours. Return a `TimeSeriesDataset` with a `"load"` column and one column per quantile (e.g. `"quantile_P05"`, `"quantile_P50"`). + +The `data` argument is a `RestrictedHorizonVersionedTimeSeries` -- it enforces no-lookahead by only exposing data available at `data.horizon`. Use `data.get_window(start, end, available_before)` to retrieve slices. + +### 2. Define a benchmark (optional) + +Copy `example_benchmark.py` if you want to use **your own data**. 
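In practice, plugging in your own data mostly comes down to pointing two path overrides at your parquet files. A standalone sketch of that path arithmetic (the helper names `measurements_path` / `weather_path` are illustrative; the directory names follow this example's Liander-style layout and are assumptions for your own data):

```python
from pathlib import Path


def measurements_path(data_dir: Path, group_name: str, target_name: str) -> Path:
    # Mirrors what _get_measurements_path_for_target() computes in
    # example_benchmark.py: data_dir/load_measurements/<group>/<name>.parquet
    return data_dir / "load_measurements" / group_name / f"{target_name}.parquet"


def weather_path(data_dir: Path, group_name: str, target_name: str) -> Path:
    # Mirrors _get_weather_path_for_target() in the same file.
    return data_dir / "weather_forecasts_versioned" / group_name / f"{target_name}.parquet"


print(measurements_path(Path("data"), "solar_park", "my_site"))
# → data/load_measurements/solar_park/my_site.parquet (on POSIX)
```
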
The key class is `SimpleTargetProvider` -- override `_get_measurements_path_for_target()` and `_get_weather_path_for_target()` to point to your parquet files. + +If you're fine with the Liander 2024 dataset, skip this step and use `create_liander2024_benchmark_runner()` directly. + +### 3. Write a runner + +Copy `run_benchmark.py`. Register your models as forecaster factories and call `pipeline.run()`. diff --git a/examples/benchmarks/custom_benchmark/compare_custom_results.py b/examples/benchmarks/custom_benchmark/compare_custom_results.py new file mode 100644 index 000000000..692ce9b79 --- /dev/null +++ b/examples/benchmarks/custom_benchmark/compare_custom_results.py @@ -0,0 +1,48 @@ +"""Compare benchmark results from different runs on the custom benchmark. + +Usage: + 1. First run at least two models with run_benchmark.py + (e.g. ExampleBaseline and GBLinear). + 2. Then run this script to generate side-by-side comparison plots. + +Output is saved to ./benchmark_results_comparison/custom/. +""" + +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +from pathlib import Path +from typing import cast + +from examples.benchmarks.custom_benchmark.example_benchmark import ANALYSIS_CONFIG, create_custom_benchmark_runner +from openstef_beam.analysis.models import RunName +from openstef_beam.benchmarking import BenchmarkComparisonPipeline, LocalBenchmarkStorage +from openstef_beam.benchmarking.storage import BenchmarkStorage + +# One storage per run — keys are human-readable labels shown in comparison plots. +run_storages: dict[RunName, BenchmarkStorage] = { + "ExampleBaseline": LocalBenchmarkStorage(base_path=Path("./benchmark_results/ExampleBaseline")), + "GBLinear": LocalBenchmarkStorage(base_path=Path("./benchmark_results/GBLinear")), +} + +# Check that results exist. 
+for name, storage in run_storages.items(): + base_path = cast(LocalBenchmarkStorage, storage).base_path + if not base_path.exists(): + msg = f"Benchmark directory not found for '{name}': {base_path}. Run the benchmarks first." + raise FileNotFoundError(msg) + +# Reuse the custom target provider. +OUTPUT_PATH = Path("./benchmark_results_comparison/custom") +target_provider = create_custom_benchmark_runner( + storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH), +).target_provider + +# Run the comparison — generates global, group, and per-target HTML plots. +comparison = BenchmarkComparisonPipeline( + analysis_config=ANALYSIS_CONFIG, + storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH), + target_provider=target_provider, +) +comparison.run(run_data=run_storages, filter_args=["solar_park"]) diff --git a/examples/benchmarks/custom_benchmark/compare_liander2024_results.py b/examples/benchmarks/custom_benchmark/compare_liander2024_results.py new file mode 100644 index 000000000..f0e0ea725 --- /dev/null +++ b/examples/benchmarks/custom_benchmark/compare_liander2024_results.py @@ -0,0 +1,49 @@ +"""Compare benchmark results from different runs on the Liander 2024 dataset. + +Usage: + 1. First run at least two models with run_liander2024_benchmark.py + (e.g. ExampleBaseline and GBLinear). + 2. Then run this script to generate side-by-side comparison plots. + +Output is saved to ./benchmark_results_comparison/liander2024/. 
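Run from the repository root (same command as in the README)::

    uv run python -m examples.benchmarks.custom_benchmark.compare_liander2024_results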
+""" + +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +from pathlib import Path +from typing import cast + +from openstef_beam.analysis.models import RunName +from openstef_beam.benchmarking import BenchmarkComparisonPipeline, LocalBenchmarkStorage +from openstef_beam.benchmarking.benchmarks import create_liander2024_benchmark_runner +from openstef_beam.benchmarking.benchmarks.liander2024 import LIANDER2024_ANALYSIS_CONFIG +from openstef_beam.benchmarking.storage import BenchmarkStorage + +# One storage per run — keys are human-readable labels shown in comparison plots. +run_storages: dict[RunName, BenchmarkStorage] = { + "ExampleBaseline": LocalBenchmarkStorage(base_path=Path("./benchmark_results/ExampleBaseline")), + "GBLinear": LocalBenchmarkStorage(base_path=Path("./benchmark_results/GBLinear")), +} + +# Check that results exist. +for name, storage in run_storages.items(): + base_path = cast(LocalBenchmarkStorage, storage).base_path + if not base_path.exists(): + msg = f"Benchmark directory not found for '{name}': {base_path}. Run the benchmarks first." + raise FileNotFoundError(msg) + +# Reuse the Liander 2024 target provider. +OUTPUT_PATH = Path("./benchmark_results_comparison/liander2024") +target_provider = create_liander2024_benchmark_runner( + storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH), +).target_provider + +# Run the comparison — generates global, group, and per-target HTML plots. 
+comparison = BenchmarkComparisonPipeline( + analysis_config=LIANDER2024_ANALYSIS_CONFIG, + storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH), + target_provider=target_provider, +) +comparison.run(run_data=run_storages) diff --git a/examples/benchmarks/custom_benchmark/evaluate_existing_forecasts.py b/examples/benchmarks/custom_benchmark/evaluate_existing_forecasts.py new file mode 100644 index 000000000..c1a4dbaeb --- /dev/null +++ b/examples/benchmarks/custom_benchmark/evaluate_existing_forecasts.py @@ -0,0 +1,104 @@ +"""Evaluate pre-existing forecasts without running backtesting. + +If you already have forecast predictions (e.g. from your own model or an external +system), you can point the benchmark pipeline at them and run only the evaluation +and analysis steps. + +How it works: + 1. Place your prediction parquet files in the expected directory layout (see below). + 2. Run this script — the pipeline detects existing backtest output and + automatically skips to evaluation + analysis. + +Expected directory layout:: + + benchmark_results/MyForecasts/ + └── backtest/ + └── / # e.g. "solar_park" + └── / # e.g. 
"Within 15 kilometers of Opmeer_normalized" + └── predictions.parquet + +Expected parquet format:: + + Index: pd.DatetimeIndex (name="timestamp", tz-naive UTC, 15-min intervals) + Columns: + - "available_at" (datetime) — when the prediction was generated + - "quantile_P05" (float) — 5th percentile prediction + - "quantile_P50" (float) — median prediction (REQUIRED) + - "quantile_P95" (float) — 95th percentile prediction + - ...one column per quantile, named with Quantile(x).format() + +Example row:: + + timestamp (index) available_at quantile_P05 quantile_P50 quantile_P95 + 2023-01-15 12:00:00 2023-01-14 06:00:00 0.5 1.2 2.0 + +You can list the expected target names and group names by checking the targets.yaml +in your dataset, or by running:: + + runner = create_custom_benchmark_runner() + for t in runner.target_provider.get_targets(["solar_park"]): + print(t.group_name, t.name) + +The pipeline still needs a "forecaster factory" to know which quantiles were used, +but fit() and predict() are never called. We use DummyForecaster for this. +""" + +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +import logging +import multiprocessing +import os +from pathlib import Path + +from examples.benchmarks.custom_benchmark.example_benchmark import create_custom_benchmark_runner +from openstef_beam.backtesting.backtest_forecaster import DummyForecaster +from openstef_beam.benchmarking import BenchmarkContext, BenchmarkTarget, LocalBenchmarkStorage +from openstef_core.types import Q + +os.environ["OMP_NUM_THREADS"] = "1" +os.environ["OPENBLAS_NUM_THREADS"] = "1" +os.environ["MKL_NUM_THREADS"] = "1" + +_logger = logging.getLogger(__name__) + +logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s] %(message)s") + +# Path to the folder that contains the backtest/ directory with your parquets. 
+OUTPUT_PATH = Path("./benchmark_results/MyForecasts") +N_PROCESSES = multiprocessing.cpu_count() + +# Quantiles your forecasts were generated for (must include 0.5 = median). +# Adjust this list to match whatever quantiles are in your parquet columns. +PREDICTION_QUANTILES = [Q(0.05), Q(0.1), Q(0.3), Q(0.5), Q(0.7), Q(0.9), Q(0.95)] + + +def stub_factory(_context: BenchmarkContext, _target: BenchmarkTarget) -> DummyForecaster: + """Factory that returns a DummyForecaster (backtesting is skipped). + + DummyForecaster provides quantile info to the pipeline but never runs + fit() or predict() since backtest output already exists on disk. + + Returns: + DummyForecaster with the configured quantiles. + """ + return DummyForecaster(predict_quantiles=PREDICTION_QUANTILES) + + +if __name__ == "__main__": + # Point the storage at your results folder. + # The pipeline reads parquets from: + # OUTPUT_PATH / backtest / / / predictions.parquet + storage = LocalBenchmarkStorage(base_path=OUTPUT_PATH) + + runner = create_custom_benchmark_runner(storage=storage) + + # Run the pipeline — backtesting is auto-skipped for every target that + # already has a predictions.parquet on disk. + runner.run( + forecaster_factory=stub_factory, + run_name="my_forecasts", + n_processes=N_PROCESSES, + filter_args=["solar_park"], + ) diff --git a/examples/benchmarks/custom_benchmark/example_baseline.py b/examples/benchmarks/custom_benchmark/example_baseline.py new file mode 100644 index 000000000..206dc1042 --- /dev/null +++ b/examples/benchmarks/custom_benchmark/example_baseline.py @@ -0,0 +1,93 @@ +"""Custom baseline: predicts a constant value (last known median) for all future timestamps. + +Implements BacktestForecasterMixin — the interface BEAM needs to run any model +in its backtesting/benchmarking pipeline. To create your own baseline, copy this +file and modify fit() and predict(). 
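In run_benchmark.py this class is wired into the pipeline through a small
factory (simplified here from that script; BEAM calls it once per target)::

    def example_factory(_context, _target):
        return ExampleBenchmarkForecaster(predict_quantiles=PREDICTION_QUANTILES)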
+""" + +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +from datetime import timedelta +from typing import override + +import pandas as pd + +from openstef_beam.backtesting.backtest_forecaster import BacktestForecasterConfig, BacktestForecasterMixin +from openstef_beam.backtesting.restricted_horizon_timeseries import RestrictedHorizonVersionedTimeSeries +from openstef_core.datasets import TimeSeriesDataset +from openstef_core.types import Q, Quantile + + +class ExampleBenchmarkForecaster(BacktestForecasterMixin): + """Predicts a constant median of recent history for all future timestamps. + + All quantile columns get the same value -- no uncertainty estimation. + """ + + def __init__(self, predict_quantiles: list[Quantile] | None = None) -> None: # noqa: D107 + # Quantiles define the probabilistic forecast bands (e.g. P05 = 5th percentile) + self._quantiles = predict_quantiles or [Q(0.05), Q(0.1), Q(0.3), Q(0.5), Q(0.7), Q(0.9), Q(0.95)] + self._median: float = 0.0 + + # BacktestForecasterConfig tells BEAM how to schedule training and prediction + self.config = BacktestForecasterConfig( + requires_training=True, # Call fit() before predict() + predict_length=timedelta(days=7), # How far ahead to forecast + predict_min_length=timedelta(minutes=15), + predict_context_length=timedelta(minutes=15), # Data needed before forecast start (>0) + predict_context_min_coverage=0.0, + training_context_length=timedelta(days=30), # How much history fit() sees + training_context_min_coverage=0.3, # Min 30% non-NaN data required + predict_sample_interval=timedelta(minutes=15), # Output resolution (15-min intervals) + ) + + @property + @override + def quantiles(self) -> list[Quantile]: + """Quantiles this forecaster produces.""" + return self._quantiles + + @override + def fit(self, data: RestrictedHorizonVersionedTimeSeries) -> None: + """Compute median of recent load data. 
+ + Args: + data: Restricted-horizon view -- only sees data available at data.horizon. + """ + # data.horizon = the current point in time during backtesting + # get_window() returns only data that was available at that point (no lookahead) + training = data.get_window( + start=data.horizon - self.config.training_context_length, # 30 days before horizon + end=data.horizon, + available_before=data.horizon, # Ensures no future data leaks in + ) + # "load" is the target column (actual energy consumption/generation) + if "load" in training.data.columns: + self._median = float(training.data["load"].median()) + + @override + def predict(self, data: RestrictedHorizonVersionedTimeSeries) -> TimeSeriesDataset | None: + """Return constant median prediction for the forecast horizon. + + Returns: + Forecast with all quantiles set to the training median, or None on failure. + """ + # Build a DataFrame with "load" + one column per quantile (e.g. "quantile_P05") + # All values are the same constant (the median from fit()) + # q.format() converts Q(0.05) -> "quantile_P05" (the required column naming) + return TimeSeriesDataset( + data=pd.DataFrame( + data={"load": self._median} | {q.format(): self._median for q in self._quantiles}, + index=pd.DatetimeIndex( + pd.date_range( + data.horizon, + periods=int(self.config.predict_length / self.config.predict_sample_interval), + freq=self.config.predict_sample_interval, + ), + name="datetime", + ), + ), + sample_interval=self.config.predict_sample_interval, + ) diff --git a/examples/benchmarks/custom_benchmark/example_benchmark.py b/examples/benchmarks/custom_benchmark/example_benchmark.py new file mode 100644 index 000000000..8b8b4004b --- /dev/null +++ b/examples/benchmarks/custom_benchmark/example_benchmark.py @@ -0,0 +1,177 @@ +"""Example: custom benchmark with your own target provider. + +Shows how to extend SimpleTargetProvider to load your own data and build a +benchmark pipeline. 
Uses the Liander 2024 dataset as example data source -- +replace paths and logic with your own. + +Expected directory layout (customize via path overrides):: + + data_dir/ + ├── targets.yaml # Target definitions + ├── load_measurements/ + │ └── /.parquet # Measurements per target + └── features/ + └── /.parquet # Features per target (weather, etc.) +""" + +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +from datetime import timedelta +from pathlib import Path +from typing import Literal, override + +from huggingface_hub import snapshot_download +from pydantic import Field + +from openstef_beam.analysis import AnalysisConfig +from openstef_beam.analysis.visualizations import WindowedMetricVisualization +from openstef_beam.analysis.visualizations.grouped_target_metric_visualization import GroupedTargetMetricVisualization +from openstef_beam.analysis.visualizations.quantile_probability_visualization import QuantileProbabilityVisualization +from openstef_beam.analysis.visualizations.summary_table_visualization import SummaryTableVisualization +from openstef_beam.analysis.visualizations.timeseries_visualization import TimeSeriesVisualization +from openstef_beam.backtesting import BacktestConfig +from openstef_beam.benchmarking import BenchmarkPipeline, BenchmarkTarget, StrictExecutionCallback +from openstef_beam.benchmarking.storage.base import BenchmarkStorage +from openstef_beam.benchmarking.target_provider import SimpleTargetProvider +from openstef_beam.evaluation import EvaluationConfig, Window +from openstef_beam.evaluation.metric_providers import MetricProvider, RCRPSProvider, RMAEProvider +from openstef_core.types import AvailableAt, LeadTime, Quantile + +# Define your own target categories for filtering (must match group_name in targets.yaml) +type MyCategory = Literal["solar_park", "wind_park"] + + +class MyTargetProvider(SimpleTargetProvider[BenchmarkTarget, list[MyCategory]]): + """Custom target 
provider -- extend SimpleTargetProvider to load your own data. + + Configure path templates and data flags, then override methods to customize + target filtering, metrics, and file resolution. + """ + + # Path templates -- adapt to your directory structure + # {name} is replaced with target.name from targets.yaml + targets_file_path: str = Field(default="liander2024_targets.yaml", init=False) + measurements_path_template: str = Field(default="{name}.parquet", init=False) + weather_path_template: str = Field(default="{name}.parquet", init=False) + + # Disable shared profiles and prices -- only per-target features are used + # Set to True if you have shared data files (profiles.parquet, prices.parquet) + use_profiles: bool = False + use_prices: bool = False + + @override + def get_targets(self, filter_args: list[MyCategory] | None = None) -> list[BenchmarkTarget]: + """Load targets and optionally filter by category. + + Returns: + Filtered list of benchmark targets. + """ + # super().get_targets() reads targets from the YAML file + targets = super().get_targets(filter_args) + # Keep only targets whose group_name matches one of the filter categories + if filter_args is not None: + targets = [t for t in targets if t.group_name in filter_args] + return targets + + @override + def get_metrics_for_target(self, target: BenchmarkTarget) -> list[MetricProvider]: + """Define which metrics to compute per target. + + Returns: + List of metric providers. + """ + # rMAE: deterministic accuracy at the median (lower is better) + # rCRPS: probabilistic accuracy across all quantiles (lower is better) + return [ + RMAEProvider(quantiles=[Quantile(0.5)], lower_quantile=0.01, upper_quantile=0.99), + RCRPSProvider(lower_quantile=0.01, upper_quantile=0.99), + ] + + @override + def _get_measurements_path_for_target(self, target: BenchmarkTarget) -> Path: + """Resolve path to load measurement parquet. 
+ + Liander 2024 uses: data_dir/load_measurements//.parquet + Change this to match your directory structure. + + Returns: + Path to the measurement parquet file. + """ + return self.data_dir / "load_measurements" / target.group_name / f"{target.name}.parquet" + + @override + def _get_weather_path_for_target(self, target: BenchmarkTarget) -> Path: + """Resolve path to features parquet (weather, etc.). + + Liander 2024 uses: data_dir/weather_forecasts_versioned//.parquet + Change this to match your directory structure. + + Returns: + Path to the features parquet file. + """ + return self.data_dir / "weather_forecasts_versioned" / target.group_name / f"{target.name}.parquet" + + +# --- Analysis config: which plots and tables to generate after evaluation --- +ANALYSIS_CONFIG = AnalysisConfig( + visualization_providers=[ + TimeSeriesVisualization(name="time_series"), + WindowedMetricVisualization( + name="rMAE_7D", + metric=("rMAE", Quantile(0.5)), + window=Window(lag=timedelta(hours=0), size=timedelta(days=7)), + ), + WindowedMetricVisualization( + name="rCRPS_30D", + metric="rCRPS", + window=Window(lag=timedelta(hours=0), size=timedelta(days=30)), + ), + GroupedTargetMetricVisualization(name="rMAE_grouped", metric="rMAE", quantile=Quantile(0.5)), + GroupedTargetMetricVisualization(name="rCRPS_grouped", metric="rCRPS"), + SummaryTableVisualization(name="summary"), + QuantileProbabilityVisualization(name="quantile_probability"), + ], +) + + +def create_custom_benchmark_runner( + storage: BenchmarkStorage, + data_dir: Path | None = None, +) -> BenchmarkPipeline[BenchmarkTarget, list[MyCategory]]: + """Assemble a benchmark pipeline with the custom target provider. + + Args: + storage: Where to save results. + data_dir: Dataset path. Downloads Liander 2024 from HuggingFace if None. + + Returns: + Ready-to-run benchmark pipeline. 
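    Example (the base_path here is illustrative; see run_benchmark.py for a
    full run() invocation)::

        runner = create_custom_benchmark_runner(
            storage=LocalBenchmarkStorage(base_path=Path("./benchmark_results/MyRun")),
        )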
+ """ + if data_dir is None: + data_dir = Path(snapshot_download(repo_id="OpenSTEF/liander2024-stef-benchmark", repo_type="dataset")) + + return BenchmarkPipeline[BenchmarkTarget, list[MyCategory]]( + # Backtest: how to replay history + backtest_config=BacktestConfig( + prediction_sample_interval=timedelta(minutes=15), # Data resolution + predict_interval=timedelta(hours=6), # New forecast every 6 hours + train_interval=timedelta(days=7), # Retrain model every 7 days + ), + # Evaluation: how to slice and score the results + evaluation_config=EvaluationConfig( + available_ats=[AvailableAt.from_string("D-1T06:00")], # Day-ahead forecast at 06:00 + lead_times=[ + LeadTime.from_string("P1D"), # 1 day ahead + ], # Evaluate all lead times + windows=[ # Rolling windows for metrics + Window(lag=timedelta(hours=0), size=timedelta(days=7)), + Window(lag=timedelta(hours=0), size=timedelta(days=30)), + ], + ), + analysis_config=ANALYSIS_CONFIG, + target_provider=MyTargetProvider(data_dir=data_dir), + storage=storage, + callbacks=[StrictExecutionCallback()], # Fail fast on errors + ) diff --git a/examples/benchmarks/custom_benchmark/run_benchmark.py b/examples/benchmarks/custom_benchmark/run_benchmark.py new file mode 100644 index 000000000..12bfc3140 --- /dev/null +++ b/examples/benchmarks/custom_benchmark/run_benchmark.py @@ -0,0 +1,91 @@ +"""Run the custom benchmark: example baseline vs OpenSTEF GBLinear. + +Uses the custom benchmark pipeline from example_benchmark.py (which extends +SimpleTargetProvider) instead of the built-in Liander 2024 runner. 
+""" + +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +import os + +# Prevent thread contention when running multiple targets in parallel +os.environ["OMP_NUM_THREADS"] = "1" +os.environ["OPENBLAS_NUM_THREADS"] = "1" +os.environ["MKL_NUM_THREADS"] = "1" + +import logging +import multiprocessing +from pathlib import Path + +from examples.benchmarks.custom_benchmark.example_baseline import ExampleBenchmarkForecaster +from examples.benchmarks.custom_benchmark.example_benchmark import MyCategory, create_custom_benchmark_runner +from openstef_beam.benchmarking import BenchmarkContext, BenchmarkTarget, LocalBenchmarkStorage +from openstef_beam.benchmarking.baselines import create_openstef4_preset_backtest_forecaster +from openstef_core.types import LeadTime, Q +from openstef_models.presets import ForecastingWorkflowConfig + +logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s] %(message)s") + +OUTPUT_PATH = Path("./benchmark_results") +N_PROCESSES = multiprocessing.cpu_count() + +# Optional: filter to specific target categories (None = run all) +BENCHMARK_FILTER: list[MyCategory] | None = ["solar_park"] + +# Quantiles define the probabilistic forecast bands +PREDICTION_QUANTILES = [Q(0.05), Q(0.1), Q(0.3), Q(0.5), Q(0.7), Q(0.9), Q(0.95)] + +# --- GBLinear config --- +# Map column names in your data to what OpenSTEF expects +gblinear_config = ForecastingWorkflowConfig( + model_id="custom_benchmark_", + run_name=None, + model="gblinear", + horizons=[LeadTime.from_string("P3D")], + quantiles=PREDICTION_QUANTILES, + model_reuse_enable=True, + radiation_column="shortwave_radiation", + wind_speed_column="wind_speed_80m", + pressure_column="surface_pressure", + temperature_column="temperature_2m", + relative_humidity_column="relative_humidity_2m", + energy_price_column="EPEX_NL", + rolling_aggregate_features=["mean", "median", "max", "min"], +) + + +# --- Example baseline factory --- +def 
example_factory(_context: BenchmarkContext, _target: BenchmarkTarget) -> ExampleBenchmarkForecaster: + """Create an example forecaster for a benchmark target. + + Returns: + Configured ExampleBenchmarkForecaster instance. + """ + return ExampleBenchmarkForecaster(predict_quantiles=PREDICTION_QUANTILES) + + +if __name__ == "__main__": + # 1. Run example baseline using the custom benchmark pipeline + create_custom_benchmark_runner( + storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH / "ExampleBaseline"), + ).run( + forecaster_factory=example_factory, + run_name="example_baseline", + n_processes=N_PROCESSES, + filter_args=BENCHMARK_FILTER, + ) + + # 2. Run GBLinear using the same custom pipeline + create_custom_benchmark_runner( + storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH / "GBLinear"), + ).run( + forecaster_factory=create_openstef4_preset_backtest_forecaster( + workflow_config=gblinear_config, + cache_dir=OUTPUT_PATH / "cache", + ), + run_name="gblinear", + n_processes=N_PROCESSES, + filter_args=BENCHMARK_FILTER, + ) diff --git a/examples/benchmarks/custom_benchmark/run_liander2024_benchmark.py b/examples/benchmarks/custom_benchmark/run_liander2024_benchmark.py new file mode 100644 index 000000000..884af5627 --- /dev/null +++ b/examples/benchmarks/custom_benchmark/run_liander2024_benchmark.py @@ -0,0 +1,97 @@ +"""Example: run the built-in Liander 2024 benchmark with a custom baseline and GBLinear. + +Uses create_liander2024_benchmark_runner() which pre-configures everything: +backtest settings, evaluation windows, metrics, analysis plots, and target +definitions. Data is auto-downloaded from HuggingFace. 
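Run from the repository root (same command as in the README)::

    uv run python -m examples.benchmarks.custom_benchmark.run_liander2024_benchmark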
+""" + +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +import os + +os.environ["OMP_NUM_THREADS"] = "1" +os.environ["OPENBLAS_NUM_THREADS"] = "1" +os.environ["MKL_NUM_THREADS"] = "1" + +import logging +import multiprocessing +from pathlib import Path + +from examples.benchmarks.custom_benchmark.example_baseline import ExampleBenchmarkForecaster +from openstef_beam.benchmarking import BenchmarkContext, BenchmarkTarget, LocalBenchmarkStorage +from openstef_beam.benchmarking.baselines import create_openstef4_preset_backtest_forecaster +from openstef_beam.benchmarking.benchmarks.liander2024 import Liander2024Category, create_liander2024_benchmark_runner +from openstef_beam.benchmarking.callbacks.strict_execution_callback import StrictExecutionCallback +from openstef_core.types import LeadTime, Q +from openstef_models.presets import ForecastingWorkflowConfig + +logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s] %(message)s") + +OUTPUT_PATH = Path("./benchmark_results") +N_PROCESSES = multiprocessing.cpu_count() + +# Optional: filter to specific target categories (None = run all) +BENCHMARK_FILTER: list[Liander2024Category] | None = None + +# Quantiles define the probabilistic forecast bands +# Q(0.05) = 5th percentile, Q(0.5) = median, Q(0.95) = 95th percentile +PREDICTION_QUANTILES = [Q(0.05), Q(0.1), Q(0.3), Q(0.5), Q(0.7), Q(0.9), Q(0.95)] + +# --- GBLinear model config --- +# Map column names in your data to what OpenSTEF expects +gblinear_config = ForecastingWorkflowConfig( + model_id="liander_benchmark_", + run_name=None, + model="gblinear", + horizons=[LeadTime.from_string("P3D")], + quantiles=PREDICTION_QUANTILES, + model_reuse_enable=True, + radiation_column="shortwave_radiation", + wind_speed_column="wind_speed_80m", + pressure_column="surface_pressure", + temperature_column="temperature_2m", + relative_humidity_column="relative_humidity_2m", + 
energy_price_column="EPEX_NL", + rolling_aggregate_features=["mean", "median", "max", "min"], +) + + +def example_factory(_context: BenchmarkContext, _target: BenchmarkTarget) -> ExampleBenchmarkForecaster: + """Create the example baseline forecaster. + + Returns: + Configured ExampleBenchmarkForecaster instance. + """ + return ExampleBenchmarkForecaster(predict_quantiles=PREDICTION_QUANTILES) + + +if __name__ == "__main__": + # 1. Run custom baseline on Liander 2024 + # create_liander2024_benchmark_runner() sets up everything: data download, configs, metrics + # LocalBenchmarkStorage writes results as parquet files to disk + create_liander2024_benchmark_runner( + storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH / "ExampleBaseline"), + callbacks=[StrictExecutionCallback()], # Fail fast on errors + ).run( + forecaster_factory=example_factory, # Your model factory (called per target) + run_name="example_baseline", # Label for this run + n_processes=N_PROCESSES, # Parallel targets + filter_args=BENCHMARK_FILTER, # None = all categories + ) + + # 2. Run GBLinear on Liander 2024 + # create_openstef4_preset_backtest_forecaster returns a factory that wraps OpenSTEF models + create_liander2024_benchmark_runner( + storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH / "GBLinear"), + callbacks=[StrictExecutionCallback()], + ).run( + forecaster_factory=create_openstef4_preset_backtest_forecaster( + workflow_config=gblinear_config, + cache_dir=OUTPUT_PATH / "cache", + ), + run_name="gblinear", + n_processes=N_PROCESSES, + filter_args=BENCHMARK_FILTER, + )
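    # After both runs complete, compare them side by side (see the README):
    #   uv run python -m examples.benchmarks.custom_benchmark.compare_liander2024_results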