diff --git a/src/sktime_mcp/benchmark/__init__.py b/src/sktime_mcp/benchmark/__init__.py
new file mode 100644
index 00000000..781ae98e
--- /dev/null
+++ b/src/sktime_mcp/benchmark/__init__.py
@@ -0,0 +1,6 @@
+"""Benchmark suite for sktime-mcp agentic workflows."""
+
+from sktime_mcp.benchmark.runner import BenchmarkRunner
+from sktime_mcp.benchmark.scorer import BenchmarkScorer
+
+__all__ = ["BenchmarkRunner", "BenchmarkScorer"]
\ No newline at end of file
diff --git a/src/sktime_mcp/benchmark/runner.py b/src/sktime_mcp/benchmark/runner.py
new file mode 100644
index 00000000..36b12e76
--- /dev/null
+++ b/src/sktime_mcp/benchmark/runner.py
@@ -0,0 +1,188 @@
+"""
+Benchmark runner for sktime-mcp agentic workflows.
+
+Runs benchmark tasks through the MCP tools and collects results.
+"""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass, field
+from typing import Any
+
+from sktime_mcp.benchmark.tasks import BenchmarkTask, load_all_tasks
+from sktime_mcp.composition.validator import get_composition_validator
+from sktime_mcp.tools.evaluate import evaluate_estimator_tool
+from sktime_mcp.tools.fit_predict import fit_predict_tool
+from sktime_mcp.tools.instantiate import instantiate_estimator_tool
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TaskResult:
+    """
+    Result of running a single benchmark task.
+
+    Attributes
+    ----------
+    task_name : str
+        Name of the task
+    estimator_name : str
+        Estimator used
+    pipeline : list[str]
+        Pipeline components used
+    pipeline_valid : bool
+        Whether the pipeline composition is valid
+    fit_predict_success : bool
+        Whether fit-predict completed successfully
+    cv_results : list[dict]
+        Cross-validation results from evaluate tool
+    errors : list[str]
+        Any errors encountered
+    raw : dict
+        Raw tool outputs
+    """
+
+    task_name: str
+    estimator_name: str
+    pipeline: list[str] = field(default_factory=list)
+    pipeline_valid: bool = False
+    fit_predict_success: bool = False
+    cv_results: list[dict] = field(default_factory=list)
+    errors: list[str] = field(default_factory=list)
+    raw: dict[str, Any] = field(default_factory=dict)
+
+
+class BenchmarkRunner:
+    """
+    Runs benchmark tasks through sktime-mcp tools.
+
+    Usage
+    -----
+    >>> runner = BenchmarkRunner()
+    >>> results = runner.run_all()
+
+    Or run a single task:
+    >>> result = runner.run_task(task, estimator_name="NaiveForecaster")
+    """
+
+    def __init__(self):
+        self._validator = get_composition_validator()
+
+    def run_task(
+        self,
+        task: BenchmarkTask,
+        estimator_name: str,
+        pipeline: list[str] | None = None,
+    ) -> TaskResult:
+        """
+        Run a single benchmark task with a given estimator.
+
+        Parameters
+        ----------
+        task : BenchmarkTask
+            The task to run
+        estimator_name : str
+            Name of the estimator to use
+        pipeline : list[str], optional
+            Pipeline components. Defaults to [estimator_name]
+
+        Returns
+        -------
+        TaskResult
+        """
+        pipeline = pipeline or [estimator_name]
+        result = TaskResult(
+            task_name=task.name,
+            estimator_name=estimator_name,
+            pipeline=pipeline,
+        )
+
+        # Step 1 - validate pipeline composition
+        validation = self._validator.validate_pipeline(pipeline)
+        result.pipeline_valid = validation.valid
+        if not validation.valid:
+            result.errors.extend(validation.errors)
+            logger.warning(
+                f"Invalid pipeline {pipeline} for task {task.name}: "
+                f"{validation.errors}"
+            )
+
+        # Step 2 - instantiate estimator
+        instantiate_result = instantiate_estimator_tool(estimator_name)
+        result.raw["instantiate"] = instantiate_result
+
+        if not instantiate_result.get("success"):
+            result.errors.append(
+                f"Failed to instantiate {estimator_name}: "
+                f"{instantiate_result.get('error')}"
+            )
+            return result
+
+        handle = instantiate_result["handle"]
+
+        # Step 3 - fit predict
+        fp_result = fit_predict_tool(handle, task.dataset, task.horizon)
+        result.raw["fit_predict"] = fp_result
+        result.fit_predict_success = fp_result.get("success", False)
+
+        if not result.fit_predict_success:
+            result.errors.append(
+                f"fit_predict failed: {fp_result.get('error')}"
+            )
+
+        # Step 4 - evaluate via cross validation
+        eval_result = evaluate_estimator_tool(handle, task.dataset)
+        result.raw["evaluate"] = eval_result
+
+        if eval_result.get("success"):
+            result.cv_results = eval_result.get("results", [])
+        else:
+            result.errors.append(
+                f"evaluate failed: {eval_result.get('error')}"
+            )
+
+        return result
+
+    def run_task_all_estimators(
+        self,
+        task: BenchmarkTask,
+    ) -> list[TaskResult]:
+        """
+        Run a task against all its valid estimators.
+
+        Parameters
+        ----------
+        task : BenchmarkTask
+
+        Returns
+        -------
+        list[TaskResult]
+        """
+        results = []
+        for estimator_name in task.valid_estimators:
+            logger.info(f"Running {task.name} with {estimator_name}")
+            result = self.run_task(task, estimator_name)
+            results.append(result)
+        return results
+
+    def run_all(
+        self,
+    ) -> dict[str, list[TaskResult]]:
+        """
+        Run all benchmark tasks.
+
+        Returns
+        -------
+        dict mapping task_name -> list of TaskResults
+        """
+        tasks = load_all_tasks()
+        all_results = {}
+
+        for task in tasks:
+            logger.info(f"Starting benchmark task: {task.name}")
+            results = self.run_task_all_estimators(task)
+            all_results[task.name] = results
+
+        return all_results
\ No newline at end of file
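For reference while reviewing, a minimal sketch of driving the runner for a single task. The inline task definition and the `Detrend` step are taken from the airline task file further down; it assumes the MCP tool modules imported by `runner.py` are importable in the environment.

```python
from sktime_mcp.benchmark.runner import BenchmarkRunner
from sktime_mcp.benchmark.tasks import BenchmarkTask

# Inline task definition; mirrors the fields the runner actually reads.
task = BenchmarkTask(
    name="airline_demo",
    dataset="airline",
    horizon=12,
    expected_task="forecasting",
    valid_estimators=["NaiveForecaster"],
)

runner = BenchmarkRunner()
# Validates the composition, instantiates, fit-predicts, and cross-validates.
result = runner.run_task(task, "NaiveForecaster", pipeline=["Detrend", "NaiveForecaster"])
print(result.pipeline_valid, result.fit_predict_success, result.errors)
```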
diff --git a/src/sktime_mcp/benchmark/scorer.py b/src/sktime_mcp/benchmark/scorer.py
new file mode 100644
index 00000000..2a1a3fc9
--- /dev/null
+++ b/src/sktime_mcp/benchmark/scorer.py
@@ -0,0 +1,219 @@
+"""
+Scorer for sktime-mcp benchmark results.
+
+Scores TaskResults on three dimensions:
+1. pipeline_validity_score - is the composition valid?
+2. performance_score - how good is the forecast (MAPE based)?
+3. task_match_score - did the estimator match the expected task?
+
+Final score is a weighted combination of all three.
+"""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass
+
+from sktime_mcp.benchmark.runner import TaskResult
+from sktime_mcp.benchmark.tasks import BenchmarkTask
+from sktime_mcp.registry.interface import get_registry
+
+logger = logging.getLogger(__name__)
+
+# Weights for final score
+WEIGHT_PIPELINE_VALIDITY = 0.3
+WEIGHT_PERFORMANCE = 0.5
+WEIGHT_TASK_MATCH = 0.2
+
+
+@dataclass
+class Score:
+    """
+    Scores for a single TaskResult.
+
+    Attributes
+    ----------
+    task_name : str
+    estimator_name : str
+    pipeline_validity_score : float
+        1.0 if pipeline is valid, 0.0 otherwise
+    performance_score : float
+        Normalized score based on MAPE (lower MAPE = higher score)
+    task_match_score : float
+        1.0 if estimator matches expected task type, 0.0 otherwise
+    overall_score : float
+        Weighted combination of all three scores
+    mape : float or None
+        Raw MAPE value if available
+    notes : list[str]
+        Human readable notes about the score
+    """
+
+    task_name: str
+    estimator_name: str
+    pipeline_validity_score: float = 0.0
+    performance_score: float = 0.0
+    task_match_score: float = 0.0
+    overall_score: float = 0.0
+    mape: float | None = None
+    notes: list[str] = None
+
+    def __post_init__(self):
+        if self.notes is None:
+            self.notes = []
+
+    def to_dict(self) -> dict:
+        return {
+            "task_name": self.task_name,
+            "estimator_name": self.estimator_name,
+            "pipeline_validity_score": round(self.pipeline_validity_score, 3),
+            "performance_score": round(self.performance_score, 3),
+            "task_match_score": round(self.task_match_score, 3),
+            "overall_score": round(self.overall_score, 3),
+            "mape": round(self.mape, 4) if self.mape is not None else None,
+            "notes": self.notes,
+        }
+
+
+class BenchmarkScorer:
+    """
+    Scores benchmark results from BenchmarkRunner.
+
+    Usage
+    -----
+    >>> scorer = BenchmarkScorer()
+    >>> scores = scorer.score_results(results, task)
+    >>> report = scorer.summary(scores)
+    """
+
+    def __init__(self):
+        self._registry = get_registry()
+
+    def score_result(
+        self,
+        result: TaskResult,
+        task: BenchmarkTask,
+    ) -> Score:
+        """
+        Score a single TaskResult against its task definition.
+
+        Parameters
+        ----------
+        result : TaskResult
+        task : BenchmarkTask
+
+        Returns
+        -------
+        Score
+        """
+        score = Score(
+            task_name=result.task_name,
+            estimator_name=result.estimator_name,
+        )
+
+        # Score 1 - pipeline validity
+        score.pipeline_validity_score = 1.0 if result.pipeline_valid else 0.0
+        if not result.pipeline_valid:
+            score.notes.append(
+                f"Invalid pipeline: {result.errors}"
+            )
+
+        # Score 2 - task match
+        score.task_match_score = self._score_task_match(
+            result.estimator_name, task.expected_task
+        )
+
+        # Score 3 - performance via MAPE
+        score.mape, score.performance_score = self._score_performance(
+            result.cv_results
+        )
+
+        # Final weighted score
+        score.overall_score = (
+            WEIGHT_PIPELINE_VALIDITY * score.pipeline_validity_score
+            + WEIGHT_PERFORMANCE * score.performance_score
+            + WEIGHT_TASK_MATCH * score.task_match_score
+        )
+
+        return score
+
+    def score_results(
+        self,
+        results: list[TaskResult],
+        task: BenchmarkTask,
+    ) -> list[Score]:
+        """Score a list of TaskResults for a given task."""
+        return [self.score_result(r, task) for r in results]
+
+    def summary(self, scores: list[Score]) -> dict:
+        """
+        Generate a summary report from a list of scores.
+
+        Parameters
+        ----------
+        scores : list[Score]
+
+        Returns
+        -------
+        dict with ranked results and best estimator
+        """
+        if not scores:
+            return {"error": "No scores to summarize"}
+
+        ranked = sorted(
+            scores, key=lambda s: s.overall_score, reverse=True
+        )
+
+        return {
+            "best_estimator": ranked[0].estimator_name,
+            "best_score": round(ranked[0].overall_score, 3),
+            "ranking": [s.to_dict() for s in ranked],
+            "n_estimators_evaluated": len(scores),
+        }
+
+    def _score_task_match(
+        self,
+        estimator_name: str,
+        expected_task: str,
+    ) -> float:
+        """Check if estimator matches expected task type."""
+        try:
+            node = self._registry.get_estimator_by_name(estimator_name)
+            if node is None:
+                return 0.0
+            return 1.0 if node.task == expected_task else 0.0
+        except Exception as e:
+            logger.warning(f"Could not check task match for {estimator_name}: {e}")
+            return 0.0
+
+    def _score_performance(
+        self,
+        cv_results: list[dict],
+    ) -> tuple[float | None, float]:
+        """
+        Extract MAPE from cv_results and normalize to 0-1 score.
+
+        Lower MAPE = higher score.
+        MAPE of 0 = score 1.0
+        MAPE of 100% or more = score 0.0
+        """
+        if not cv_results:
+            return None, 0.0
+
+        mape_values = []
+        for fold in cv_results:
+            for key, val in fold.items():
+                if "MAPE" in key or "mape" in key:
+                    try:
+                        mape_values.append(float(val))
+                    except (TypeError, ValueError):
+                        pass
+
+        if not mape_values:
+            return None, 0.0
+
+        avg_mape = sum(mape_values) / len(mape_values)
+        # Normalize: MAPE=0 -> score=1.0, MAPE>=1.0 (100%) -> score=0.0
+        performance_score = max(0.0, 1.0 - avg_mape)
+
+        return avg_mape, performance_score
\ No newline at end of file
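To make the weighting concrete, a worked example of the final score under the constants above (illustration only, not part of the diff): a valid pipeline and a matching task type with an average MAPE of 0.2 score 0.9 overall.

```python
# Worked example of the scorer's weighting (values chosen for illustration):
# valid pipeline, matching task type, average MAPE of 0.2 (i.e. 20%).
pipeline_validity_score = 1.0
task_match_score = 1.0
performance_score = max(0.0, 1.0 - 0.2)  # MAPE normalization -> 0.8

overall = 0.3 * pipeline_validity_score + 0.5 * performance_score + 0.2 * task_match_score
print(overall)  # 0.3 + 0.4 + 0.2 = 0.9
```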
diff --git a/src/sktime_mcp/benchmark/tasks.py b/src/sktime_mcp/benchmark/tasks.py
new file mode 100644
index 00000000..2a1b6508
--- /dev/null
+++ b/src/sktime_mcp/benchmark/tasks.py
@@ -0,0 +1,136 @@
+"""
+Task definitions for sktime-mcp benchmark suite.
+
+Tasks are loaded from YAML files in the tasks/ directory.
+Contributors can add new tasks by adding a new YAML file.
+"""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+logger = logging.getLogger(__name__)
+
+TASKS_DIR = Path(__file__).parent / "tasks"
+
+
+@dataclass
+class BenchmarkTask:
+    """
+    A single benchmark task definition.
+
+    Attributes
+    ----------
+    name : str
+        Unique task identifier
+    dataset : str
+        Demo dataset name (e.g. "airline", "sunspots")
+    horizon : int
+        Forecast horizon
+    expected_task : str
+        Expected sktime task type (e.g. "forecasting")
+    valid_estimators : list[str]
+        List of estimators considered correct for this task
+    valid_pipelines : list[list[str]]
+        List of valid pipeline compositions for this task
+    metric : str
+        Evaluation metric name
+    difficulty : str
+        Task difficulty: "beginner", "intermediate", "advanced"
+    description : str
+        Human readable description
+    tags : dict
+        Optional extra metadata
+    """
+
+    name: str
+    dataset: str
+    horizon: int
+    expected_task: str
+    valid_estimators: list[str]
+    valid_pipelines: list[list[str]] = field(default_factory=list)
+    metric: str = "MeanAbsolutePercentageError"
+    difficulty: str = "beginner"
+    description: str = ""
+    tags: dict[str, Any] = field(default_factory=dict)
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> "BenchmarkTask":
+        """Load a BenchmarkTask from a dictionary."""
+        return cls(
+            name=data["name"],
+            dataset=data["dataset"],
+            horizon=data["horizon"],
+            expected_task=data["expected_task"],
+            valid_estimators=data["valid_estimators"],
+            valid_pipelines=data.get("valid_pipelines", []),
+            metric=data.get("metric", "MeanAbsolutePercentageError"),
+            difficulty=data.get("difficulty", "beginner"),
+            description=data.get("description", ""),
+            tags=data.get("tags", {}),
+        )
+
+    @classmethod
+    def from_yaml(cls, path: Path) -> "BenchmarkTask":
+        """Load a BenchmarkTask from a YAML file."""
+        with open(path) as f:
+            data = yaml.safe_load(f)
+        return cls.from_dict(data)
+
+
+def load_all_tasks(tasks_dir: Path | None = None) -> list[BenchmarkTask]:
+    """
+    Load all benchmark tasks from the tasks directory.
+
+    Parameters
+    ----------
+    tasks_dir : Path, optional
+        Directory to load tasks from. Defaults to built-in tasks dir.
+
+    Returns
+    -------
+    list[BenchmarkTask]
+        List of loaded benchmark tasks
+    """
+    tasks_dir = tasks_dir or TASKS_DIR
+    tasks = []
+
+    for yaml_file in sorted(tasks_dir.glob("*.yaml")):
+        try:
+            task = BenchmarkTask.from_yaml(yaml_file)
+            tasks.append(task)
+            logger.info(f"Loaded task: {task.name}")
+        except Exception as e:
+            logger.warning(f"Failed to load task from {yaml_file}: {e}")
+
+    return tasks
+
+
+def load_task(name: str, tasks_dir: Path | None = None) -> BenchmarkTask | None:
+    """
+    Load a single task by name.
+
+    Parameters
+    ----------
+    name : str
+        Task name to load
+    tasks_dir : Path, optional
+        Directory to search in
+
+    Returns
+    -------
+    BenchmarkTask or None
+    """
+    tasks_dir = tasks_dir or TASKS_DIR
+    yaml_file = tasks_dir / f"{name}.yaml"
+
+    if not yaml_file.exists():
+        logger.warning(f"Task file not found: {yaml_file}")
+        return None
+
+    return BenchmarkTask.from_yaml(yaml_file)
\ No newline at end of file
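How discovery is expected to behave with the two YAML files added below, as a sketch assuming the packaged tasks/ directory is used as the default. Note that `load_task` is keyed on the file stem rather than the task's `name:` field, so the airline task below is looked up as `"airline"`, not `"airline_forecast"`.

```python
from sktime_mcp.benchmark.tasks import load_all_tasks, load_task

tasks = load_all_tasks()        # globs tasks/*.yaml; files that fail to parse are skipped
print([t.name for t in tasks])  # ["airline_forecast", "sunspots_forecast"]

airline = load_task("airline")  # resolves tasks/airline.yaml; returns None if the file is missing
if airline is not None:
    print(airline.name, airline.horizon, airline.valid_estimators)
```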
diff --git a/src/sktime_mcp/benchmark/tasks/airline.yaml b/src/sktime_mcp/benchmark/tasks/airline.yaml
new file mode 100644
index 00000000..b9f0e2f2
--- /dev/null
+++ b/src/sktime_mcp/benchmark/tasks/airline.yaml
@@ -0,0 +1,21 @@
+name: airline_forecast
+dataset: airline
+horizon: 12
+expected_task: forecasting
+valid_estimators:
+  - NaiveForecaster
+  - ExponentialSmoothing
+  - AutoETS
+  - ARIMA
+valid_pipelines:
+  - [NaiveForecaster]
+  - [ExponentialSmoothing]
+  - [Detrend, NaiveForecaster]
+  - [Deseasonalize, NaiveForecaster]
+metric: MeanAbsolutePercentageError
+difficulty: beginner
+description: "Classic airline passengers dataset. Seasonal monthly data, 1949-1960."
+tags:
+  seasonal: true
+  frequency: monthly
+  n_timepoints: 144
\ No newline at end of file
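For review, the round trip from this YAML into the dataclass defined above; a sketch assuming the file ships in the packaged tasks/ directory.

```python
from sktime_mcp.benchmark.tasks import TASKS_DIR, BenchmarkTask

# The YAML keys map 1:1 onto BenchmarkTask fields via from_yaml.
task = BenchmarkTask.from_yaml(TASKS_DIR / "airline.yaml")
assert task.name == "airline_forecast"
assert task.horizon == 12
assert ["Detrend", "NaiveForecaster"] in task.valid_pipelines
assert task.tags["n_timepoints"] == 144
```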
"forecasting") + valid_estimators : list[str] + List of estimators considered correct for this task + valid_pipelines : list[list[str]] + List of valid pipeline compositions for this task + metric : str + Evaluation metric name + difficulty : str + Task difficulty: "beginner", "intermediate", "advanced" + description : str + Human readable description + tags : dict + Optional extra metadata + """ + + name: str + dataset: str + horizon: int + expected_task: str + valid_estimators: list[str] + valid_pipelines: list[list[str]] = field(default_factory=list) + metric: str = "MeanAbsolutePercentageError" + difficulty: str = "beginner" + description: str = "" + tags: dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "BenchmarkTask": + """Load a BenchmarkTask from a dictionary.""" + return cls( + name=data["name"], + dataset=data["dataset"], + horizon=data["horizon"], + expected_task=data["expected_task"], + valid_estimators=data["valid_estimators"], + valid_pipelines=data.get("valid_pipelines", []), + metric=data.get("metric", "MeanAbsolutePercentageError"), + difficulty=data.get("difficulty", "beginner"), + description=data.get("description", ""), + tags=data.get("tags", {}), + ) + + @classmethod + def from_yaml(cls, path: Path) -> "BenchmarkTask": + """Load a BenchmarkTask from a YAML file.""" + with open(path) as f: + data = yaml.safe_load(f) + return cls.from_dict(data) + + +def load_all_tasks(tasks_dir: Path | None = None) -> list[BenchmarkTask]: + """ + Load all benchmark tasks from the tasks directory. + + Parameters + ---------- + tasks_dir : Path, optional + Directory to load tasks from. Defaults to built-in tasks dir. + + Returns + ------- + list[BenchmarkTask] + List of loaded benchmark tasks + """ + tasks_dir = tasks_dir or TASKS_DIR + tasks = [] + + for yaml_file in sorted(tasks_dir.glob("*.yaml")): + try: + task = BenchmarkTask.from_yaml(yaml_file) + tasks.append(task) + logger.info(f"Loaded task: {task.name}") + except Exception as e: + logger.warning(f"Failed to load task from {yaml_file}: {e}") + + return tasks + + +def load_task(name: str, tasks_dir: Path | None = None) -> BenchmarkTask | None: + """ + Load a single task by name. + + Parameters + ---------- + name : str + Task name to load + tasks_dir : Path, optional + Directory to search in + + Returns + ------- + BenchmarkTask or None + """ + tasks_dir = tasks_dir or TASKS_DIR + yaml_file = tasks_dir / f"{name}.yaml" + + if not yaml_file.exists(): + logger.warning(f"Task file not found: {yaml_file}") + return None + + return BenchmarkTask.from_yaml(yaml_file) \ No newline at end of file diff --git a/src/sktime_mcp/benchmark/tasks/airline.yaml b/src/sktime_mcp/benchmark/tasks/airline.yaml new file mode 100644 index 00000000..b9f0e2f2 --- /dev/null +++ b/src/sktime_mcp/benchmark/tasks/airline.yaml @@ -0,0 +1,21 @@ +name: airline_forecast +dataset: airline +horizon: 12 +expected_task: forecasting +valid_estimators: + - NaiveForecaster + - ExponentialSmoothing + - AutoETS + - ARIMA +valid_pipelines: + - [NaiveForecaster] + - [ExponentialSmoothing] + - [Detrend, NaiveForecaster] + - [Deseasonalize, NaiveForecaster] +metric: MeanAbsolutePercentageError +difficulty: beginner +description: "Classic airline passengers dataset. Seasonal monthly data, 1949-1960." 
diff --git a/tests/test_benchmark.py b/tests/test_benchmark.py
new file mode 100644
index 00000000..f092ac97
--- /dev/null
+++ b/tests/test_benchmark.py
@@ -0,0 +1,128 @@
+"""
+Tests for sktime-mcp benchmark suite.
+"""
+
+from __future__ import annotations
+
+import pytest
+
+from sktime_mcp.benchmark.runner import BenchmarkRunner, TaskResult
+from sktime_mcp.benchmark.scorer import BenchmarkScorer
+from sktime_mcp.benchmark.tasks import BenchmarkTask, load_all_tasks
+
+
+class TestBenchmarkTasks:
+    """Tests for task loading."""
+
+    def test_load_all_tasks(self):
+        tasks = load_all_tasks()
+        assert len(tasks) > 0
+
+    def test_task_fields(self):
+        tasks = load_all_tasks()
+        task = tasks[0]
+        assert task.name
+        assert task.dataset
+        assert task.horizon > 0
+        assert task.expected_task
+        assert len(task.valid_estimators) > 0
+
+    def test_task_from_dict(self):
+        data = {
+            "name": "test_task",
+            "dataset": "airline",
+            "horizon": 12,
+            "expected_task": "forecasting",
+            "valid_estimators": ["NaiveForecaster"],
+        }
+        task = BenchmarkTask.from_dict(data)
+        assert task.name == "test_task"
+        assert task.difficulty == "beginner"  # default
+
+
+class TestBenchmarkRunner:
+    """Tests for the benchmark runner."""
+
+    def test_run_single_task(self):
+        runner = BenchmarkRunner()
+        task = BenchmarkTask.from_dict({
+            "name": "test_airline",
+            "dataset": "airline",
+            "horizon": 12,
+            "expected_task": "forecasting",
+            "valid_estimators": ["NaiveForecaster"],
+        })
+        result = runner.run_task(task, "NaiveForecaster")
+        assert isinstance(result, TaskResult)
+        assert result.task_name == "test_airline"
+        assert result.estimator_name == "NaiveForecaster"
+        assert result.fit_predict_success
+
+    def test_run_invalid_estimator(self):
+        runner = BenchmarkRunner()
+        task = BenchmarkTask.from_dict({
+            "name": "test_invalid",
+            "dataset": "airline",
+            "horizon": 12,
+            "expected_task": "forecasting",
+            "valid_estimators": ["NonExistentEstimator999"],
+        })
+        result = runner.run_task(task, "NonExistentEstimator999")
+        assert isinstance(result, TaskResult)
+        assert len(result.errors) > 0
+
+    def test_pipeline_validation(self):
+        runner = BenchmarkRunner()
+        task = BenchmarkTask.from_dict({
+            "name": "test_pipeline",
+            "dataset": "airline",
+            "horizon": 12,
+            "expected_task": "forecasting",
+            "valid_estimators": ["NaiveForecaster"],
+        })
+        result = runner.run_task(
+            task,
+            "NaiveForecaster",
+            pipeline=["NaiveForecaster"]
+        )
+        assert result.pipeline_valid
+
+
"valid_estimators": ["NaiveForecaster"], + }) + result = runner.run_task(task, "NaiveForecaster") + score = scorer.score_result(result, task) + + assert score.overall_score >= 0.0 + assert score.overall_score <= 1.0 + assert score.task_match_score == 1.0 + assert score.pipeline_validity_score == 1.0 + + def test_summary(self): + runner = BenchmarkRunner() + scorer = BenchmarkScorer() + task = BenchmarkTask.from_dict({ + "name": "test_summary", + "dataset": "airline", + "horizon": 12, + "expected_task": "forecasting", + "valid_estimators": ["NaiveForecaster", "AutoETS"], + }) + results = runner.run_task_all_estimators(task) + scores = scorer.score_results(results, task) + summary = scorer.summary(scores) + + assert "best_estimator" in summary + assert "ranking" in summary + assert summary["n_estimators_evaluated"] == 2 \ No newline at end of file