diff --git a/examples/evals/README.md b/examples/evals/README.md new file mode 100644 index 000000000..cf1371f6a --- /dev/null +++ b/examples/evals/README.md @@ -0,0 +1,392 @@ +# Arcade Evals Examples + +This directory contains user-friendly examples demonstrating how to evaluate tools from different sources using the Arcade evals framework. + +## π Table of Contents + +- [Quick Start](#quick-start) +- [Example Files](#example-files) +- [CLI Reference](#cli-reference) +- [Common Patterns](#common-patterns) +- [Troubleshooting](#troubleshooting) + +## π Quick Start + +### What Makes These Examples Different + +These examples are designed to be: +- **Production-ready**: Include proper error handling and timeouts +- **Copy-paste friendly**: Clear configuration sections you can modify +- **Informative**: Print status messages during loading +- **Focused**: One concept per example, no unnecessary complexity +- **Pattern-based**: Follow consistent structure from real-world evals + +### Installation + +```bash +# Install with evals support +pip install 'arcade-mcp[evals]' + +# Or using uv (recommended) +uv tool install 'arcade-mcp[evals]' +``` + +### Basic Usage + +```bash +# Run an evaluation with OpenAI +arcade evals examples/evals/eval_arcade_gateway.py \ + --api-key openai:YOUR_OPENAI_KEY + +# Compare multiple models +arcade evals examples/evals/eval_stdio_mcp_server.py \ + -p "openai:gpt-4o anthropic:claude-sonnet-4-5-20250929" \ + -k openai:YOUR_OPENAI_KEY \ + -k anthropic:YOUR_ANTHROPIC_KEY + +# Output results to HTML +arcade evals examples/evals/eval_http_mcp_server.py \ + --api-key openai:YOUR_KEY \ + -o results.html -d +``` + +## π Example Files + +### Example Structure + +All examples follow a consistent pattern: + +```python +# 1. Configuration section - Update these values +ARCADE_API_KEY = os.environ.get("ARCADE_API_KEY", "YOUR_KEY_HERE") + +# 2. Eval suite with async loading +@tool_eval() +async def eval_my_suite() -> EvalSuite: + suite = EvalSuite(name="...", system_message="...", rubric=...) + + # 3. Load tools with timeout and error handling + try: + await asyncio.wait_for( + suite.add_arcade_gateway(...), + timeout=10.0, + ) + print(" β Source loaded") + except Exception as e: + print(f" β Source failed: {e}") + return suite + + # 4. Add test cases + suite.add_case(name="...", user_message="...", ...) + + return suite +``` + +This pattern ensures: +- Clear configuration at the top +- Robust error handling +- Informative output during loading +- Graceful degradation if sources fail + +### 1. `eval_arcade_gateway.py` + +Evaluates tools from Arcade Gateway (cloud-hosted toolkits). + +**What it demonstrates:** + +- Async loading from Arcade Gateway with timeout handling +- Error handling for connection failures +- Math toolkit evaluations +- BinaryCritic for parameter validation +- Conversational context with additional_messages + +**Prerequisites:** + +Before running this example, you need to set up an MCP Gateway: + +1. **Get your API key** - [API Keys Setup Guide](https://docs.arcade.dev/en/get-started/setup/api-keys) +2. **Create an MCP Gateway** at [Arcade Portal](https://portal.arcade.dev) +3. **Add toolkits** (e.g., Math, GitHub, Slack) to your gateway +4. 
**Get your credentials:** + - `ARCADE_API_KEY` - Your Arcade API key + - `ARCADE_USER_ID` - Your user ID (found in portal settings) + +π **Full setup guide:** [MCP Gateways Documentation](https://docs.arcade.dev/en/guides/create-tools/mcp-gateways) + +**Requirements:** + +- Arcade API key (get one at [arcade.dev](https://arcade.dev)) +- LLM API key (OpenAI or Anthropic) + +**Run it:** + +```bash +# Set your Arcade API key +export ARCADE_API_KEY=your_arcade_key + +arcade evals examples/evals/eval_arcade_gateway.py \ + --api-key openai:YOUR_OPENAI_KEY +``` + +### 2. `eval_stdio_mcp_server.py` + +Evaluates tools from local MCP servers running via stdio (subprocess). + +**What it demonstrates:** + +- Loading from local stdio MCP servers (subprocesses) +- Using `add_mcp_stdio_server()` method +- Setting environment variables (PYTHONUNBUFFERED) +- Simple echo tool evaluations +- Async loading with timeout and error handling + +**Requirements:** + +- Local MCP server code +- Server dependencies installed +- LLM API key + +**Run it:** + +```bash +arcade evals examples/evals/eval_stdio_mcp_server.py \ + --api-key openai:YOUR_KEY +``` + +### 3. `eval_http_mcp_server.py` + +Evaluates tools from remote MCP servers via HTTP or SSE. + +**What it demonstrates:** + +- Connecting to HTTP MCP endpoints +- Using SSE (Server-Sent Events) transport +- Authentication with Bearer tokens +- Error handling with timeouts + +**Requirements:** + +- Running HTTP/SSE MCP server +- Network connectivity +- LLM API key +- (Optional) Authentication token + +**Run it:** + +```bash +# Update the configuration in the file first, then run: +arcade evals examples/evals/eval_http_mcp_server.py \ + --api-key openai:YOUR_KEY +``` + +### 4. `eval_comprehensive_comparison.py` + +Compares tool performance across multiple sources simultaneously. + +**What it demonstrates:** + +- Comparative evaluation across different tool sources +- Loading from multiple sources (Gateway, stdio, dict) +- Track-based evaluation (comparing same task across sources) +- Conditional test cases based on loaded sources +- Using SimilarityCritic for fuzzy matching + +**Requirements:** + +- Arcade API key (for Gateway) +- LLM API key +- (Optional) Local simple MCP server + +**Run it:** + +```bash +# Set environment variables +export ARCADE_API_KEY=your_key +export ARCADE_USER_ID=your_user_id + +arcade evals examples/evals/eval_comprehensive_comparison.py \ + -p "openai:gpt-4o anthropic:claude-sonnet-4-5-20250929" \ + -k openai:YOUR_KEY \ + -k anthropic:YOUR_KEY \ + -o comparison.html -d +``` + +## π― CLI Reference + +### New v2.0.0 Flags + + +| Flag | Short | Description | Example | +| --------------------- | ------- | -------------------------------------------------- | ------------------------------------------------- | +| `--use-provider` | `-p` | Provider(s) and models (space-separated) | `-p "openai:gpt-4o anthropic:claude-sonnet"` | +| `--api-key` | `-k` | API key in`provider:key` format (repeatable) | `-k openai:sk-... 
-k anthropic:sk-ant-...` | +| `--output` | `-o` | Output file (auto-detects format from extension) | `-o results.html` or `-o results` (all formats) | +| `--only-failed` | `-f` | Show only failed evaluations | `--only-failed` | +| `--include-context` | | Include system messages and conversation history | `--include-context` | +| `--details` | `-d` | Show detailed output | `-d` | +| `--max-concurrent` | | Max concurrent evaluations | `--max-concurrent 5` | +| `--capture` | | Capture mode (record tool calls without scoring) | `--capture` | + +### Provider & Model Selection + +**Single provider with default model:** + +```bash +arcade evals eval_file.py -p openai -k openai:YOUR_KEY +``` + +**Single provider with specific models:** + +```bash +arcade evals eval_file.py -p "openai:gpt-4o,gpt-4o-mini" -k openai:YOUR_KEY +``` + +**Multiple providers (space-separated):** + +```bash +arcade evals eval_file.py \ + -p "openai:gpt-4o anthropic:claude-sonnet-4-5-20250929" \ + -k openai:YOUR_KEY \ + -k anthropic:YOUR_KEY +``` + +### Output Formats + +**Auto-detect from extension:** + +```bash +-o results.html # HTML output +-o results.json # JSON output +-o results.md # Markdown output +-o results.txt # Text output +``` + +**Multiple formats:** + +```bash +-o results.html -o results.json # Both HTML and JSON +``` + +**All formats:** + +```bash +-o results # Generates results.txt, results.md, results.html, results.json +``` + +## π§ Common Patterns + +### Pattern 1: Compare OpenAI Models + +```bash +arcade evals examples/evals/eval_arcade_gateway.py \ + -p "openai:gpt-4o,gpt-4o-mini,gpt-3.5-turbo" \ + -k openai:YOUR_KEY \ + -o comparison.html -d +``` + +### Pattern 2: OpenAI vs Anthropic + +```bash +arcade evals examples/evals/eval_stdio_mcp_server.py \ + -p "openai:gpt-4o anthropic:claude-sonnet-4-5-20250929" \ + -k openai:YOUR_OPENAI_KEY \ + -k anthropic:YOUR_ANTHROPIC_KEY \ + -o battle.html -d +``` + +### Pattern 3: Failed Tests Only + +```bash +arcade evals examples/evals/eval_http_mcp_server.py \ + --api-key openai:YOUR_KEY \ + --only-failed -d +``` + +### Pattern 4: Comparative Evaluation + +```bash +# Compare performance across multiple tool sources +arcade evals examples/evals/eval_comprehensive_comparison.py \ + -p "openai:gpt-4o anthropic:claude-sonnet-4-5-20250929" \ + -k openai:YOUR_KEY \ + -k anthropic:YOUR_KEY \ + -o comparison.html -d +``` + +### Pattern 5: Capture Mode (No Scoring) + +```bash +# Record tool calls without evaluation +arcade evals examples/evals/eval_arcade_gateway.py \ + --capture \ + --api-key openai:YOUR_KEY \ + -o captured.json +``` + +### Pattern 6: Full Context Output + +```bash +arcade evals examples/evals/eval_stdio_mcp_server.py \ + --api-key openai:YOUR_KEY \ + --include-context \ + -o full_results.html -d +``` + +## π Troubleshooting + +### Error: "No module named 'openai'" + +**Solution:** Install evals dependencies: + +```bash +pip install 'arcade-mcp[evals]' +``` + +### Error: "API key not found for provider 'openai'" + +**Solution:** Provide API key via flag or environment variable: + +```bash +# Via flag +arcade evals eval_file.py --api-key openai:YOUR_KEY + +# Via environment variable +export OPENAI_API_KEY=your_key +arcade evals eval_file.py +``` + +### Error: "Connection refused" (HTTP server) + +**Solution:** Ensure your HTTP MCP server is running: + +```bash +# Check if server is running +curl http://localhost:8000/mcp + +# Start your server first +python server.py +``` + +### Error: "Module not found" (stdio server) + +**Solution:** Install server 
dependencies: + +```bash +cd examples/mcp_servers/simple +uv sync +``` + +### Evals run but all tests fail + +**Possible causes:** + +1. Wrong tool names - check your server's tool definitions +2. Incorrect argument names - verify expected vs actual +3. Server not responding - check server logs +4. API key issues - verify LLM provider keys + +**Debug with verbose output:** + +```bash +arcade evals eval_file.py --api-key openai:YOUR_KEY -d +``` diff --git a/examples/evals/eval_arcade_gateway.py b/examples/evals/eval_arcade_gateway.py new file mode 100644 index 000000000..1126e7aa3 --- /dev/null +++ b/examples/evals/eval_arcade_gateway.py @@ -0,0 +1,135 @@ +"""Arcade Gateway evaluation - Loading tools from cloud-hosted toolkits. + +This example demonstrates loading and evaluating tools from Arcade Gateway, +which provides access to pre-built toolkits (Math, GitHub, Slack, Linear, etc.). + +Prerequisites: + 1. Get your API key: https://docs.arcade.dev/en/get-started/setup/api-keys + 2. Create an MCP Gateway at https://portal.arcade.dev + 3. Add toolkits to your gateway (e.g., Math, GitHub, Slack) + 4. Get your ARCADE_API_KEY and ARCADE_USER_ID from the portal + + Full setup guide: https://docs.arcade.dev/en/guides/create-tools/mcp-gateways + +Run: + # Set environment variables + export ARCADE_API_KEY=your_arcade_key + export ARCADE_USER_ID=your_user_id + + # Run the evaluation + arcade evals examples/evals/eval_arcade_gateway.py \\ + -p openai:gpt-4o \\ + -k openai:YOUR_KEY \\ + -o results.html -d +""" + +import asyncio +import os + +from arcade_evals import ( + BinaryCritic, + EvalRubric, + EvalSuite, + ExpectedMCPToolCall, + tool_eval, +) + +# ============================================================================= +# CONFIGURATION +# ============================================================================= + +ARCADE_API_KEY = os.environ.get("ARCADE_API_KEY", "YOUR_ARCADE_API_KEY_HERE") +ARCADE_USER_ID = os.environ.get("ARCADE_USER_ID", "YOUR_USER_ID_HERE") + +default_rubric = EvalRubric( + fail_threshold=0.7, + warn_threshold=0.9, +) + + +# ============================================================================= +# EVAL SUITE +# ============================================================================= + + +@tool_eval() +async def eval_arcade_gateway() -> EvalSuite: + """Evaluate Math toolkit from Arcade Gateway.""" + suite = EvalSuite( + name="Arcade Gateway - Math Toolkit", + system_message="You are a helpful math assistant. 
Use tools to perform calculations.", + rubric=default_rubric, + ) + + print("\n Loading Arcade Gateway...") + + try: + await asyncio.wait_for( + suite.add_arcade_gateway( + gateway_slug="Math", + arcade_api_key=ARCADE_API_KEY, + arcade_user_id=ARCADE_USER_ID, + ), + timeout=10.0, + ) + print(" β Arcade Gateway (Math toolkit)") + except asyncio.TimeoutError: + print(" β Arcade Gateway - timeout") + return suite + except Exception as e: + print(f" β Arcade Gateway - {type(e).__name__}: {e}") + return suite + + # Test Case 1: Simple addition + suite.add_case( + name="Simple addition - 10 + 5", + user_message="What is 10 plus 5?", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="Math_Add", + args={"a": 10, "b": 5}, + ) + ], + critics=[ + BinaryCritic(critic_field="a", weight=0.5), + BinaryCritic(critic_field="b", weight=0.5), + ], + ) + + # Test Case 2: Larger numbers + suite.add_case( + name="Addition - 123 + 456", + user_message="Calculate 123 + 456", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="Math_Add", + args={"a": 123, "b": 456}, + ) + ], + critics=[ + BinaryCritic(critic_field="a", weight=0.5), + BinaryCritic(critic_field="b", weight=0.5), + ], + ) + + # Test Case 3: Conversational context + suite.add_case( + name="Addition with context", + user_message="Now add them together", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="Math_Add", + args={"a": 50, "b": 25}, + ) + ], + critics=[ + BinaryCritic(critic_field="a", weight=0.5), + BinaryCritic(critic_field="b", weight=0.5), + ], + additional_messages=[ + {"role": "user", "content": "I have two numbers: 50 and 25"}, + {"role": "assistant", "content": "Great! I'll remember those numbers."}, + ], + ) + + return suite diff --git a/examples/evals/eval_comprehensive_comparison.py b/examples/evals/eval_comprehensive_comparison.py new file mode 100644 index 000000000..6f85380ef --- /dev/null +++ b/examples/evals/eval_comprehensive_comparison.py @@ -0,0 +1,229 @@ +"""Comprehensive comparison across multiple tool sources. 
+ +This example demonstrates comparative evaluations across different sources: +- Arcade Gateway (cloud toolkits) +- Local stdio MCP servers +- Dict-based tool definitions (baseline) + +Run: + arcade evals examples/evals/eval_comprehensive_comparison.py \\ + -p "openai:gpt-4o anthropic:claude-sonnet-4-5-20250929" \\ + -k openai:YOUR_KEY -k anthropic:YOUR_KEY \\ + -o comparison.html -d +""" + +import asyncio +import os + +from arcade_evals import ( + BinaryCritic, + EvalRubric, + EvalSuite, + ExpectedMCPToolCall, + MCPToolDefinition, + SimilarityCritic, + tool_eval, +) + +# ============================================================================= +# CONFIGURATION +# ============================================================================= + +ARCADE_API_KEY = os.environ.get("ARCADE_API_KEY", "YOUR_ARCADE_API_KEY_HERE") +ARCADE_USER_ID = os.environ.get("ARCADE_USER_ID", "YOUR_USER_ID_HERE") + +EXAMPLES_DIR = os.path.dirname(os.path.dirname(__file__)) +SIMPLE_SERVER_PATH = os.path.join(EXAMPLES_DIR, "mcp_servers", "simple") + +SIMPLE_SERVER_COMMAND = [ + "uv", + "run", + "--directory", + SIMPLE_SERVER_PATH, + "simple", +] + +# Baseline dict tool (for comparison) +DICT_SEARCH: MCPToolDefinition = { + "name": "search", + "description": "Search for information", + "inputSchema": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search query"}, + }, + "required": ["query"], + }, +} + +default_rubric = EvalRubric( + fail_threshold=0.6, + warn_threshold=0.8, + fail_on_tool_selection=False, +) + + +# ============================================================================= +# EVAL SUITE +# ============================================================================= + + +@tool_eval() +async def eval_comprehensive_comparison() -> EvalSuite: + """Compare tool performance across multiple sources.""" + suite = EvalSuite( + name="Multi-Source Comparative Evaluation", + system_message="You are a helpful assistant with various tools available.", + rubric=default_rubric, + ) + + loaded_tracks: list[str] = [] + + # Always add baseline dict tools + suite.add_tool_definitions([DICT_SEARCH], track="dict_baseline") + loaded_tracks.append("dict_baseline") + + print("\n Loading tool sources...") + + # Load from Arcade Gateway + try: + print(" β Loading Arcade Gateway (Math)...") + await asyncio.wait_for( + suite.add_arcade_gateway( + gateway_slug="Math", + arcade_api_key=ARCADE_API_KEY, + arcade_user_id=ARCADE_USER_ID, + track="arcade_gateway", + ), + timeout=10.0, + ) + loaded_tracks.append("arcade_gateway") + print(" β Arcade Gateway") + except asyncio.TimeoutError: + print(" β Arcade Gateway - timeout") + except Exception as e: + print(f" β Arcade Gateway - {type(e).__name__}: {e}") + + # Load from stdio MCP server + try: + print(" β Loading stdio MCP server (simple)...") + await asyncio.wait_for( + suite.add_mcp_stdio_server( + command=SIMPLE_SERVER_COMMAND, + env={"PYTHONUNBUFFERED": "1"}, + track="stdio_simple", + ), + timeout=15.0, + ) + loaded_tracks.append("stdio_simple") + print(" β Stdio MCP server") + except asyncio.TimeoutError: + print(" β Stdio MCP server - timeout") + except Exception as e: + print(f" β Stdio MCP server - {type(e).__name__}: {e}") + + print(f"\n Loaded tracks: {loaded_tracks}\n") + + # ========================================================================= + # TEST CASE 1: Math operation (Arcade Gateway vs baseline) + # ========================================================================= + + if "arcade_gateway" in loaded_tracks: 
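+        # A comparative case shares one user_message across tracks; each
+        # for_track() call below attaches that track's expected tool call and
+        # critics, so the same prompt is scored against every loaded source.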
+ case1 = suite.add_comparative_case( + name="Math addition - Gateway vs Baseline", + user_message="What is 15 plus 27?", + ) + case1.for_track( + "arcade_gateway", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="Math_Add", + args={"a": 15, "b": 27}, + ) + ], + critics=[ + BinaryCritic(critic_field="a", weight=0.5), + BinaryCritic(critic_field="b", weight=0.5), + ], + ) + case1.for_track( + "dict_baseline", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="search", + args={"query": "15 plus 27"}, + ) + ], + critics=[SimilarityCritic(critic_field="query", weight=1.0, similarity_threshold=0.3)], + ) + + # ========================================================================= + # TEST CASE 2: Echo operation (stdio vs baseline) + # ========================================================================= + + if "stdio_simple" in loaded_tracks: + case2 = suite.add_comparative_case( + name="Echo message - Stdio vs Baseline", + user_message="Echo 'Hello World'", + ) + case2.for_track( + "stdio_simple", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="echo", + args={"message": "Hello World"}, + ) + ], + critics=[ + BinaryCritic(critic_field="message", weight=1.0), + ], + ) + case2.for_track( + "dict_baseline", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="search", + args={"query": "Hello World"}, + ) + ], + critics=[SimilarityCritic(critic_field="query", weight=1.0, similarity_threshold=0.5)], + ) + + # ========================================================================= + # TEST CASE 3: Conversational context + # ========================================================================= + + if "arcade_gateway" in loaded_tracks: + case3 = suite.add_comparative_case( + name="Math with context", + user_message="Now add them together", + additional_messages=[ + {"role": "user", "content": "I have two numbers: 50 and 25"}, + {"role": "assistant", "content": "I'll remember those numbers."}, + ], + ) + case3.for_track( + "arcade_gateway", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="Math_Add", + args={"a": 50, "b": 25}, + ) + ], + critics=[ + BinaryCritic(critic_field="a", weight=0.5), + BinaryCritic(critic_field="b", weight=0.5), + ], + ) + case3.for_track( + "dict_baseline", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="search", + args={"query": "50 plus 25"}, + ) + ], + critics=[SimilarityCritic(critic_field="query", weight=1.0, similarity_threshold=0.3)], + ) + + return suite diff --git a/examples/evals/eval_http_mcp_server.py b/examples/evals/eval_http_mcp_server.py new file mode 100644 index 000000000..b70952cc1 --- /dev/null +++ b/examples/evals/eval_http_mcp_server.py @@ -0,0 +1,142 @@ +"""Remote HTTP/SSE MCP server evaluation. + +This example demonstrates loading and evaluating tools from remote MCP servers +accessible via HTTP or Server-Sent Events (SSE). + +NOTE: This requires a running HTTP MCP server. Update the configuration below +with your server details. 
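+Configuration is read from the environment where available (MCP_SERVER_URL and
+GITHUB_PAT for the HTTP suite, SSE_MCP_URL for the SSE suite); otherwise edit
+the constants in the CONFIGURATION section below.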
+ +Run: + arcade evals examples/evals/eval_http_mcp_server.py \\ + -p openai:gpt-4o \\ + -k openai:YOUR_KEY \\ + -o results.html -d +""" + +import asyncio +import os + +from arcade_evals import ( + BinaryCritic, + EvalRubric, + EvalSuite, + ExpectedMCPToolCall, + tool_eval, +) + +# ============================================================================= +# CONFIGURATION - Update these for your HTTP MCP server +# ============================================================================= + +# Example: GitHub Copilot MCP (requires GitHub token) +HTTP_MCP_URL = os.environ.get("MCP_SERVER_URL", "https://api.githubcopilot.com/mcp/") +HTTP_MCP_TOKEN = os.environ.get("GITHUB_PAT", "YOUR_GITHUB_TOKEN_HERE") + +# Example: SSE-based MCP server +SSE_MCP_URL = os.environ.get("SSE_MCP_URL", "https://mcp.example.com/sse") + +default_rubric = EvalRubric( + fail_threshold=0.7, + warn_threshold=0.9, +) + + +# ============================================================================= +# EVAL SUITE - HTTP MCP Server +# ============================================================================= + + +@tool_eval() +async def eval_http_mcp_server() -> EvalSuite: + """Evaluate tools from HTTP MCP server.""" + suite = EvalSuite( + name="HTTP MCP Server Evaluation", + system_message="You are a helpful assistant with access to remote tools.", + rubric=default_rubric, + ) + + print("\n Loading HTTP MCP server...") + + try: + await asyncio.wait_for( + suite.add_mcp_server( + url=HTTP_MCP_URL, + headers={"Authorization": f"Bearer {HTTP_MCP_TOKEN}"}, + use_sse=False, # Use HTTP streaming + ), + timeout=15.0, + ) + print(" β HTTP MCP server") + except asyncio.TimeoutError: + print(" β HTTP MCP server - timeout") + return suite + except Exception as e: + print(f" β HTTP MCP server - {type(e).__name__}: {e}") + return suite + + # Add test cases based on your server's tools + # Example: If your server has an echo tool + suite.add_case( + name="HTTP server tool call", + user_message="Echo 'Hello from HTTP'", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="echo", # Adjust to match your server's tool names + args={"message": "Hello from HTTP"}, + ) + ], + critics=[ + BinaryCritic(critic_field="message", weight=1.0), + ], + ) + + return suite + + +# ============================================================================= +# EVAL SUITE - SSE MCP Server +# ============================================================================= + + +@tool_eval() +async def eval_sse_mcp_server() -> EvalSuite: + """Evaluate tools from SSE MCP server.""" + suite = EvalSuite( + name="SSE MCP Server Evaluation", + system_message="You are a helpful assistant with access to SSE-connected tools.", + rubric=default_rubric, + ) + + print("\n Loading SSE MCP server...") + + try: + await asyncio.wait_for( + suite.add_mcp_server( + url=SSE_MCP_URL, + use_sse=True, # Use SSE transport + headers={"Accept": "text/event-stream"}, + ), + timeout=15.0, + ) + print(" β SSE MCP server") + except asyncio.TimeoutError: + print(" β SSE MCP server - timeout") + return suite + except Exception as e: + print(f" β SSE MCP server - {type(e).__name__}: {e}") + return suite + + # Add test cases for your SSE server's tools + suite.add_case( + name="SSE server tool call", + user_message="Get status", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="get_status", # Adjust to match your server's tools + args={}, + ) + ], + critics=[], + ) + + return suite diff --git a/examples/evals/eval_stdio_mcp_server.py 
b/examples/evals/eval_stdio_mcp_server.py new file mode 100644 index 000000000..30ad12c81 --- /dev/null +++ b/examples/evals/eval_stdio_mcp_server.py @@ -0,0 +1,124 @@ +"""Local stdio MCP server evaluation. + +This example demonstrates loading and evaluating tools from a local MCP server +running as a subprocess via stdio (standard input/output). + +Run: + arcade evals examples/evals/eval_stdio_mcp_server.py \\ + -p openai:gpt-4o \\ + -k openai:YOUR_KEY \\ + -o results.html -d +""" + +import asyncio +import os + +from arcade_evals import ( + BinaryCritic, + EvalRubric, + EvalSuite, + ExpectedMCPToolCall, + tool_eval, +) + +# ============================================================================= +# CONFIGURATION +# ============================================================================= + +# Path to the simple echo server +EXAMPLES_DIR = os.path.dirname(os.path.dirname(__file__)) +SIMPLE_SERVER_PATH = os.path.join(EXAMPLES_DIR, "mcp_servers", "simple") + +# Stdio server command +SIMPLE_SERVER_COMMAND = [ + "uv", + "run", + "--directory", + SIMPLE_SERVER_PATH, + "simple", +] + +default_rubric = EvalRubric( + fail_threshold=0.7, + warn_threshold=0.9, +) + + +# ============================================================================= +# EVAL SUITE +# ============================================================================= + + +@tool_eval() +async def eval_stdio_simple_server() -> EvalSuite: + """Evaluate simple echo server via stdio.""" + suite = EvalSuite( + name="Stdio MCP Server - Simple Echo", + system_message="You are a helpful assistant that can echo messages.", + rubric=default_rubric, + ) + + print("\n Loading stdio MCP server (simple)...") + + try: + await asyncio.wait_for( + suite.add_mcp_stdio_server( + command=SIMPLE_SERVER_COMMAND, + env={"PYTHONUNBUFFERED": "1"}, + ), + timeout=15.0, + ) + print(" β Simple MCP server (stdio)") + except asyncio.TimeoutError: + print(" β Simple MCP server (stdio) - timeout") + return suite + except Exception as e: + print(f" β Simple MCP server (stdio) - {type(e).__name__}: {e}") + return suite + + # Test Case 1: Simple echo + suite.add_case( + name="Echo - Hello", + user_message="Echo the word 'Hello'", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="echo", + args={"message": "Hello"}, + ) + ], + critics=[ + BinaryCritic(critic_field="message", weight=1.0), + ], + ) + + # Test Case 2: Echo with punctuation + suite.add_case( + name="Echo - Hello, World!", + user_message="Echo this: Hello, World!", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="echo", + args={"message": "Hello, World!"}, + ) + ], + critics=[ + BinaryCritic(critic_field="message", weight=1.0), + ], + ) + + # Test Case 3: Echo longer phrase + suite.add_case( + name="Echo - Longer phrase", + user_message="Please echo: The quick brown fox", + expected_tool_calls=[ + ExpectedMCPToolCall( + tool_name="echo", + args={"message": "The quick brown fox"}, + ) + ], + critics=[ + BinaryCritic(critic_field="message", weight=1.0), + ], + ) + + return suite diff --git a/libs/arcade-cli/arcade_cli/display.py b/libs/arcade-cli/arcade_cli/display.py index fae74b5e2..bfc760fe1 100644 --- a/libs/arcade-cli/arcade_cli/display.py +++ b/libs/arcade-cli/arcade_cli/display.py @@ -1,4 +1,5 @@ -from typing import TYPE_CHECKING, Any +from pathlib import Path +from typing import TYPE_CHECKING, Any, Optional from arcade_core.schema import ToolDefinition from rich.console import Console @@ -323,14 +324,14 @@ def display_tool_messages(tool_messages: list[dict]) -> None: 
) -def display_eval_results(results: list[list[dict[str, Any]]], show_details: bool = False) -> None: - """ - Display evaluation results in a format inspired by pytest's output. - - Args: - results: List of dictionaries containing evaluation results for each model. - show_details: Whether to show detailed results for each case. - """ +def _display_results_to_console( + output_console: Console, + results: list[list[dict[str, Any]]], + show_details: bool = False, + failed_only: bool = False, + original_counts: Optional[tuple[int, int, int, int]] = None, +) -> None: + """Display evaluation results to a Rich console.""" total_passed = 0 total_failed = 0 total_warned = 0 @@ -343,9 +344,9 @@ def display_eval_results(results: list[list[dict[str, Any]]], show_details: bool cases = model_results.get("cases", []) total_cases += len(cases) - console.print(f"[bold]Model:[/bold] [bold magenta]{model}[/bold magenta]") + output_console.print(f"[bold]Model:[/bold] [bold magenta]{model}[/bold magenta]") if show_details: - console.print(f"[bold magenta]{rubric}[/bold magenta]") + output_console.print(f"[bold magenta]{rubric}[/bold magenta]") for case in cases: evaluation = case["evaluation"] @@ -365,24 +366,123 @@ def display_eval_results(results: list[list[dict[str, Any]]], show_details: bool # Display one-line summary for each case with score as a percentage score_percentage = evaluation.score * 100 - console.print(f"{status} {case['name']} -- Score: {score_percentage:.2f}%") + output_console.print(f"{status} {case['name']} -- Score: {score_percentage:.2f}%") if show_details: # Show detailed information for each case - console.print(f"[bold]User Input:[/bold] {case['input']}\n") - console.print("[bold]Details:[/bold]") - console.print(_format_evaluation(evaluation)) - console.print("-" * 80) - - # Summary - summary = ( - f"[bold]Summary -- [/bold]Total: {total_cases} -- [green]Passed: {total_passed}[/green]" - ) - if total_warned > 0: - summary += f" -- [yellow]Warnings: {total_warned}[/yellow]" - if total_failed > 0: - summary += f" -- [red]Failed: {total_failed}[/red]" - console.print(summary + "\n") + output_console.print(f"[bold]User Input:[/bold] {case['input']}\n") + output_console.print("[bold]Details:[/bold]") + output_console.print(_format_evaluation(evaluation)) + output_console.print("-" * 80) + + output_console.print("") + + # Summary - use original counts if filtering, otherwise use current counts + if failed_only and original_counts: + # Unpack original counts + orig_total, orig_passed, orig_failed, orig_warned = original_counts + + # Show disclaimer before summary + output_console.print( + f"[bold yellow]Note: Showing only {total_cases} failed evaluation(s) (--only-failed)[/bold yellow]" + ) + + # Build summary with original counts + summary = ( + f"[bold]Summary -- [/bold]Total: {orig_total} -- [green]Passed: {orig_passed}[/green]" + ) + if orig_warned > 0: + summary += f" -- [yellow]Warnings: {orig_warned}[/yellow]" + if orig_failed > 0: + summary += f" -- [red]Failed: {orig_failed}[/red]" + else: + # Normal summary with current counts + summary = ( + f"[bold]Summary -- [/bold]Total: {total_cases} -- [green]Passed: {total_passed}[/green]" + ) + if total_warned > 0: + summary += f" -- [yellow]Warnings: {total_warned}[/yellow]" + if total_failed > 0: + summary += f" -- [red]Failed: {total_failed}[/red]" + + output_console.print(summary + "\n") + + +def display_eval_results( + results: list[list[dict[str, Any]]], + show_details: bool = False, + output_file: Optional[str] = None, + 
failed_only: bool = False, + original_counts: Optional[tuple[int, int, int, int]] = None, + output_formats: list[str] | None = None, + include_context: bool = False, +) -> None: + """ + Display evaluation results in a format inspired by pytest's output. + + Args: + results: List of dictionaries containing evaluation results for each model. + show_details: Whether to show detailed results for each case. + output_file: Optional file path to write results to. + failed_only: Whether only failed cases are being displayed (adds disclaimer). + original_counts: Optional tuple of (total_cases, total_passed, total_failed, total_warned) + from before filtering. Used when failed_only is True. + output_formats: List of output formats for file output (e.g., ['txt', 'md', 'html']). + include_context: Whether to include system_message and additional_messages. + """ + # Always display to terminal with Rich formatting + try: + _display_results_to_console(console, results, show_details, failed_only, original_counts) + except Exception as e: + console.print(f"[red]Error displaying results to console: {type(e).__name__}: {e}[/red]") + + # Also write to file(s) if requested using the specified formatter(s) + if output_file and output_formats: + from arcade_cli.formatters import get_formatter + + # Get base path without extension + base_path = Path(output_file) + base_name = base_path.stem + parent_dir = base_path.parent + + try: + parent_dir.mkdir(parents=True, exist_ok=True) + except PermissionError: + console.print(f"[red]Error: Permission denied creating directory {parent_dir}[/red]") + return + except OSError as e: + console.print(f"[red]Error creating directory: {e}[/red]") + return + + for fmt in output_formats: + # Define output_path early so it's available in exception handlers + output_path = parent_dir / f"{base_name}.{fmt}" + try: + formatter = get_formatter(fmt) + formatted_output = formatter.format( + results, + show_details=show_details, + failed_only=failed_only, + original_counts=original_counts, + include_context=include_context, + ) + + # Build output path with proper extension + output_path = parent_dir / f"{base_name}.{formatter.file_extension}" + + with open(output_path, "w", encoding="utf-8") as f: + f.write(formatted_output) + + console.print(f"[green]β Results written to {output_path}[/green]") + + except PermissionError: + console.print(f"[red]Error: Permission denied writing to {output_path}[/red]") + except OSError as e: + console.print(f"[red]Error writing file: {e}[/red]") + except Exception as e: + console.print( + f"[red]Error formatting results ({fmt}): {type(e).__name__}: {e}[/red]" + ) def _format_evaluation(evaluation: "EvaluationResult") -> str: diff --git a/libs/arcade-cli/arcade_cli/evals_runner.py b/libs/arcade-cli/arcade_cli/evals_runner.py new file mode 100644 index 000000000..4c16d5ec1 --- /dev/null +++ b/libs/arcade-cli/arcade_cli/evals_runner.py @@ -0,0 +1,515 @@ +""" +Evaluation and capture mode execution logic for the CLI. + +This module contains the async execution functions for running evaluations +and capture mode operations. 
+""" + +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable + +from rich.console import Console +from rich.progress import BarColumn, Progress, SpinnerColumn, TaskProgressColumn, TextColumn +from rich.text import Text + +from arcade_cli.display import display_eval_results +from arcade_cli.formatters import get_capture_formatter +from arcade_cli.utils import ModelSpec, filter_failed_evaluations + +if TYPE_CHECKING: + from arcade_evals import CaptureResult + +logger = logging.getLogger(__name__) + + +# All supported output formats +ALL_FORMATS = ["txt", "md", "html", "json"] + + +def parse_output_formats(format_str: str, console: Console | None = None) -> list[str]: + """ + Parse output format string into a list of formats. + + Supports: + - Single format: "md" -> ["md"] + - Comma-separated: "md,html" -> ["md", "html"] + - "all" keyword: "all" -> ["txt", "md", "html", "json"] + + Args: + format_str: The format string from CLI. + console: Optional Rich console for error messages (unused now - raises instead). + + Returns: + List of valid format strings. + + Raises: + ValueError: If any invalid formats are provided. + """ + if format_str.lower() == "all": + return ALL_FORMATS.copy() + + formats = [f.strip().lower() for f in format_str.split(",")] + valid_formats = [f for f in formats if f in ALL_FORMATS] + invalid_formats = [f for f in formats if f and f not in ALL_FORMATS] + + # Fail fast on invalid formats (parse-time validation) + if invalid_formats: + valid_list = ", ".join(ALL_FORMATS) + raise ValueError( + f"Invalid format(s): {', '.join(invalid_formats)}. Valid formats: {valid_list}" + ) + + return valid_formats + + +# --- Result Types for Error Handling --- + + +@dataclass +class EvalTaskResult: + """Result of running a single evaluation task.""" + + suite_name: str + model: str + provider: str + success: bool + result: Any | None = None # EvalResult on success + error: str | None = None + error_type: str | None = None + + @property + def display_name(self) -> str: + """Get display name in format 'provider/model'.""" + return f"{self.provider}/{self.model}" + + @classmethod + def from_success( + cls, suite_name: str, model: str, provider: str, result: Any + ) -> EvalTaskResult: + """Create a successful result.""" + return cls( + suite_name=suite_name, model=model, provider=provider, success=True, result=result + ) + + @classmethod + def from_error( + cls, suite_name: str, model: str, provider: str, error: Exception + ) -> EvalTaskResult: + """Create a failed result from an exception.""" + return cls( + suite_name=suite_name, + model=model, + provider=provider, + success=False, + error=str(error), + error_type=type(error).__name__, + ) + + +@dataclass +class CaptureTaskResult: + """Result of running a single capture task.""" + + suite_name: str + model: str + provider: str + success: bool + result: list[CaptureResult] | None = None # List of CaptureResult on success + error: str | None = None + error_type: str | None = None + + @property + def display_name(self) -> str: + """Get display name in format 'provider/model'.""" + return f"{self.provider}/{self.model}" + + @classmethod + def from_success( + cls, suite_name: str, model: str, provider: str, result: list[CaptureResult] + ) -> CaptureTaskResult: + """Create a successful result.""" + return cls( + suite_name=suite_name, model=model, provider=provider, success=True, result=result + ) + + @classmethod + def 
from_error( + cls, suite_name: str, model: str, provider: str, error: Exception + ) -> CaptureTaskResult: + """Create a failed result from an exception.""" + return cls( + suite_name=suite_name, + model=model, + provider=provider, + success=False, + error=str(error), + error_type=type(error).__name__, + ) + + +# --- Task Wrappers with Error Handling --- + + +async def _run_eval_task( + suite_func: Callable[..., Any], + model_spec: ModelSpec, + max_concurrent: int, + include_context: bool = False, +) -> EvalTaskResult: + """ + Run a single evaluation task with error handling. + + Returns EvalTaskResult with success/failure info instead of raising. + """ + suite_name = suite_func.__name__ + + try: + result = await suite_func( + provider_api_key=model_spec.api_key, + model=model_spec.model, + max_concurrency=max_concurrent, + provider=model_spec.provider.value, + include_context=include_context, + ) + return EvalTaskResult.from_success( + suite_name, model_spec.model, model_spec.provider.value, result + ) + + except Exception as e: + logger.warning( + "Evaluation task failed: suite=%s, model=%s, provider=%s, error=%s: %s", + suite_name, + model_spec.model, + model_spec.provider.value, + type(e).__name__, + str(e), + exc_info=True, # Include full traceback for debugging + ) + return EvalTaskResult.from_error(suite_name, model_spec.model, model_spec.provider.value, e) + + +async def _run_capture_task( + suite_func: Callable[..., Any], + model_spec: ModelSpec, + max_concurrent: int, + include_context: bool, +) -> CaptureTaskResult: + """ + Run a single capture task with error handling. + + Returns CaptureTaskResult with success/failure info instead of raising. + """ + suite_name = suite_func.__name__ + + try: + result = await suite_func( + provider_api_key=model_spec.api_key, + model=model_spec.model, + max_concurrency=max_concurrent, + provider=model_spec.provider.value, + capture_mode=True, + include_context=include_context, + ) + return CaptureTaskResult.from_success( + suite_name, model_spec.model, model_spec.provider.value, result + ) + + except Exception as e: + logger.warning( + "Capture task failed: suite=%s, model=%s, provider=%s, error=%s: %s", + suite_name, + model_spec.model, + model_spec.provider.value, + type(e).__name__, + str(e), + exc_info=True, # Include full traceback for debugging + ) + return CaptureTaskResult.from_error( + suite_name, model_spec.model, model_spec.provider.value, e + ) + + +# --- Main Runner Functions --- + + +async def run_evaluations( + eval_suites: list[Callable[..., Any]], + model_specs: list[ModelSpec], + max_concurrent: int, + show_details: bool, + output_file: str | None, + output_format: str, + failed_only: bool, + console: Console, + include_context: bool = False, +) -> None: + """ + Run evaluation suites and display results. + + Individual task failures are caught and reported without crashing the entire batch. + + Args: + eval_suites: List of decorated evaluation suite functions. + model_specs: List of ModelSpec objects containing provider, model, and API key. + max_concurrent: Maximum concurrent evaluations. + show_details: Whether to show detailed results. + output_file: Optional file path to write results. + output_format: Format for file output ('txt', 'md'). + failed_only: Whether to show only failed evaluations. + console: Rich console for output. + include_context: Whether to include system_message and additional_messages. 
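+        Note: output_format also accepts a comma-separated list (e.g. "md,html") or "all".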
+ """ + tasks = [] + + for suite_func in eval_suites: + console.print( + Text.assemble( + ("Running evaluations in ", "bold"), + (suite_func.__name__, "bold blue"), + ) + ) + for model_spec in model_specs: + task = asyncio.create_task( + _run_eval_task( + suite_func=suite_func, + model_spec=model_spec, + max_concurrent=max_concurrent, + include_context=include_context, + ) + ) + tasks.append(task) + + # Track progress with Rich progress bar (compatible with Rich console) + # Note: task_results is collected synchronously as each async task completes. + # The append() is atomic in CPython due to the GIL, and we await each future + # sequentially within the for-loop, so this is safe. + task_results: list[EvalTaskResult] = [] + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + console=console, + transient=False, + ) as progress: + task_id = progress.add_task("[cyan]Running evaluations...", total=len(tasks)) + for f in asyncio.as_completed(tasks): + result = await f + task_results.append(result) + # Update progress with completed task info + progress.update( + task_id, + advance=1, + description=f"[cyan]Completed: {result.suite_name} ({result.display_name})", + ) + + # Separate successes and failures + successful = [r for r in task_results if r.success] + failed = [r for r in task_results if not r.success] + + # Report failures + if failed: + console.print(f"\n[bold yellow]β οΈ {len(failed)} evaluation(s) failed:[/bold yellow]") + for fail in failed: + console.print( + f" β’ {fail.suite_name} ({fail.display_name}): [red]{fail.error_type}[/red] - {fail.error}" + ) + + # Process successful results + # Normalize results structure: ensure each result is a list (for consistent formatting) + # - Regular evals return a single dict -> wrap in list + # - Comparative evals return a list of dicts -> keep as is + all_evaluations: list[list[dict[str, Any]]] = [] + for r in successful: + if r.result is None: + continue + if isinstance(r.result, list): + # Comparative eval: already a list of results (one per track) + all_evaluations.append(r.result) + else: + # Regular eval: single dict, wrap in list for consistent structure + all_evaluations.append([r.result]) + + if not all_evaluations: + console.print("\n[bold red]β No evaluations completed successfully.[/bold red]") + return + + # Filter to show only failed evaluations if requested + original_counts = None + if failed_only: + all_evaluations, original_counts = filter_failed_evaluations(all_evaluations) + + # Parse output_format as a list (handles comma-separated and "all") + output_formats = parse_output_formats(output_format, console) + + display_eval_results( + all_evaluations, + show_details=show_details, + output_file=output_file, + failed_only=failed_only, + original_counts=original_counts, + output_formats=output_formats, + include_context=include_context, + ) + + # Summary when there were failures + if failed: + console.print(f"\n[bold]Summary:[/bold] {len(successful)} succeeded, {len(failed)} failed") + + +async def run_capture( + eval_suites: list[Callable[..., Any]], + model_specs: list[ModelSpec], + max_concurrent: int, + include_context: bool, + output_file: str | None, + output_format: str, + console: Console, +) -> None: + """ + Run evaluation suites in capture mode and output results. + + Capture mode records tool calls without scoring them. + Individual task failures are caught and reported without crashing the entire batch. 
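+    If no output_file is given, captured results are printed to the console as JSON.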
+ + Args: + eval_suites: List of decorated evaluation suite functions. + model_specs: List of ModelSpec objects containing provider, model, and API key. + max_concurrent: Maximum concurrent operations. + include_context: Whether to include system_message and additional_messages. + output_file: Optional file path to write results. + output_format: Output format ('json', 'txt', 'md', 'html'). + console: Rich console for output. + """ + tasks = [] + + for suite_func in eval_suites: + console.print( + Text.assemble( + ("Capturing tool calls from ", "bold"), + (suite_func.__name__, "bold cyan"), + ) + ) + for model_spec in model_specs: + task = asyncio.create_task( + _run_capture_task( + suite_func=suite_func, + model_spec=model_spec, + max_concurrent=max_concurrent, + include_context=include_context, + ) + ) + tasks.append(task) + + # Track progress with Rich progress bar (compatible with Rich console) + # Note: task_results is collected synchronously as each async task completes. + # The append() is atomic in CPython due to the GIL, and we await each future + # sequentially within the for-loop, so this is safe. + task_results: list[CaptureTaskResult] = [] + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + console=console, + transient=False, + ) as progress: + task_id = progress.add_task("[cyan]Capturing tool calls...", total=len(tasks)) + for f in asyncio.as_completed(tasks): + result = await f + task_results.append(result) + # Update progress with completed task info + progress.update( + task_id, + advance=1, + description=f"[cyan]Completed: {result.suite_name} ({result.display_name})", + ) + + # Separate successes and failures + successful = [r for r in task_results if r.success] + failed = [r for r in task_results if not r.success] + + # Report failures + if failed: + console.print(f"\n[bold yellow]β οΈ {len(failed)} capture(s) failed:[/bold yellow]") + for fail in failed: + console.print( + f" β’ {fail.suite_name} ({fail.display_name}): [red]{fail.error_type}[/red] - {fail.error}" + ) + + # Collect successful captures + all_captures: list[CaptureResult] = [] + for r in successful: + if r.result is not None: + all_captures.extend(r.result) + + if not all_captures: + console.print("\n[bold red]β No captures completed successfully.[/bold red]") + return + + # Parse output formats (handles comma-separated and "all") + output_formats = parse_output_formats(output_format, console) + + # Output to file(s) or console + if output_file: + # Get base path without extension + base_path = Path(output_file) + base_name = base_path.stem + parent_dir = base_path.parent + + try: + parent_dir.mkdir(parents=True, exist_ok=True) + except PermissionError: + console.print( + f"\n[red]β Error: Permission denied creating directory {parent_dir}[/red]" + ) + return + except OSError as e: + console.print(f"\n[red]β Error creating directory: {e}[/red]") + return + + for fmt in output_formats: + # Define file_path early so it's available in exception handlers + file_path = parent_dir / f"{base_name}.{fmt}" + try: + formatter = get_capture_formatter(fmt) + formatted_output = formatter.format(all_captures, include_context=include_context) + + # Build output path with proper extension + file_path = parent_dir / f"{base_name}.{formatter.file_extension}" + + with open(file_path, "w", encoding="utf-8") as outfile: + outfile.write(formatted_output) + console.print( + f"\n[green]β Capture results written to[/green] [bold]{file_path}[/bold]" + 
) + + except ValueError as e: + console.print(f"\n[red]β {e}[/red]") + except PermissionError: + console.print(f"\n[red]β Error: Permission denied writing to {file_path}[/red]") + except OSError as e: + console.print(f"\n[red]β Error writing file: {e}[/red]") + else: + # Console output: always use JSON for best copy-paste experience + console.print("\n[bold]Capture Results:[/bold]") + json_formatter = get_capture_formatter("json") + console.print(json_formatter.format(all_captures, include_context=include_context)) + + # Summary + total_cases = sum(len(cap.captured_cases) for cap in all_captures) + total_calls = sum( + sum(len(case.tool_calls) for case in cap.captured_cases) for cap in all_captures + ) + console.print( + f"\n[bold green]Captured {total_calls} tool calls across {total_cases} cases[/bold green]" + ) + + # Summary when there were failures + if failed: + console.print(f"\n[bold]Summary:[/bold] {len(successful)} succeeded, {len(failed)} failed") diff --git a/libs/arcade-cli/arcade_cli/formatters/__init__.py b/libs/arcade-cli/arcade_cli/formatters/__init__.py new file mode 100644 index 000000000..6b23329a3 --- /dev/null +++ b/libs/arcade-cli/arcade_cli/formatters/__init__.py @@ -0,0 +1,102 @@ +"""Formatters for evaluation and capture results output.""" + +from difflib import get_close_matches + +from arcade_cli.formatters.base import CaptureFormatter, EvalResultFormatter +from arcade_cli.formatters.html import CaptureHtmlFormatter, HtmlFormatter +from arcade_cli.formatters.json import CaptureJsonFormatter, JsonFormatter +from arcade_cli.formatters.markdown import CaptureMarkdownFormatter, MarkdownFormatter +from arcade_cli.formatters.text import CaptureTextFormatter, TextFormatter + +# Registry of available formatters for evaluations +FORMATTERS: dict[str, type[EvalResultFormatter]] = { + "txt": TextFormatter, + "md": MarkdownFormatter, + "html": HtmlFormatter, + "json": JsonFormatter, +} + +# Registry of available formatters for capture mode +CAPTURE_FORMATTERS: dict[str, type[CaptureFormatter]] = { + "json": CaptureJsonFormatter, + "txt": CaptureTextFormatter, + "md": CaptureMarkdownFormatter, + "html": CaptureHtmlFormatter, +} + + +def get_formatter(format_name: str) -> EvalResultFormatter: + """ + Get a formatter instance by name. + + Args: + format_name: The format name (e.g., 'txt', 'md'). + + Returns: + An instance of the appropriate formatter. + + Raises: + ValueError: If the format is not supported. Suggests similar format names if available. + """ + formatter_class = FORMATTERS.get(format_name.lower()) + if formatter_class is None: + supported = list(FORMATTERS.keys()) + + # Try to find a close match for better error messages + close_matches = get_close_matches(format_name.lower(), supported, n=1, cutoff=0.6) + + error_msg = f"Unsupported format '{format_name}'." + if close_matches: + error_msg += f" Did you mean '{close_matches[0]}'?" + error_msg += f" Supported formats: {', '.join(supported)}" + + raise ValueError(error_msg) + return formatter_class() + + +def get_capture_formatter(format_name: str) -> CaptureFormatter: + """ + Get a capture formatter instance by name. + + Args: + format_name: The format name (e.g., 'json', 'txt', 'md', 'html'). + + Returns: + An instance of the appropriate formatter. + + Raises: + ValueError: If the format is not supported. Suggests similar format names if available. 
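+
+    Example:
+        get_capture_formatter("json").format(captures, include_context=False)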
+ """ + formatter_class = CAPTURE_FORMATTERS.get(format_name.lower()) + if formatter_class is None: + supported = list(CAPTURE_FORMATTERS.keys()) + + close_matches = get_close_matches(format_name.lower(), supported, n=1, cutoff=0.6) + + error_msg = f"Unsupported capture format '{format_name}'." + if close_matches: + error_msg += f" Did you mean '{close_matches[0]}'?" + error_msg += f" Supported formats: {', '.join(supported)}" + + raise ValueError(error_msg) + return formatter_class() + + +__all__ = [ + # Eval formatters + "FORMATTERS", + "EvalResultFormatter", + "HtmlFormatter", + "JsonFormatter", + "MarkdownFormatter", + "TextFormatter", + "get_formatter", + # Capture formatters + "CAPTURE_FORMATTERS", + "CaptureFormatter", + "CaptureHtmlFormatter", + "CaptureJsonFormatter", + "CaptureMarkdownFormatter", + "CaptureTextFormatter", + "get_capture_formatter", +] diff --git a/libs/arcade-cli/arcade_cli/formatters/base.py b/libs/arcade-cli/arcade_cli/formatters/base.py new file mode 100644 index 000000000..3b1d61661 --- /dev/null +++ b/libs/arcade-cli/arcade_cli/formatters/base.py @@ -0,0 +1,791 @@ +"""Base formatter for evaluation and capture results.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from arcade_evals import CaptureResult + +# Type alias for capture results +CaptureResults = list["CaptureResult"] + +# --- Type Aliases --- +# The results structure: list of suites, each containing list of model results +EvalResults = list[list[dict[str, Any]]] + +# Model -> Suite -> Cases mapping +ModelSuiteGroups = dict[str, dict[str, list[dict[str, Any]]]] + +# Statistics tuple: (total, passed, failed, warned) +EvalStats = tuple[int, int, int, int] + +# Comparative grouping: model -> base_suite -> case_name -> {input, tracks: {track: case_result}} +ComparativeCaseData = dict[str, Any] # {input, tracks: {track_name: case_result}} +ComparativeSuiteData = dict[str, ComparativeCaseData] # case_name -> ComparativeCaseData +ComparativeGroups = dict[str, dict[str, ComparativeSuiteData]] # model -> suite -> cases + +# --- Constants --- +# Maximum field value length before truncation (for display) +MAX_FIELD_DISPLAY_LENGTH = 60 +TRUNCATION_SUFFIX = "..." + + +def truncate_field_value(value: str, max_length: int = MAX_FIELD_DISPLAY_LENGTH) -> str: + """ + Truncate long field values for display. + + Args: + value: The string value to potentially truncate. + max_length: Maximum allowed length (default: 60). + + Returns: + The original value if within limits, or truncated with "..." suffix. + """ + if len(value) > max_length: + return value[: max_length - len(TRUNCATION_SUFFIX)] + TRUNCATION_SUFFIX + return value + + +def group_results_by_model( + results: EvalResults, +) -> tuple[ModelSuiteGroups, int, int, int, int]: + """ + Group evaluation results by model and suite, collecting statistics. + + This is the shared logic used by all formatters and display functions. + + Args: + results: Nested list of evaluation results by suite and model. 
+ + Returns: + A tuple of: + - model_groups: Dict mapping model -> suite -> list of cases + - total_passed: Count of passed evaluations + - total_failed: Count of failed evaluations + - total_warned: Count of warned evaluations + - total_cases: Total count of all cases + """ + total_passed = 0 + total_failed = 0 + total_warned = 0 + total_cases = 0 + model_groups: ModelSuiteGroups = {} + + for eval_suite in results: + for model_results in eval_suite: + model = model_results.get("model", "Unknown Model") + + # suite_name is always set by EvalSuite.evaluate() + suite_name = model_results.get("suite_name") or "Unnamed Suite" + + cases = model_results.get("cases", []) + total_cases += len(cases) + + if model not in model_groups: + model_groups[model] = {} + + if suite_name not in model_groups[model]: + model_groups[model][suite_name] = [] + + for case in cases: + evaluation = case["evaluation"] + if evaluation.passed: + total_passed += 1 + elif evaluation.warning: + total_warned += 1 + else: + total_failed += 1 + + model_groups[model][suite_name].append(case) + + return model_groups, total_passed, total_failed, total_warned, total_cases + + +def is_comparative_result(results: EvalResults) -> bool: + """ + Check if results contain comparative evaluations. + + Comparative results have a 'track_name' field that indicates they came + from a multi-track comparative evaluation. + + Args: + results: Nested list of evaluation results. + + Returns: + True if any result has a 'track_name' field. + """ + for eval_suite in results: + for model_results in eval_suite: + if model_results.get("track_name"): + return True + return False + + +def _extract_base_suite_name(suite_name: str, track_name: str) -> str: + """ + Extract the base suite name by removing the track suffix. + + Examples: + "My Suite [track_a]" with track "track_a" -> "My Suite" + "Suite Name [some_track]" with track "some_track" -> "Suite Name" + """ + suffix = f" [{track_name}]" + if suite_name.endswith(suffix): + return suite_name[: -len(suffix)] + return suite_name + + +def group_comparative_by_case( + results: EvalResults, +) -> tuple[ComparativeGroups, int, int, int, int, dict[str, list[str]]]: + """ + Group comparative results by model, suite, and case name. + + This allows showing the same case across different tracks side-by-side. + + Args: + results: Nested list of evaluation results (must be comparative). 
+ + Returns: + A tuple of: + - comparative_groups: {model: {base_suite: {case_name: {input, tracks: {track: result}}}}} + - total_passed: Count of passed evaluations + - total_failed: Count of failed evaluations + - total_warned: Count of warned evaluations + - total_cases: Total count of all cases (unique case_name * tracks) + - suite_track_order: Dict mapping base_suite -> list of track names for that suite + """ + total_passed = 0 + total_failed = 0 + total_warned = 0 + total_cases = 0 + + # Track order per suite (different suites can have different tracks) + suite_track_order: dict[str, list[str]] = {} + + # Structure: model -> base_suite -> case_name -> {input, tracks: {track: case_result}} + comparative_groups: ComparativeGroups = {} + + for eval_suite in results: + for model_results in eval_suite: + model = model_results.get("model", "Unknown Model") + suite_name = model_results.get("suite_name") or "Unnamed Suite" + track_name = model_results.get("track_name", "default") + + # Extract base suite name (without track suffix) + base_suite = _extract_base_suite_name(suite_name, track_name) + + # Track the order of tracks per suite + if base_suite not in suite_track_order: + suite_track_order[base_suite] = [] + if track_name not in suite_track_order[base_suite]: + suite_track_order[base_suite].append(track_name) + + cases = model_results.get("cases", []) + total_cases += len(cases) + + if model not in comparative_groups: + comparative_groups[model] = {} + + if base_suite not in comparative_groups[model]: + comparative_groups[model][base_suite] = {} + + for case in cases: + case_name = case["name"] + evaluation = case["evaluation"] + + # Count stats + if evaluation.passed: + total_passed += 1 + elif evaluation.warning: + total_warned += 1 + else: + total_failed += 1 + + # Initialize case entry if needed + if case_name not in comparative_groups[model][base_suite]: + comparative_groups[model][base_suite][case_name] = { + "input": case.get("input", ""), + "system_message": case.get("system_message"), + "additional_messages": case.get("additional_messages"), + "tracks": {}, + } + + # Store this track's result for this case + comparative_groups[model][base_suite][case_name]["tracks"][track_name] = { + "evaluation": evaluation, + "name": case_name, + "input": case.get("input", ""), + } + + return ( + comparative_groups, + total_passed, + total_failed, + total_warned, + total_cases, + suite_track_order, + ) + + +def compute_track_differences( + case_data: ComparativeCaseData, + track_order: list[str], +) -> dict[str, list[str]]: + """ + Compute which fields differ between tracks for a given case. + + Compares each track against the first track (baseline). + + Args: + case_data: The case data with tracks. + track_order: List of track names in order. + + Returns: + Dict mapping track_name -> list of field names that differ from baseline. 
+ """ + differences: dict[str, list[str]] = {} + tracks = case_data.get("tracks", {}) + + if len(tracks) < 2 or not track_order: + return differences + + # First track is baseline + baseline_track = track_order[0] + if baseline_track not in tracks: + return differences + + baseline_result = tracks[baseline_track] + baseline_eval = baseline_result.get("evaluation") + if not baseline_eval or not hasattr(baseline_eval, "results"): + return differences + + # Build baseline field values + baseline_fields: dict[str, Any] = {} + for critic_result in baseline_eval.results: + field = critic_result.get("field", "") + baseline_fields[field] = { + "actual": critic_result.get("actual"), + "match": critic_result.get("match"), + "score": critic_result.get("score"), + } + + # Compare other tracks to baseline + for track_name in track_order[1:]: + if track_name not in tracks: + continue + + track_result = tracks[track_name] + track_eval = track_result.get("evaluation") + if not track_eval or not hasattr(track_eval, "results"): + continue + + diff_fields: list[str] = [] + + for critic_result in track_eval.results: + field = critic_result.get("field", "") + actual = critic_result.get("actual") + match = critic_result.get("match") + + # Check if this field exists in baseline and differs + if field in baseline_fields: + baseline_data = baseline_fields[field] + # Different if actual value differs or match status differs + if actual != baseline_data["actual"] or match != baseline_data["match"]: + diff_fields.append(field) + else: + # Field exists in this track but not baseline + diff_fields.append(field) + + differences[track_name] = diff_fields + + return differences + + +# Type for case-first comparative grouping +# Structure: suite -> case_name -> model -> {input, tracks: {track: result}} +CaseFirstComparativeGroups = dict[str, dict[str, dict[str, dict[str, Any]]]] + + +def is_multi_model_comparative(results: EvalResults) -> bool: + """ + Check if comparative results contain multiple models. + + Args: + results: Nested list of evaluation results. + + Returns: + True if this is a comparative result with more than one unique model. + """ + if not is_comparative_result(results): + return False + + models: set[str] = set() + for eval_suite in results: + for model_results in eval_suite: + model = model_results.get("model", "Unknown") + models.add(model) + if len(models) > 1: + return True + return False + + +def group_comparative_by_case_first( + results: EvalResults, +) -> tuple[CaseFirstComparativeGroups, list[str], dict[str, list[str]], int, int, int, int]: + """ + Group comparative results by suite -> case -> model for case-first comparison. + + When multiple models run the same comparative evaluation, this groups results + so the same case from different models appears together. + + Args: + results: Nested list of comparative evaluation results. 
+ + Returns: + A tuple of: + - case_groups: {suite: {case_name: {model: {input, tracks: {track: result}}}}} + - model_order: List of model names in order of appearance + - suite_track_order: Dict mapping suite -> list of track names + - total_passed, total_failed, total_warned, total_cases + """ + total_passed = 0 + total_failed = 0 + total_warned = 0 + total_cases = 0 + + model_order: list[str] = [] + suite_track_order: dict[str, list[str]] = {} + + # Structure: base_suite -> case_name -> model -> {input, tracks: {track: result}} + case_groups: CaseFirstComparativeGroups = {} + + for eval_suite in results: + for model_results in eval_suite: + model = model_results.get("model", "Unknown Model") + suite_name = model_results.get("suite_name") or "Unnamed Suite" + track_name = model_results.get("track_name", "default") + + # Track model order + if model not in model_order: + model_order.append(model) + + # Extract base suite name (without track suffix) + base_suite = _extract_base_suite_name(suite_name, track_name) + + # Track the order of tracks per suite + if base_suite not in suite_track_order: + suite_track_order[base_suite] = [] + if track_name not in suite_track_order[base_suite]: + suite_track_order[base_suite].append(track_name) + + cases = model_results.get("cases", []) + total_cases += len(cases) + + # Initialize suite + if base_suite not in case_groups: + case_groups[base_suite] = {} + + for case in cases: + case_name = case["name"] + evaluation = case["evaluation"] + + # Count stats + if evaluation.passed: + total_passed += 1 + elif evaluation.warning: + total_warned += 1 + else: + total_failed += 1 + + # Initialize case + if case_name not in case_groups[base_suite]: + case_groups[base_suite][case_name] = {} + + # Initialize model entry for this case + if model not in case_groups[base_suite][case_name]: + case_groups[base_suite][case_name][model] = { + "input": case.get("input", ""), + "system_message": case.get("system_message"), + "additional_messages": case.get("additional_messages"), + "tracks": {}, + } + + # Store this track's result + case_groups[base_suite][case_name][model]["tracks"][track_name] = { + "evaluation": evaluation, + "name": case_name, + "input": case.get("input", ""), + } + + return ( + case_groups, + model_order, + suite_track_order, + total_passed, + total_failed, + total_warned, + total_cases, + ) + + +# ============================================================================= +# MULTI-MODEL HELPERS +# ============================================================================= + + +def is_multi_model_eval(results: EvalResults) -> bool: + """ + Check if evaluation results contain multiple models. + + Args: + results: Nested list of evaluation results. + + Returns: + True if more than one unique model is present. + """ + models: set[str] = set() + for eval_suite in results: + for model_results in eval_suite: + model = model_results.get("model", "Unknown") + models.add(model) + if len(models) > 1: + return True + return False + + +def is_multi_model_capture(captures: CaptureResults) -> bool: + """ + Check if capture results contain multiple models. + + Args: + captures: List of CaptureResult objects. + + Returns: + True if more than one unique model is present. 
+ """ + models = {c.model for c in captures} + return len(models) > 1 + + +# Type for multi-model comparison: suite -> case -> model -> case_result +MultiModelComparisonData = dict[str, dict[str, dict[str, dict[str, Any]]]] + +# Type for per-model stats: model -> {passed, failed, warned, total, pass_rate} +PerModelStats = dict[str, dict[str, Any]] + + +def group_eval_for_comparison( + results: EvalResults, +) -> tuple[MultiModelComparisonData, list[str], PerModelStats]: + """ + Reorganize evaluation results for cross-model comparison. + + Groups results by suite -> case -> model, enabling side-by-side tables. + + Args: + results: Nested list of evaluation results. + + Returns: + A tuple of: + - comparison_data: {suite: {case_name: {model: case_result}}} + - model_order: List of model names in order of appearance + - per_model_stats: {model: {passed, failed, warned, total, pass_rate}} + """ + comparison_data: MultiModelComparisonData = {} + model_order: list[str] = [] + per_model_stats: PerModelStats = {} + + for eval_suite in results: + for model_results in eval_suite: + model = model_results.get("model", "Unknown Model") + suite_name = model_results.get("suite_name") or "Unnamed Suite" + cases = model_results.get("cases", []) + + # Track model order + if model not in model_order: + model_order.append(model) + + # Initialize per-model stats + if model not in per_model_stats: + per_model_stats[model] = { + "passed": 0, + "failed": 0, + "warned": 0, + "total": 0, + } + + # Initialize suite in comparison data + if suite_name not in comparison_data: + comparison_data[suite_name] = {} + + for case in cases: + case_name = case["name"] + evaluation = case["evaluation"] + + # Update per-model stats + per_model_stats[model]["total"] += 1 + if evaluation.passed: + per_model_stats[model]["passed"] += 1 + elif evaluation.warning: + per_model_stats[model]["warned"] += 1 + else: + per_model_stats[model]["failed"] += 1 + + # Initialize case in suite + if case_name not in comparison_data[suite_name]: + comparison_data[suite_name][case_name] = {} + + # Store this model's result for this case + comparison_data[suite_name][case_name][model] = { + "evaluation": evaluation, + "input": case.get("input", ""), + "name": case_name, + } + + # Calculate pass rates + for _model, stats in per_model_stats.items(): + if stats["total"] > 0: + stats["pass_rate"] = (stats["passed"] / stats["total"]) * 100 + else: + stats["pass_rate"] = 0.0 + + return comparison_data, model_order, per_model_stats + + +def find_best_model( + case_models: dict[str, dict[str, Any]], +) -> tuple[str | None, float]: + """ + Find the model with the highest score for a case. + + Args: + case_models: Dict mapping model -> case_result with evaluation. + + Returns: + Tuple of (best_model_name, best_score). Returns (None, 0.0) if no models + or if all evaluations are missing. + Returns ("Tie", score) if multiple models share the highest score. 
+ """ + if not case_models: + return None, 0.0 + + best_model: str | None = None + best_score = -1.0 + tie = False + found_valid_evaluation = False + + for model, case_result in case_models.items(): + evaluation = case_result.get("evaluation") + if not evaluation: + continue + + found_valid_evaluation = True + score = evaluation.score + if score > best_score: + best_score = score + best_model = model + tie = False + elif score == best_score: + tie = True + + # Return 0.0 if no valid evaluations found (not -1.0) + if not found_valid_evaluation: + return None, 0.0 + + if tie: + return "Tie", best_score + + return best_model, best_score + + +# Type for grouped captures: suite -> case_name -> {user_message, models: {model: [tool_calls]}} +GroupedCaptures = dict[str, dict[str, dict[str, Any]]] + + +def group_captures_by_case( + captures: CaptureResults, +) -> tuple[GroupedCaptures, list[str]]: + """ + Group capture results by suite and case for multi-model comparison. + + Args: + captures: List of CaptureResult objects. + + Returns: + A tuple of: + - grouped: {suite: {case_key: {user_message, system_message, track_name, models: {model: captured_case}}}} + - model_order: List of model names in order of appearance + + Note: For comparative captures with tracks, case_key includes the track name + to keep them separate (e.g., "weather_case [track_a]"). + """ + grouped: GroupedCaptures = {} + model_order: list[str] = [] + + for capture in captures: + suite_name = capture.suite_name + model = capture.model + + # Track model order + if model not in model_order: + model_order.append(model) + + # Initialize suite + if suite_name not in grouped: + grouped[suite_name] = {} + + for case in capture.captured_cases: + # Include track_name in the key for comparative captures + track_name = getattr(case, "track_name", None) + case_key = f"{case.case_name} [{track_name}]" if track_name else case.case_name + + # Initialize case + if case_key not in grouped[suite_name]: + grouped[suite_name][case_key] = { + "user_message": case.user_message, + "system_message": case.system_message, + "additional_messages": case.additional_messages, + "track_name": track_name, + "models": {}, + } + + # Store this model's captured case + grouped[suite_name][case_key]["models"][model] = case + + return grouped, model_order + + +def group_captures_by_case_then_track( + captures: CaptureResults, +) -> tuple[dict[str, dict[str, dict[str, Any]]], list[str], list[str | None]]: + """ + Group capture results by suite, case, then track for tab-based display. + + Args: + captures: List of CaptureResult objects. 
+ + Returns: + A tuple of: + - grouped: {suite: {base_case_name: {tracks: {track: {models: {model: case}}}, user_message, ...}}} + - model_order: List of model names in order + - track_order: List of track names in order (None for non-comparative) + """ + grouped: dict[str, dict[str, dict[str, Any]]] = {} + model_order: list[str] = [] + track_order: list[str | None] = [] + + for capture in captures: + suite_name = capture.suite_name + model = capture.model + + if model not in model_order: + model_order.append(model) + + if suite_name not in grouped: + grouped[suite_name] = {} + + for case in capture.captured_cases: + track_name = getattr(case, "track_name", None) + base_case_name = case.case_name + + # Track order + if track_name and track_name not in track_order: + track_order.append(track_name) + + # Initialize case + if base_case_name not in grouped[suite_name]: + grouped[suite_name][base_case_name] = { + "user_message": case.user_message, + "system_message": case.system_message, + "additional_messages": case.additional_messages, + "tracks": {}, # {track_name: {models: {model: case}}} + } + + # Initialize track + track_key = track_name or "_default" + if track_key not in grouped[suite_name][base_case_name]["tracks"]: + grouped[suite_name][base_case_name]["tracks"][track_key] = { + "models": {}, + } + + # Store case under track and model + grouped[suite_name][base_case_name]["tracks"][track_key]["models"][model] = case + + # If no tracks, add None to track_order for consistent handling + if not track_order: + track_order = [None] + + return grouped, model_order, track_order + + +class EvalResultFormatter(ABC): + """ + Abstract base class for evaluation result formatters. + + Implement this class to add new output formats (txt, md, json, html, etc.). + """ + + @property + @abstractmethod + def file_extension(self) -> str: + """Return the default file extension for this format (e.g., 'txt', 'md').""" + ... + + @abstractmethod + def format( + self, + results: EvalResults, + show_details: bool = False, + failed_only: bool = False, + original_counts: EvalStats | None = None, + include_context: bool = False, + ) -> str: + """ + Format evaluation results into a string. + + Args: + results: Nested list of evaluation results by suite and model. + show_details: Whether to show detailed results for each case. + failed_only: Whether only failed cases are being displayed. + original_counts: Optional (total, passed, failed, warned) from before filtering. + include_context: Whether to include system_message and additional_messages. + + Returns: + Formatted string representation of the results. + """ + ... + + +class CaptureFormatter(ABC): + """ + Abstract base class for capture result formatters. + + Implement this class to add new output formats for capture mode. + """ + + @property + @abstractmethod + def file_extension(self) -> str: + """Return the default file extension for this format.""" + ... + + @abstractmethod + def format( + self, + captures: CaptureResults, + include_context: bool = False, + ) -> str: + """ + Format capture results into a string. + + Args: + captures: List of CaptureResult objects. + include_context: Whether to include system_message and additional_messages. + + Returns: + Formatted string representation of the capture results. + """ + ... 
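+
+
+# ---------------------------------------------------------------------------
+# Illustrative sketch only (not part of the public API): a minimal concrete
+# EvalResultFormatter showing how a new output format is expected to plug into
+# the ABC above and reuse the shared grouping helper. The real formatters
+# (text, markdown, json, html) live in their own modules; the class name and
+# output layout here are examples, not a prescribed format.
+class _PlainSummaryFormatter(EvalResultFormatter):
+    """Render results as a one-line-per-case plain-text summary (example only)."""
+
+    @property
+    def file_extension(self) -> str:
+        return "txt"
+
+    def format(
+        self,
+        results: EvalResults,
+        show_details: bool = False,
+        failed_only: bool = False,
+        original_counts: EvalStats | None = None,
+        include_context: bool = False,
+    ) -> str:
+        # Reuse the shared grouping logic instead of walking the raw structure.
+        groups, passed, _failed, _warned, total = group_results_by_model(results)
+        lines = [f"{passed}/{total} cases passed"]
+        for model, suites in groups.items():
+            for suite_name, cases in suites.items():
+                for case in cases:
+                    status = "PASS" if case["evaluation"].passed else "FAIL"
+                    lines.append(f"[{status}] {model} / {suite_name} / {case['name']}")
+        return "\n".join(lines)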
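+
+
+if __name__ == "__main__":
+    # Minimal, self-contained sketch of how the grouping helper and the example
+    # formatter above are meant to be driven. The payload is hand-built for
+    # illustration: SimpleNamespace stands in for the evaluation object produced
+    # by arcade_evals, supplying only the attributes read here (.passed, .warning).
+    from types import SimpleNamespace
+
+    demo_results: EvalResults = [
+        [
+            {
+                "model": "gpt-4o",
+                "suite_name": "Demo Suite",
+                "cases": [
+                    {
+                        "name": "adds two numbers",
+                        "input": "What is 2 + 2?",
+                        "evaluation": SimpleNamespace(passed=True, warning=False),
+                    },
+                    {
+                        "name": "handles a missing argument",
+                        "input": "Add 2",
+                        "evaluation": SimpleNamespace(passed=False, warning=False),
+                    },
+                ],
+            }
+        ]
+    ]
+
+    grouped, n_passed, n_failed, n_warned, n_total = group_results_by_model(demo_results)
+    print(f"{n_passed} passed / {n_failed} failed / {n_warned} warned out of {n_total} cases")
+    print(_PlainSummaryFormatter().format(demo_results))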
diff --git a/libs/arcade-cli/arcade_cli/formatters/html.py b/libs/arcade-cli/arcade_cli/formatters/html.py new file mode 100644 index 000000000..ab900aa20 --- /dev/null +++ b/libs/arcade-cli/arcade_cli/formatters/html.py @@ -0,0 +1,2878 @@ +"""HTML formatter for evaluation and capture results with full color support.""" + +import json +from datetime import datetime, timezone +from typing import Any + +from arcade_cli.formatters.base import ( + CaptureFormatter, + CaptureResults, + ComparativeCaseData, + EvalResultFormatter, + compute_track_differences, + find_best_model, + group_comparative_by_case, + group_comparative_by_case_first, + group_eval_for_comparison, + group_results_by_model, + is_comparative_result, + is_multi_model_capture, + is_multi_model_comparative, + is_multi_model_eval, + truncate_field_value, +) + + +class HtmlFormatter(EvalResultFormatter): + """ + HTML formatter for evaluation results. + + Produces a styled HTML document with colors matching the terminal output. + + Security Note: All user-controllable data MUST be escaped via _escape_html() + before being inserted into HTML. This includes case names, inputs, model names, + suite names, and any evaluation results or error messages. + """ + + def __init__(self) -> None: + """Initialize formatter with ID tracking for uniqueness.""" + super().__init__() + self._id_cache: dict[tuple[str, str, str], str] = {} + self._used_ids: set[str] = set() + + @property + def file_extension(self) -> str: + return "html" + + def format( + self, + results: list[list[dict[str, Any]]], + show_details: bool = False, + failed_only: bool = False, + original_counts: tuple[int, int, int, int] | None = None, + include_context: bool = False, + ) -> str: + # Check if this is a comparative evaluation + if is_comparative_result(results): + return self._format_comparative( + results, show_details, failed_only, original_counts, include_context + ) + + # Check if this is a multi-model evaluation + if is_multi_model_eval(results): + return self._format_multi_model( + results, show_details, failed_only, original_counts, include_context + ) + + return self._format_regular( + results, show_details, failed_only, original_counts, include_context + ) + + def _format_regular( + self, + results: list[list[dict[str, Any]]], + show_details: bool = False, + failed_only: bool = False, + original_counts: tuple[int, int, int, int] | None = None, + include_context: bool = False, + ) -> str: + """Format regular (non-comparative) evaluation results.""" + # Use shared grouping logic + model_groups, total_passed, total_failed, total_warned, total_cases = ( + group_results_by_model(results) + ) + + # Calculate pass rate + if total_cases > 0: + if failed_only and original_counts and original_counts[0] > 0: + pass_rate = (original_counts[1] / original_counts[0]) * 100 + else: + pass_rate = (total_passed / total_cases) * 100 + else: + pass_rate = 0 + + # Build HTML + html_parts = [self._get_html_header()] + + # Title and timestamp + html_parts.append('
+            ...')
+        html_parts.append('<tr><th>Status</th><th>Case</th><th>Score</th></tr>')
+        html_parts.append(f'<td>{status_text}</td>')
+        html_parts.append(f"<td>{case_name}</td>")
+        html_parts.append(f'<td>{score_pct:.1f}%</td>')
+        html_parts.append("