diff --git a/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py b/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py index 78b6954c3..b12785523 100644 --- a/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py +++ b/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py @@ -35,6 +35,11 @@ class AnalysisConfig(BaseConfig): visualization_providers: list[VisualizationProvider] = Field( default=[], description="List of visualization providers to use for generating analysis outputs" ) + filterings: list[Filtering] | None = Field( + default=None, + description="When set, only include these filterings (e.g. LeadTime, AvailableAt) in analysis. " + "None means use all filterings found in the evaluation data.", + ) class AnalysisPipeline: @@ -61,8 +66,8 @@ def __init__( super().__init__() self.config = config - @staticmethod def _group_by_filtering( + self, reports: Sequence[tuple[TargetMetadata, EvaluationReport]], ) -> dict[Filtering, list[ReportTuple]]: """Group reports by their lead time filtering conditions. @@ -71,13 +76,17 @@ def _group_by_filtering( 1-hour ahead vs 24-hour ahead forecasts), enabling comparison of model performance across different forecasting horizons. + When ``config.filterings`` is set, only subsets matching those filterings are included. + Returns: Dictionary mapping lead time filtering conditions to lists of report tuples. """ + allowed = set(self.config.filterings) if self.config.filterings is not None else None return groupby( (subset.filtering, (base_metadata.with_filtering(subset.filtering), subset)) for base_metadata, report in reports for subset in report.subset_reports + if allowed is None or subset.filtering in allowed ) def run_for_subsets( @@ -103,10 +112,7 @@ def run_for_subsets( no providers support the requested aggregation level. """ return [ - provider.create( - reports=reports, - aggregation=aggregation, - ) + provider.create(reports=reports, aggregation=aggregation) for provider in self.config.visualization_providers if aggregation in provider.supported_aggregations ] diff --git a/packages/openstef-beam/src/openstef_beam/analysis/visualizations/windowed_metric_visualization.py b/packages/openstef-beam/src/openstef_beam/analysis/visualizations/windowed_metric_visualization.py index 249b8d5da..e59cd18ff 100644 --- a/packages/openstef-beam/src/openstef_beam/analysis/visualizations/windowed_metric_visualization.py +++ b/packages/openstef-beam/src/openstef_beam/analysis/visualizations/windowed_metric_visualization.py @@ -8,6 +8,7 @@ how performance metrics evolve across different time windows. """ +import logging import operator from collections import defaultdict from datetime import datetime @@ -23,6 +24,8 @@ from openstef_beam.evaluation import EvaluationSubsetReport, Window from openstef_core.types import Quantile +_logger = logging.getLogger(__name__) + class WindowedMetricVisualization(VisualizationProvider): """Creates time series plots showing metric evolution across evaluation windows. @@ -180,7 +183,8 @@ def create_by_none( time_value_pairs = self._extract_windowed_metric_values(report, metric_name, quantile_or_global) if not time_value_pairs: - raise ValueError("No windowed metrics found for the specified window and metric.") + _logger.warning("No windowed metrics for %s (%s) — skipping visualization.", metadata.name, self.name) + return self._empty_output(f"No windowed metrics available for {metadata.name}") # Unpack the sorted pairs timestamps = [pair[0] for pair in time_value_pairs] @@ -198,19 +202,23 @@ def create_by_none( return VisualizationOutput(name=self.name, figure=figure) + def _empty_output(self, message: str) -> VisualizationOutput: + return VisualizationOutput(name=self.name, html=f"
{message}
") + @override def create_by_run_and_none(self, reports: dict[RunName, list[ReportTuple]]) -> VisualizationOutput: metric_name, quantile_or_global = self._get_metric_info() plotter = WindowedMetricPlotter() + has_data = False # Collect data for each run for run_name, report_pairs in reports.items(): for _metadata, report in report_pairs: time_value_pairs = self._extract_windowed_metric_values(report, metric_name, quantile_or_global) - # Skip if no data points found for this run if not time_value_pairs: - raise ValueError("No windowed metrics found for the specified window, metric and run.") + _logger.warning("No windowed metrics for run '%s' (%s) — skipping.", run_name, self.name) + continue # Unpack the sorted pairs timestamps = [pair[0] for pair in time_value_pairs] @@ -221,6 +229,10 @@ def create_by_run_and_none(self, reports: dict[RunName, list[ReportTuple]]) -> V timestamps=timestamps, metric_values=metric_values, ) + has_data = True + + if not has_data: + return self._empty_output("No windowed metrics available for any run") title = self._create_plot_title(metric_name, quantile_or_global, "by Run") figure = plotter.plot(title=title) @@ -238,13 +250,14 @@ def create_by_target( # Get the run name from the first target metadata for the title run_name = reports[0][0].run_name if reports else "" + has_data = False # Process each target's report for metadata, report in reports: time_value_pairs = self._extract_windowed_metric_values(report, metric_name, quantile_or_global) - # Skip if no data points found for this target if not time_value_pairs: - raise ValueError("No windowed metrics found for the specified window, metric and target.") + _logger.warning("No windowed metrics for target '%s' (%s) — skipping.", metadata.name, self.name) + continue # Unpack the sorted pairs timestamps = [pair[0] for pair in time_value_pairs] @@ -256,6 +269,10 @@ def create_by_target( timestamps=timestamps, metric_values=metric_values, ) + has_data = True + + if not has_data: + return self._empty_output("No windowed metrics available for any target") title_suffix = "by Target" if run_name: @@ -274,11 +291,13 @@ def create_by_run_and_target( ) -> VisualizationOutput: metric_name, quantile_or_global = self._get_metric_info() plotter = WindowedMetricPlotter() + has_data = False # Process each run and calculate averaged metrics across its targets for run_name, target_reports in reports.items(): if not target_reports: - raise ValueError("No windowed metrics found for the specified window, metric and run.") + _logger.warning("No reports for run '%s' (%s) — skipping.", run_name, self.name) + continue # Average windowed metrics across all targets for this run averaged_pairs = self._average_time_series_across_targets( @@ -287,9 +306,9 @@ def create_by_run_and_target( quantile_or_global=quantile_or_global, ) - # Skip if no averaged data points found for this run if not averaged_pairs: - raise ValueError("No windowed averaged metrics found for the specified window, metric and run.") + _logger.warning("No windowed averaged metrics for run '%s' (%s) — skipping.", run_name, self.name) + continue # Unpack the averaged pairs timestamps = [pair[0] for pair in averaged_pairs] @@ -301,6 +320,10 @@ def create_by_run_and_target( timestamps=timestamps, metric_values=metric_values, ) + has_data = True + + if not has_data: + return self._empty_output("No windowed metrics available for any run") title = self._create_plot_title(metric_name, quantile_or_global, "by run (averaged over targets in group)") figure = plotter.plot(title=title, metric_name=metric_name) @@ -321,10 +344,12 @@ def create_by_run_and_group( for (run_name, _group_name), target_reports in reports.items(): run_to_targets.setdefault(run_name, []).extend(target_reports) + has_data = False # Average metrics over all targets for each run for run_name, all_target_reports in run_to_targets.items(): if not all_target_reports: - raise ValueError("No windowed metrics found for the specified window, metric and run.") + _logger.warning("No reports for run '%s' (%s) — skipping.", run_name, self.name) + continue # Average windowed metrics across all targets for this run averaged_pairs = self._average_time_series_across_targets( @@ -334,7 +359,8 @@ def create_by_run_and_group( ) if not averaged_pairs: - raise ValueError("No windowed averaged metrics found for the specified window, metric and run.") + _logger.warning("No windowed averaged metrics for run '%s' (%s) — skipping.", run_name, self.name) + continue timestamps = [pair[0] for pair in averaged_pairs] metric_values = [pair[1] for pair in averaged_pairs] @@ -345,6 +371,10 @@ def create_by_run_and_group( timestamps=timestamps, metric_values=metric_values, ) + has_data = True + + if not has_data: + return self._empty_output("No windowed metrics available for any run") title = self._create_plot_title(metric_name, quantile_or_global, "by run (averaged over all targets)") figure = plotter.plot(title=title, metric_name=metric_name) @@ -373,9 +403,7 @@ def create_by_group( ) if not averaged_pairs: - raise ValueError( - "No windowed averaged metrics found for the specified window, metric and run across all groups." - ) + return self._empty_output("No windowed metrics available across all groups") timestamps = [pair[0] for pair in averaged_pairs] metric_values = [pair[1] for pair in averaged_pairs] diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py b/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py index d6328e7f2..0811613be 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py @@ -30,7 +30,11 @@ ) from openstef_core.base_model import BaseModel from openstef_core.datasets import TimeSeriesDataset -from openstef_core.exceptions import FlatlinerDetectedError, MissingExtraError, NotFittedError +from openstef_core.exceptions import ( + FlatlinerDetectedError, + InsufficientlyCompleteError, + MissingExtraError, +) from openstef_core.types import Q from openstef_models.presets import ForecastingWorkflowConfig, create_forecasting_workflow from openstef_models.presets.forecasting_workflow import LocationConfig @@ -118,6 +122,9 @@ def fit(self, data: RestrictedHorizonVersionedTimeSeries) -> None: self._logger.warning("Flatliner detected during training") self._is_flatliner_detected = True return # Skip setting the workflow on flatliner detection + except InsufficientlyCompleteError: + self._logger.warning("Insufficient training data at %s, retaining previous model", data.horizon) + return # Retain previous model state; predictions will use the last successful fit self._workflow = workflow @@ -128,7 +135,8 @@ def predict(self, data: RestrictedHorizonVersionedTimeSeries) -> TimeSeriesDatas return None if self._workflow is None: - raise NotFittedError("Must call fit() before predict()") + self._logger.info("No fitted model available, skipping prediction") + return None # Extract the dataset including both historical context and forecast period predict_data = data.get_window( diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py b/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py index 2aaac8f5c..cc9e435b7 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py @@ -120,6 +120,8 @@ def run( self, run_data: dict[RunName, BenchmarkStorage], filter_args: F | None = None, + *, + strict: bool = True, ): """Execute comparison analysis across multiple benchmark runs. @@ -132,6 +134,8 @@ def run( Each storage backend should contain evaluation results for the run. filter_args: Optional criteria for filtering targets. Only targets matching these criteria will be included in the comparison. + strict: If True, raise an error when evaluation is missing for a target. + If False, skip missing targets. """ targets = self.target_provider.get_targets(filter_args) @@ -142,7 +146,7 @@ def run( targets=targets, storage=run_storage, run_name=run_name, - strict=True, + strict=strict, ) reports.extend(run_reports) diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_pipeline.py b/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_pipeline.py index f4220e0fa..ea4058563 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_pipeline.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_pipeline.py @@ -157,6 +157,8 @@ def run( run_name: str = "default", filter_args: F | None = None, n_processes: int | None = None, + *, + skip_analysis: bool = False, ) -> None: """Runs the benchmark for all targets, optionally filtered and in parallel. @@ -174,6 +176,8 @@ def run( matching these criteria will be processed. n_processes: Number of processes to use for parallel execution. If None or 1, targets are processed sequentially. + skip_analysis: When True, skips per-target and global analysis steps. + Useful when analysis will be run separately later. """ context = BenchmarkContext(run_name=run_name) @@ -184,13 +188,13 @@ def run( _logger.info("Running benchmark in parallel with %d processes", n_processes) run_parallel( - process_fn=partial(self._run_for_target, context, forecaster_factory), + process_fn=partial(self._run_for_target, context, forecaster_factory, skip_analysis=skip_analysis), items=targets, n_processes=n_processes, mode="loky", ) - if not self.storage.has_analysis_output( + if not skip_analysis and not self.storage.has_analysis_output( AnalysisScope( aggregation=AnalysisAggregation.GROUP, run_name=context.run_name, @@ -203,7 +207,14 @@ def run( self.callback_manager.on_benchmark_complete(runner=self, targets=cast(list[BenchmarkTarget], targets)) - def _run_for_target(self, context: BenchmarkContext, model_factory: ForecasterFactory[T], target: T) -> None: + def _run_for_target( + self, + context: BenchmarkContext, + model_factory: ForecasterFactory[T], + target: T, + *, + skip_analysis: bool = False, + ) -> None: """Run benchmark for a single target.""" if not self.callback_manager.on_target_start(runner=self, target=target): _logger.info("Skipping target") @@ -221,7 +232,7 @@ def _run_for_target(self, context: BenchmarkContext, model_factory: ForecasterFa predictions = self.storage.load_backtest_output(target) self.run_evaluation_for_target(target=target, predictions=predictions, quantiles=forecaster.quantiles) - if not self.storage.has_analysis_output( + if not skip_analysis and not self.storage.has_analysis_output( scope=AnalysisScope( aggregation=AnalysisAggregation.TARGET, target_name=target.name, diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/storage/local_storage.py b/packages/openstef-beam/src/openstef_beam/benchmarking/storage/local_storage.py index 892cd09ec..9c0f9fb83 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/storage/local_storage.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/storage/local_storage.py @@ -149,7 +149,7 @@ def get_analysis_path(self, scope: AnalysisScope) -> Path: elif scope.aggregation == AnalysisAggregation.RUN_AND_NONE: output_dir = base_dir / str(scope.group_name) / str(scope.target_name) elif scope.aggregation == AnalysisAggregation.RUN_AND_GROUP: - output_dir = base_dir + output_dir = base_dir / "global" elif scope.aggregation == AnalysisAggregation.RUN_AND_TARGET: output_dir = base_dir / str(scope.group_name) / "global" else: diff --git a/packages/openstef-beam/tests/unit/analysis/visualizations/test_windowed_metric_visualization.py b/packages/openstef-beam/tests/unit/analysis/visualizations/test_windowed_metric_visualization.py index 0d905fc67..c7cfde183 100644 --- a/packages/openstef-beam/tests/unit/analysis/visualizations/test_windowed_metric_visualization.py +++ b/packages/openstef-beam/tests/unit/analysis/visualizations/test_windowed_metric_visualization.py @@ -147,14 +147,17 @@ def test_create_by_none_creates_time_series_visualization( assert result.figure == mock_plotly_figure -def test_create_by_none_raises_error_when_no_windowed_data( +def test_create_by_none_returns_empty_output_when_no_windowed_data( empty_evaluation_report: EvaluationSubsetReport, simple_target_metadata: TargetMetadata, sample_window: Window ): - """Test that create_by_none raises appropriate error when no windowed metrics are found.""" + """Test that create_by_none returns an HTML placeholder when no windowed metrics are found.""" viz = WindowedMetricVisualization(name="test_viz", metric="mae", window=sample_window) - with pytest.raises(ValueError, match="No windowed metrics found for the specified window and metric"): - viz.create_by_none(empty_evaluation_report, simple_target_metadata) + result = viz.create_by_none(empty_evaluation_report, simple_target_metadata) + + assert result.figure is None + assert result.html is not None + assert "No windowed metrics" in result.html def test_create_by_target_adds_time_series_per_target( diff --git a/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py b/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py index e9b66591a..176e387d1 100644 --- a/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py +++ b/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py @@ -5,6 +5,7 @@ from datetime import UTC, datetime, timedelta from pathlib import Path +import numpy as np import pytest from openstef_beam.backtesting.restricted_horizon_timeseries import RestrictedHorizonVersionedTimeSeries @@ -109,3 +110,73 @@ def test_fit_then_predict_returns_forecast( # Assert assert result is not None assert len(result.data) > 0 + + +def test_fit_retains_previous_model_on_insufficient_data( + benchmark_target: BenchmarkTarget, + training_data: VersionedTimeSeriesDataset, + tmp_path: Path, +): + """fit() should skip training and retain the previous model when data has all-NaN targets.""" + # Arrange — disable model reuse to avoid caching side-effects + config = ForecastingWorkflowConfig( + model_id="test_insufficient", + model="xgboost", + horizons=[LeadTime.from_string("PT24H")], + quantiles=[Q(0.5)], + model_reuse_enable=False, + ) + factory = create_openstef4_preset_backtest_forecaster( + workflow_config=config, + cache_dir=tmp_path / "test_insufficient", + ) + forecaster = factory(BenchmarkContext(run_name="insufficient"), benchmark_target) + + # First fit succeeds — establishes a baseline model + horizon_good = datetime(2024, 5, 25, tzinfo=UTC) + rhvts_good = RestrictedHorizonVersionedTimeSeries(dataset=training_data, horizon=horizon_good) + forecaster.fit(rhvts_good) + assert forecaster._workflow is not None + previous_workflow = forecaster._workflow + + # Build a dataset with all-NaN load to trigger InsufficientlyCompleteError naturally + nan_ts = create_synthetic_forecasting_dataset( + start=datetime(2024, 2, 1, tzinfo=UTC), + length=timedelta(days=120), + sample_interval=timedelta(minutes=15), + include_atmosphere=True, + include_price=True, + include_available_at=True, + ) + nan_ts.data["load"] = np.nan + nan_dataset = VersionedTimeSeriesDataset([nan_ts]) + rhvts_nan = RestrictedHorizonVersionedTimeSeries(dataset=nan_dataset, horizon=datetime(2024, 5, 26, tzinfo=UTC)) + + # Act — fit with data that has no valid targets + forecaster.fit(rhvts_nan) + + # Assert — previous model is retained (fit returned early without updating _workflow) + assert forecaster._workflow is previous_workflow + + +def test_predict_returns_none_when_never_fitted( + xgboost_config: ForecastingWorkflowConfig, + benchmark_target: BenchmarkTarget, + training_data: VersionedTimeSeriesDataset, + tmp_path: Path, +): + """predict() should return None when no model has been fitted yet.""" + # Arrange + factory = create_openstef4_preset_backtest_forecaster( + workflow_config=xgboost_config, + cache_dir=tmp_path / "test_no_fit", + ) + forecaster = factory(BenchmarkContext(run_name="no_fit"), benchmark_target) + horizon = datetime(2024, 5, 25, tzinfo=UTC) + rhvts = RestrictedHorizonVersionedTimeSeries(dataset=training_data, horizon=horizon) + + # Act + result = forecaster.predict(rhvts) + + # Assert + assert result is None diff --git a/packages/openstef-core/src/openstef_core/types.py b/packages/openstef-core/src/openstef_core/types.py index 1d536f9cb..2bd452bdd 100644 --- a/packages/openstef-core/src/openstef_core/types.py +++ b/packages/openstef-core/src/openstef_core/types.py @@ -338,7 +338,11 @@ def __get_pydantic_core_schema__(cls, source_type: Any, handler: GetCoreSchemaHa Returns: Core schema for Pydantic validation. """ - return core_schema.no_info_after_validator_function(cls, handler(float)) + return core_schema.no_info_after_validator_function( + cls, + handler(float), + serialization=core_schema.plain_serializer_function_ser_schema(float), + ) def format(self) -> str: """Instance method to format the quantile as a string. diff --git a/packages/openstef-core/src/openstef_core/utils/pandas.py b/packages/openstef-core/src/openstef_core/utils/pandas.py index 9466c353e..d77bc1903 100644 --- a/packages/openstef-core/src/openstef_core/utils/pandas.py +++ b/packages/openstef-core/src/openstef_core/utils/pandas.py @@ -83,3 +83,23 @@ def combine_timeseries_indexes(indexes: Sequence[pd.DatetimeIndex]) -> pd.Dateti ) index_raw = functools.reduce(union_fn, indexes) return index_raw.unique().sort_values(ascending=True) # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType] + + +def nan_aware_weighted_mean(values: pd.DataFrame, weights: pd.DataFrame) -> "pd.Series[float]": + """Weighted mean that redistributes NaN values' weights proportionally. + + For each row, weights corresponding to NaN values are zeroed out and the + remaining weights are used to compute the weighted sum, normalized by their + total. If all values in a row are NaN, the result is 0. + + Args: + values: DataFrame of values (may contain NaN). + weights: DataFrame of non-negative weights, aligned with values. + + Returns: + Series with the weighted mean for each row. + """ + valid_weights = weights.where(values.notna(), 0) + available = cast("pd.Series[float]", valid_weights.sum(axis=1).replace(0, 1)) # pyright: ignore[reportUnknownMemberType] + weighted_sum = cast("pd.Series[float]", values.fillna(0).mul(valid_weights).sum(axis=1)) # pyright: ignore[reportUnknownMemberType] + return weighted_sum / available diff --git a/packages/openstef-core/tests/unit/test_types.py b/packages/openstef-core/tests/unit/test_types.py index 189c02733..1eb166929 100644 --- a/packages/openstef-core/tests/unit/test_types.py +++ b/packages/openstef-core/tests/unit/test_types.py @@ -2,13 +2,15 @@ # # SPDX-License-Identifier: MPL-2.0 +import warnings from datetime import UTC, datetime, time, timedelta, timezone import pandas as pd import pytest import pytz +from pydantic import BaseModel -from openstef_core.types import AvailableAt, LeadTime +from openstef_core.types import AvailableAt, LeadTime, Quantile, QuantileOrGlobal @pytest.mark.parametrize( @@ -342,3 +344,20 @@ def test_available_at_apply_index_matches_apply(): scalar = pd.DatetimeIndex([at.apply(ts.to_pydatetime()) for ts in index]) pd.testing.assert_index_equal(vectorized, scalar) + + +def test_quantile_serialization_no_warnings(): + """Quantile used as dict key in a QuantileOrGlobal union must not trigger pydantic serialization warnings.""" + + class Model(BaseModel): + metrics: dict[QuantileOrGlobal, dict[str, float]] + + m = Model(metrics={Quantile(0.05): {"mae": 1.0}, Quantile(0.5): {"mae": 2.0}, "global": {"mae": 1.5}}) + + with warnings.catch_warnings(): + warnings.filterwarnings("error", category=UserWarning, message="Pydantic serializer") + data = m.model_dump_json() + + assert '"0.05"' in data + assert '"0.5"' in data + assert '"global"' in data diff --git a/packages/openstef-core/tests/unit/utils/test_pandas.py b/packages/openstef-core/tests/unit/utils/test_pandas.py new file mode 100644 index 000000000..cf056d9cc --- /dev/null +++ b/packages/openstef-core/tests/unit/utils/test_pandas.py @@ -0,0 +1,56 @@ +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project