From 9e9782e126c0d26fc56ccbd3f5b96ccf44ccf946 Mon Sep 17 00:00:00 2001 From: song11071696 <113122791+song11071696@users.noreply.github.com> Date: Sat, 13 Jun 2026 15:31:35 +0800 Subject: [PATCH] fix: rebase benchmark suite onto latest main Resolve merge conflicts by rebuilding branch from main. All original files preserved, benchmark files added. --- benchmarks/__init__.py | 16 ++ benchmarks/base.py | 131 +++++++++++ benchmarks/evaluator.py | 86 ++++++++ benchmarks/mem0_adapter.py | 165 ++++++++++++++ benchmarks/memanto_adapter.py | 167 ++++++++++++++ benchmarks/scenario_a.py | 124 +++++++++++ benchmarks/scenario_b.py | 123 +++++++++++ datasets/persona_evolution.json | 61 ++++++ datasets/technical_logs.json | 92 ++++++++ requirements.txt | 8 + run_benchmark.py | 375 ++++++++++++++++++++++++++++++++ tests/test_adapters.py | 240 ++++++++++++++++++++ 12 files changed, 1588 insertions(+) create mode 100644 benchmarks/__init__.py create mode 100644 benchmarks/base.py create mode 100644 benchmarks/evaluator.py create mode 100644 benchmarks/mem0_adapter.py create mode 100644 benchmarks/memanto_adapter.py create mode 100644 benchmarks/scenario_a.py create mode 100644 benchmarks/scenario_b.py create mode 100644 datasets/persona_evolution.json create mode 100644 datasets/technical_logs.json create mode 100644 requirements.txt create mode 100644 run_benchmark.py create mode 100644 tests/test_adapters.py diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py new file mode 100644 index 00000000..4d6e5fe2 --- /dev/null +++ b/benchmarks/__init__.py @@ -0,0 +1,16 @@ +""" +Memory framework adapters for benchmarking. +Each adapter implements the MemoryAdapter interface. 
+""" + +from .base import MemoryAdapter, MemoryResult, BenchmarkMetric +from .memanto_adapter import MemantoAdapter +from .mem0_adapter import Mem0Adapter + +__all__ = [ + "MemoryAdapter", + "MemoryResult", + "BenchmarkMetric", + "MemantoAdapter", + "Mem0Adapter", +] diff --git a/benchmarks/base.py b/benchmarks/base.py new file mode 100644 index 00000000..2387e01b --- /dev/null +++ b/benchmarks/base.py @@ -0,0 +1,131 @@ +""" +Base classes for the memory benchmark framework. +""" + +import time +import statistics +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class MemoryResult: + """Result of a memory operation including success status, data, and metrics.""" + """Result from a single memory operation.""" + success: bool + latency_ms: float + tokens_used: int = 0 + data: Any = None + error: str | None = None + + +@dataclass +class BenchmarkMetric: + """Aggregated benchmark metrics for a set of runs.""" + """Aggregated metrics from a benchmark run.""" + framework: str + scenario: str + total_store_calls: int = 0 + total_retrieve_calls: int = 0 + total_store_tokens: int = 0 + total_retrieve_tokens: int = 0 + store_latencies: list[float] = field(default_factory=list) + retrieve_latencies: list[float] = field(default_factory=list) + retrieval_scores: list[float] = field(default_factory=list) + errors: int = 0 + + @property + def store_p95_latency(self) -> float: + if not self.store_latencies: + return 0.0 + sorted_l = sorted(self.store_latencies) + idx = int(len(sorted_l) * 0.95) + return sorted_l[min(idx, len(sorted_l) - 1)] + + @property + def retrieve_p95_latency(self) -> float: + if not self.retrieve_latencies: + return 0.0 + sorted_l = sorted(self.retrieve_latencies) + idx = int(len(sorted_l) * 0.95) + return sorted_l[min(idx, len(sorted_l) - 1)] + + @property + def mean_retrieval_accuracy(self) -> float: + if not self.retrieval_scores: + return 0.0 + return 
statistics.mean(self.retrieval_scores) + + def to_dict(self) -> dict: + return { + "framework": self.framework, + "scenario": self.scenario, + "total_store_calls": self.total_store_calls, + "total_retrieve_calls": self.total_retrieve_calls, + "total_store_tokens": self.total_store_tokens, + "total_retrieve_tokens": self.total_retrieve_tokens, + "store_p95_latency_ms": round(self.store_p95_latency, 2), + "retrieve_p95_latency_ms": round(self.retrieve_p95_latency, 2), + "mean_store_latency_ms": round( + statistics.mean(self.store_latencies), 2 + ) if self.store_latencies else 0, + "mean_retrieve_latency_ms": round( + statistics.mean(self.retrieve_latencies), 2 + ) if self.retrieve_latencies else 0, + "retrieval_accuracy": round(self.mean_retrieval_accuracy, 4), + "errors": self.errors, + } + + +class MemoryAdapter(ABC): + """Abstract base class for memory framework adapters.""" + """Abstract interface for memory framework adapters.""" + + @property + @abstractmethod + def name(self) -> str: + """Framework name.""" + ... + + @abstractmethod + def setup(self, user_id: str) -> None: + """Initialize the memory store for a user.""" + ... + + @abstractmethod + def store(self, content: str, metadata: dict | None = None) -> MemoryResult: + """Store a memory and return metrics.""" + ... + + @abstractmethod + def retrieve(self, query: str, limit: int = 5) -> MemoryResult: + """Retrieve memories matching a query.""" + ... + + @abstractmethod + def update(self, memory_id: str, content: str) -> MemoryResult: + """Update an existing memory.""" + ... + + @abstractmethod + def delete(self, memory_id: str) -> MemoryResult: + """Delete a memory.""" + ... + + @abstractmethod + def get_all(self) -> MemoryResult: + """Get all stored memories.""" + ... + + @abstractmethod + def cleanup(self) -> None: + """Clean up resources.""" + ... 
+ + def timed_call(self, fn, *args, **kwargs) -> tuple[float, Any]: + """Time a function call and return (latency_ms, result).""" + start = time.perf_counter() + result = fn(*args, **kwargs) + elapsed = (time.perf_counter() - start) * 1000 + return elapsed, result diff --git a/benchmarks/evaluator.py b/benchmarks/evaluator.py new file mode 100644 index 00000000..e3f75ec7 --- /dev/null +++ b/benchmarks/evaluator.py @@ -0,0 +1,86 @@ +""" +LLM-as-a-Judge evaluator for retrieval accuracy. +""" + +import os +from openai import OpenAI + + +JUDGE_SYSTEM_PROMPT = """You are an expert evaluator for AI memory systems. +You will be given: +1. A QUERY that was used to search a memory system +2. A GOLDEN ANSWER (the ideal/correct response) +3. A set of RETRIEVED MEMORIES from the system + +Score the retrieval quality on a scale from 0.0 to 1.0: +- 1.0: Retrieved memories fully contain the golden answer information +- 0.7-0.9: Retrieved memories mostly contain relevant info, minor gaps +- 0.4-0.6: Partial match, some relevant info but significant gaps +- 0.1-0.3: Poor match, mostly irrelevant +- 0.0: Completely irrelevant or no useful information + +Respond with ONLY a JSON object: {"score": , "reasoning": ""}""" + + +class LLMEvaluator: + """Evaluates retrieval quality using LLM-as-a-judge with keyword fallback.""" + """Evaluates retrieval accuracy using an LLM judge.""" + + def __init__(self, model: str | None = None, api_key: str | None = None): + key = api_key or os.environ.get("OPENAI_API_KEY", "") + self.model = model or os.environ.get("JUDGE_MODEL", "gpt-4o") + self.client = OpenAI(api_key=key) if key else None + + def score_retrieval( + self, + query: str, + golden_answer: str, + retrieved_memories: list[str], + ) -> tuple[float, str]: + """Score a retrieval against a golden answer. 
Returns (score, reasoning).""" + if not self.client: + # Fallback: simple keyword overlap scoring + return self._keyword_score(golden_answer, retrieved_memories) + + memories_text = "\n---\n".join( + f"Memory {i+1}: {m}" for i, m in enumerate(retrieved_memories) + ) + user_prompt = f"""QUERY: {query} + +GOLDEN ANSWER: {golden_answer} + +RETRIEVED MEMORIES: +{memories_text}""" + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": JUDGE_SYSTEM_PROMPT}, + {"role": "user", "content": user_prompt}, + ], + temperature=0.0, + max_tokens=200, + response_format={"type": "json_object"}, + ) + import json + content = response.choices[0].message.content + parsed = json.loads(content) + return float(parsed.get("score", 0.0)), parsed.get("reasoning", "") + except Exception as e: + return self._keyword_score(golden_answer, retrieved_memories) + + @staticmethod + def _keyword_score( + golden: str, memories: list[str] + ) -> tuple[float, str]: + """Fallback keyword-overlap scoring when no LLM judge is available.""" + golden_words = set(golden.lower().split()) + if not golden_words: + return 0.0, "Empty golden answer" + + all_memory_text = " ".join(memories).lower() + memory_words = set(all_memory_text.split()) + overlap = golden_words & memory_words + score = len(overlap) / len(golden_words) if golden_words else 0.0 + return min(score, 1.0), f"Keyword overlap: {len(overlap)}/{len(golden_words)}" diff --git a/benchmarks/mem0_adapter.py b/benchmarks/mem0_adapter.py new file mode 100644 index 00000000..394f2086 --- /dev/null +++ b/benchmarks/mem0_adapter.py @@ -0,0 +1,165 @@ +""" +Mem0 framework adapter for benchmarking. + +Uses the mem0ai Python package. +Requires: MEM0_API_KEY environment variable. 
"""
Mem0 framework adapter for benchmarking.

Uses the mem0ai Python package.
Requires: MEM0_API_KEY environment variable.
"""

import logging
import os

from .base import MemoryAdapter, MemoryResult

logger = logging.getLogger(__name__)


class Mem0Adapter(MemoryAdapter):
    """Adapter for the Mem0 memory framework.

    Every operation returns a ``MemoryResult`` with ``latency_ms=0``; this
    class never measures time itself. Token counts are rough estimates
    (two tokens per whitespace-separated word).
    """

    def __init__(self):
        """Initialise the adapter with no client and no user ID."""
        self._client = None
        self._user_id = None

    def _ensure_setup(self) -> None:
        """Raise RuntimeError if setup() was not called.

        Raises:
            RuntimeError: If the client has not been initialised via setup().
        """
        if self._client is None:
            raise RuntimeError(
                f"{self.name} adapter: setup() must be called before operations"
            )

    @staticmethod
    def _estimate_tokens(text: str) -> int:
        """Return the heuristic token estimate for *text*."""
        return len(text.split()) * 2

    @staticmethod
    def _as_memory_list(result) -> list:
        """Normalise a search response into a list of memories."""
        if result is None:
            return []
        # NOTE(review): newer Mem0 output formats appear to wrap hits as
        # {"results": [...]} — confirm against the installed mem0ai version.
        if isinstance(result, dict) and isinstance(result.get("results"), list):
            return result["results"]
        return result if isinstance(result, list) else [result]

    def _run(self, operation, count_tokens=None) -> MemoryResult:
        """Run one client call and wrap its outcome in a MemoryResult.

        Args:
            operation: Zero-argument callable performing the client call.
            count_tokens: Optional callable mapping the call's return value
                to a token count; 0 tokens are reported when omitted.

        Raises:
            RuntimeError: If setup() has not been called.
        """
        self._ensure_setup()
        try:
            data = operation()
            tokens = count_tokens(data) if count_tokens is not None else 0
        except Exception as exc:
            # Deliberately broad: callers count failures via ``success`` and
            # keep the benchmark running instead of aborting on one bad call.
            return MemoryResult(success=False, latency_ms=0, error=str(exc))
        return MemoryResult(
            success=True, latency_ms=0, tokens_used=tokens, data=data
        )

    @property
    def name(self) -> str:
        """Return the display name of this adapter ("Mem0")."""
        return "Mem0"

    def setup(self, user_id: str) -> None:
        """Configure the adapter with a user ID and create the Mem0 client.

        Args:
            user_id: The user identifier to associate memories with.

        Raises:
            ValueError: If the MEM0_API_KEY environment variable is not set.
        """
        api_key = os.environ.get("MEM0_API_KEY", "")
        if not api_key:
            raise ValueError("MEM0_API_KEY environment variable is required")
        # Imported lazily so the module can be imported without mem0ai.
        from mem0 import MemoryClient
        self._client = MemoryClient(api_key=api_key)
        self._user_id = user_id

    def store(self, content: str, metadata: dict | None = None) -> MemoryResult:
        """Store a memory entry via the Mem0 API.

        Args:
            content: The text content to store.
            metadata: Optional metadata dictionary to attach to the memory.

        Returns:
            MemoryResult: Success/failure plus a token estimate for *content*.
        """
        return self._run(
            lambda: self._client.add(
                content, user_id=self._user_id, metadata=metadata or {}
            ),
            lambda _result: self._estimate_tokens(content),
        )

    def retrieve(self, query: str, limit: int = 5) -> MemoryResult:
        """Search for memories matching a query via the Mem0 API.

        Args:
            query: The search query string.
            limit: Maximum number of results to return. Defaults to 5.

        Returns:
            MemoryResult: ``data`` is a list of matching memories.
        """
        return self._run(
            lambda: self._as_memory_list(
                self._client.search(query, user_id=self._user_id, limit=limit)
            ),
            lambda memories: sum(self._estimate_tokens(str(m)) for m in memories),
        )

    def update(self, memory_id: str, content: str) -> MemoryResult:
        """Update an existing memory entry by ID.

        Args:
            memory_id: The identifier of the memory to update.
            content: The new text content.

        Returns:
            MemoryResult: Success/failure plus a token estimate for *content*.
        """
        return self._run(
            lambda: self._client.update(memory_id, content),
            lambda _result: self._estimate_tokens(content),
        )

    def delete(self, memory_id: str) -> MemoryResult:
        """Delete a memory entry by ID.

        Args:
            memory_id: The identifier of the memory to delete.

        Returns:
            MemoryResult: Result indicating success or failure.
        """
        return self._run(lambda: self._client.delete(memory_id))

    def get_all(self) -> MemoryResult:
        """Retrieve all memories for the current user.

        Returns:
            MemoryResult: Result containing all stored memories.
        """
        return self._run(lambda: self._client.get_all(user_id=self._user_id))

    def cleanup(self) -> None:
        """Delete all memories for the current user.

        A failing delete is logged as a warning rather than raised.

        Raises:
            RuntimeError: If setup() has not been called.
        """
        self._ensure_setup()
        try:
            self._client.delete_all(user_id=self._user_id)
        except Exception as exc:
            logger.warning("Cleanup failed: %s", exc)
"""
Memanto framework adapter for benchmarking.

Uses the memanto Python package with Moorcheh backend.
Requires: MOORCHEH_API_KEY environment variable.
"""

import logging
import os

from .base import MemoryAdapter, MemoryResult

logger = logging.getLogger(__name__)


class MemantoAdapter(MemoryAdapter):
    """Adapter for the Memanto memory framework (Moorcheh backend).

    Every operation returns a ``MemoryResult`` with ``latency_ms=0``; latency
    is measured externally via ``timed_call``. Token counts are rough
    estimates (two tokens per whitespace-separated word) unless the client
    reports an integer ``tokens_used``.
    """

    def __init__(self):
        """Initialise the adapter with no client and no user ID."""
        self._client = None
        self._user_id = None

    def _ensure_setup(self) -> None:
        """Raise RuntimeError if setup() was not called.

        Raises:
            RuntimeError: If the client has not been initialised via setup().
        """
        if self._client is None:
            raise RuntimeError(
                f"{self.name} adapter: setup() must be called before operations"
            )

    @staticmethod
    def _estimate_tokens(text: str) -> int:
        """Return the heuristic token estimate for *text*."""
        return len(text.split()) * 2

    def _run(self, operation, count_tokens=None) -> MemoryResult:
        """Run one client call and wrap its outcome in a MemoryResult.

        Args:
            operation: Zero-argument callable performing the client call.
            count_tokens: Optional callable mapping the call's return value
                to a token count; 0 tokens are reported when omitted.

        Raises:
            RuntimeError: If setup() has not been called.
        """
        self._ensure_setup()
        try:
            data = operation()
            tokens = count_tokens(data) if count_tokens is not None else 0
        except Exception as exc:
            # Deliberately broad: callers count failures via ``success`` and
            # keep the benchmark running instead of aborting on one bad call.
            return MemoryResult(success=False, latency_ms=0, error=str(exc))
        return MemoryResult(
            success=True, latency_ms=0, tokens_used=tokens, data=data
        )

    @property
    def name(self) -> str:
        """Return the display name of this adapter ("Memanto")."""
        return "Memanto"

    def setup(self, user_id: str) -> None:
        """Configure the adapter with a user ID and create the Memanto client.

        Args:
            user_id: The user identifier to associate memories with.

        Raises:
            ValueError: If the MOORCHEH_API_KEY environment variable is not set.
        """
        api_key = os.environ.get("MOORCHEH_API_KEY", "")
        if not api_key:
            raise ValueError("MOORCHEH_API_KEY environment variable is required")
        # Imported lazily so the module can be imported without memanto.
        from memanto import Memanto
        self._client = Memanto(api_key=api_key, user_id=user_id)
        self._user_id = user_id

    def store(self, content: str, metadata: dict | None = None) -> MemoryResult:
        """Store a memory entry via the Memanto API.

        Args:
            content: The text content to store.
            metadata: Optional metadata dictionary to attach to the memory.

        Returns:
            MemoryResult: Success/failure plus a token count.
        """
        def tokens_for(result) -> int:
            # Prefer the client's own count, but only if it is usable: a
            # ``None`` or non-integer value would break token aggregation.
            reported = getattr(result, "tokens_used", None)
            if isinstance(reported, int):
                return reported
            return self._estimate_tokens(content)

        return self._run(
            lambda: self._client.add(content, metadata=metadata or {}),
            tokens_for,
        )

    def retrieve(self, query: str, limit: int = 5) -> MemoryResult:
        """Search for memories matching a query via the Memanto API.

        Args:
            query: The search query string.
            limit: Maximum number of results to return. Defaults to 5.

        Returns:
            MemoryResult: ``data`` is a list of matching memories.
        """
        def search() -> list:
            result = self._client.search(query, limit=limit)
            if result is None:
                return []
            return result if isinstance(result, list) else [result]

        return self._run(
            search,
            lambda memories: sum(self._estimate_tokens(str(m)) for m in memories),
        )

    def update(self, memory_id: str, content: str) -> MemoryResult:
        """Update an existing memory entry by ID.

        Args:
            memory_id: The identifier of the memory to update.
            content: The new text content.

        Returns:
            MemoryResult: Success/failure plus a token estimate for *content*.
        """
        return self._run(
            lambda: self._client.update(memory_id, content),
            lambda _result: self._estimate_tokens(content),
        )

    def delete(self, memory_id: str) -> MemoryResult:
        """Delete a memory entry by ID.

        Args:
            memory_id: The identifier of the memory to delete.

        Returns:
            MemoryResult: Result indicating success or failure.
        """
        return self._run(lambda: self._client.delete(memory_id))

    def get_all(self) -> MemoryResult:
        """Retrieve all memories for the current user.

        Returns:
            MemoryResult: Result containing all stored memories.
        """
        return self._run(lambda: self._client.get_all())

    def cleanup(self) -> None:
        """Delete all memories for the current user.

        A failing delete is logged as a warning instead of being silently
        ignored.

        Raises:
            RuntimeError: If setup() has not been called.
        """
        self._ensure_setup()
        try:
            self._client.delete_all()
        except Exception as exc:
            logger.warning("Cleanup failed: %s", exc)
"""
Scenario A: Context-Overhead & Latency Sprint

Feeds dense, shifting technical logs into each memory framework.
Measures token consumption per conversation turn and retrieval latency.

Goal: Test if Memanto's active compression prevents massive token inflation
and post-ingestion delays seen in graph-based memory systems.
"""

import json
import os
from pathlib import Path

from .base import MemoryAdapter, BenchmarkMetric
from .evaluator import LLMEvaluator


DATASET_PATH = Path(__file__).parent.parent / "datasets" / "technical_logs.json"


def load_dataset() -> list[dict]:
    """Load the technical logs dataset."""
    # The dataset contains non-ASCII punctuation (em dashes); decode it as
    # UTF-8 explicitly instead of relying on the platform's locale encoding.
    with open(DATASET_PATH, encoding="utf-8") as f:
        return json.load(f)


def _memory_text(memory) -> str:
    """Return the text of one retrieved memory as a string.

    Dict-shaped memories are read from their "memory" key, then "text";
    anything else (or a dict with neither key) is stringified whole.
    """
    if isinstance(memory, dict):
        return str(memory.get("memory", memory.get("text", str(memory))))
    return str(memory)


def run_scenario_a(
    adapter: MemoryAdapter,
    evaluator: LLMEvaluator,
    user_id: str = "benchmark_user_a",
    dry_run: bool = False,
) -> BenchmarkMetric:
    """Run Scenario A against a memory adapter.

    Phase 1 stores every log entry; phase 2 runs each entry's retrieval
    queries and scores the results against the golden answers. Failed calls
    increment ``errors`` and are excluded from call, token and latency totals.

    Args:
        adapter: The memory framework adapter to test.
        evaluator: LLM judge for scoring retrieval accuracy.
        user_id: Unique user ID for this benchmark run.
        dry_run: If True, use mock data instead of real API calls.

    Returns:
        BenchmarkMetric with aggregated results.
    """
    dataset = load_dataset()
    metrics = BenchmarkMetric(
        framework=adapter.name,
        scenario="A: Context-Overhead & Latency Sprint",
    )

    if not dry_run:
        adapter.setup(user_id)

    try:
        # Phase 1: Ingestion — store all log entries
        for entry in dataset:
            content = entry["content"]

            if dry_run:
                latency = 50.0
                tokens = len(content.split()) * 2
            else:
                latency, result = adapter.timed_call(
                    adapter.store, content, entry.get("metadata", {})
                )
                if not result.success:
                    metrics.errors += 1
                    continue
                tokens = result.tokens_used

            metrics.total_store_calls += 1
            metrics.total_store_tokens += tokens
            metrics.store_latencies.append(latency)

        # Phase 2: Retrieval — query for specific information
        queries = [
            q for entry in dataset
            for q in entry.get("retrieval_queries", [])
        ]
        if not queries:
            # No curated queries: probe with the first 100 characters of the
            # first five entries (these carry no golden answer, so unscored).
            queries = [entry["content"][:100] for entry in dataset[:5]]

        for query_item in queries:
            if isinstance(query_item, dict):
                query = query_item.get("query", "")
                golden = query_item.get("golden_answer", "")
            else:
                query = str(query_item)
                golden = ""

            if dry_run:
                latency = 30.0
                tokens = 200
                retrieved_texts = ["[mock retrieved memory]"]
            else:
                latency, result = adapter.timed_call(adapter.retrieve, query)
                if not result.success:
                    metrics.errors += 1
                    continue
                tokens = result.tokens_used
                retrieved_texts = [_memory_text(m) for m in result.data or []]

            metrics.total_retrieve_calls += 1
            metrics.total_retrieve_tokens += tokens
            metrics.retrieve_latencies.append(latency)

            if golden:
                score, _ = evaluator.score_retrieval(query, golden, retrieved_texts)
                metrics.retrieval_scores.append(score)

    finally:
        # Always remove benchmark data, even when a phase raised.
        if not dry_run:
            adapter.cleanup()

    return metrics
"""
Scenario B: Shifting Persona & Temporal Tracking Test

Simulates an evolving user with dynamically changing preferences.
Measures preference retention accuracy and temporal awareness.

Goal: Test if Memanto effectively flags out-of-date states and surfaces
current nuances without polluting the active context window.
"""

import json
from pathlib import Path

from .base import MemoryAdapter, BenchmarkMetric
from .evaluator import LLMEvaluator


DATASET_PATH = Path(__file__).parent.parent / "datasets" / "persona_evolution.json"


def load_dataset() -> list[dict]:
    """Load the persona evolution dataset."""
    # The dataset contains non-ASCII punctuation (em dashes); decode it as
    # UTF-8 explicitly instead of relying on the platform's locale encoding.
    with open(DATASET_PATH, encoding="utf-8") as f:
        return json.load(f)


def _memory_text(memory) -> str:
    """Return the text of one retrieved memory as a string.

    Dict-shaped memories are read from their "memory" key, then "text";
    anything else (or a dict with neither key) is stringified whole.
    """
    if isinstance(memory, dict):
        return str(memory.get("memory", memory.get("text", str(memory))))
    return str(memory)


def run_scenario_b(
    adapter: MemoryAdapter,
    evaluator: LLMEvaluator,
    user_id: str = "benchmark_user_b",
    dry_run: bool = False,
) -> BenchmarkMetric:
    """Run Scenario B against a memory adapter.

    Simulates multiple sessions where user preferences evolve and contradict.
    All sessions are ingested first; afterwards every session's evaluation
    queries are run and scored against their golden answers. Failed calls
    increment ``errors`` and are excluded from call, token and latency totals.

    Args:
        adapter: The memory framework adapter to test.
        evaluator: LLM judge for scoring retrieval accuracy.
        user_id: Unique user ID for this benchmark run.
        dry_run: If True, use mock data instead of real API calls.

    Returns:
        BenchmarkMetric with aggregated results.
    """
    dataset = load_dataset()
    metrics = BenchmarkMetric(
        framework=adapter.name,
        scenario="B: Shifting Persona & Temporal Tracking",
    )

    if not dry_run:
        adapter.setup(user_id)

    try:
        # Phase 1: Ingest persona evolution across sessions
        for session in dataset:
            session_id = session["session_id"]

            for msg in session["messages"]:
                content = msg["content"]
                metadata = {
                    "session_id": session_id,
                    "timestamp": msg.get("timestamp", ""),
                    "role": msg.get("role", "user"),
                }

                if dry_run:
                    latency = 45.0
                    tokens = len(content.split()) * 2
                else:
                    latency, result = adapter.timed_call(
                        adapter.store, content, metadata
                    )
                    if not result.success:
                        metrics.errors += 1
                        continue
                    tokens = result.tokens_used

                metrics.total_store_calls += 1
                metrics.total_store_tokens += tokens
                metrics.store_latencies.append(latency)

        # Phase 2: Query for current preferences (should get latest)
        # NOTE(review): queries from every session run only after *all*
        # sessions are ingested, so early-session golden answers that later
        # sessions contradict are scored against the final state — confirm
        # this is intended rather than evaluating after each session.
        for session in dataset:
            for query_item in session.get("evaluation_queries", []):
                query = query_item["query"]
                golden = query_item["golden_answer"]

                if dry_run:
                    latency = 25.0
                    tokens = 150
                    retrieved_texts = ["[mock current preference]"]
                else:
                    latency, result = adapter.timed_call(adapter.retrieve, query)
                    if not result.success:
                        metrics.errors += 1
                        continue
                    tokens = result.tokens_used
                    retrieved_texts = [_memory_text(m) for m in result.data or []]

                metrics.total_retrieve_calls += 1
                metrics.total_retrieve_tokens += tokens
                metrics.retrieve_latencies.append(latency)

                score, _ = evaluator.score_retrieval(query, golden, retrieved_texts)
                metrics.retrieval_scores.append(score)

    finally:
        # Always remove benchmark data, even when a phase raised.
        if not dry_run:
            adapter.cleanup()

    return metrics
100644 index 00000000..6d6189af --- /dev/null +++ b/datasets/persona_evolution.json @@ -0,0 +1,61 @@ +[ + { + "session_id": "session_01", + "timestamp": "2026-03-01", + "messages": [ + {"role": "user", "content": "I love Italian food, especially pasta carbonara. I eat it every Friday.", "timestamp": "2026-03-01T10:00:00Z"}, + {"role": "user", "content": "My favorite movie genre is action. I just watched all the John Wick movies again.", "timestamp": "2026-03-01T10:05:00Z"}, + {"role": "user", "content": "I work as a data engineer at a fintech startup. I use Python and Spark daily.", "timestamp": "2026-03-01T10:10:00Z"}, + {"role": "user", "content": "I prefer dark mode for everything. Light mode hurts my eyes.", "timestamp": "2026-03-01T10:15:00Z"} + ], + "evaluation_queries": [ + {"query": "What is the user's favorite food?", "golden_answer": "Italian food, especially pasta carbonara"}, + {"query": "What movie genre does the user prefer?", "golden_answer": "Action movies, particularly John Wick"}, + {"query": "What is the user's job?", "golden_answer": "Data engineer at a fintech startup using Python and Spark"} + ] + }, + { + "session_id": "session_02", + "timestamp": "2026-03-15", + "messages": [ + {"role": "user", "content": "Actually, I've been trying Japanese food lately. Ramen is becoming my new obsession. I might switch from Italian.", "timestamp": "2026-03-15T14:00:00Z"}, + {"role": "user", "content": "I just got promoted to Senior Data Engineer! Still at the same fintech but now I'm also managing the ML pipeline team.", "timestamp": "2026-03-15T14:10:00Z"}, + {"role": "user", "content": "I've been watching a lot of sci-fi recently. Interstellar and Dune are my new favorites. 
Action movies feel repetitive now.", "timestamp": "2026-03-15T14:20:00Z"} + ], + "evaluation_queries": [ + {"query": "What food does the user currently prefer?", "golden_answer": "Japanese food, especially ramen — switching from Italian"}, + {"query": "What is the user's current job role?", "golden_answer": "Senior Data Engineer at a fintech startup, managing the ML pipeline team"}, + {"query": "What movie genre does the user currently enjoy?", "golden_answer": "Sci-fi, favorites are Interstellar and Dune — finds action movies repetitive now"} + ] + }, + { + "session_id": "session_03", + "timestamp": "2026-04-10", + "messages": [ + {"role": "user", "content": "Big life update: I quit my job at the fintech startup! I'm now a freelance AI consultant. Working with multiple clients on LLM fine-tuning and RAG systems.", "timestamp": "2026-04-10T09:00:00Z"}, + {"role": "user", "content": "Also, I've gone vegetarian. No more ramen with pork broth — I'm exploring vegetable-based ramen now. Thai green curry is my new comfort food.", "timestamp": "2026-04-10T09:15:00Z"}, + {"role": "user", "content": "I switched to light mode actually! Got a new monitor with great color accuracy and light mode works better in my bright office.", "timestamp": "2026-04-10T09:20:00Z"}, + {"role": "user", "content": "For entertainment, I'm really into documentaries now. Especially nature documentaries like Planet Earth. 
Don't have time for long movies anymore.", "timestamp": "2026-04-10T09:30:00Z"} + ], + "evaluation_queries": [ + {"query": "What is the user's current career?", "golden_answer": "Freelance AI consultant working on LLM fine-tuning and RAG systems with multiple clients"}, + {"query": "What are the user's current dietary preferences?", "golden_answer": "Vegetarian — prefers vegetable-based ramen and Thai green curry as comfort food"}, + {"query": "What is the user's UI theme preference?", "golden_answer": "Light mode — switched from dark mode because of new monitor with good color accuracy in bright office"}, + {"query": "What entertainment does the user currently enjoy?", "golden_answer": "Nature documentaries like Planet Earth — doesn't have time for long movies anymore"} + ] + }, + { + "session_id": "session_04", + "timestamp": "2026-05-20", + "messages": [ + {"role": "user", "content": "Update on the career front: my freelance consulting is going really well. I specialize in agentic AI systems now — memory frameworks, tool-use orchestration, and multi-agent architectures. Dropped the RAG work.", "timestamp": "2026-05-20T11:00:00Z"}, + {"role": "user", "content": "Food update: I'm no longer strictly vegetarian. I follow a pescatarian diet now. Still love Thai food though. My weekly routine is sushi on Mondays and Thai curry on Wednesdays.", "timestamp": "2026-05-20T11:15:00Z"}, + {"role": "user", "content": "I'm learning Rust in my spare time. Still use Python for AI work but want to build high-performance tools in Rust.", "timestamp": "2026-05-20T11:20:00Z"} + ], + "evaluation_queries": [ + {"query": "What is the user's current professional specialization?", "golden_answer": "Freelance AI consultant specializing in agentic AI systems — memory frameworks, tool-use orchestration, multi-agent architectures. 
No longer doing RAG."}, + {"query": "What is the user's current diet?", "golden_answer": "Pescatarian — sushi on Mondays, Thai curry on Wednesdays"}, + {"query": "What programming languages does the user use?", "golden_answer": "Python for AI work, learning Rust for high-performance tools"} + ] + } +] diff --git a/datasets/technical_logs.json b/datasets/technical_logs.json new file mode 100644 index 00000000..88851e71 --- /dev/null +++ b/datasets/technical_logs.json @@ -0,0 +1,92 @@ +[ + { + "id": "log_001", + "content": "2026-05-15 09:23:11 [ERROR] Service mesh failure: Istio sidecar proxy on pod `payment-service-7b9d4` failed to establish mTLS connection to `fraud-detection-v2`. Certificate rotation triggered but new cert rejected by intermediate CA. Root cause: expired cert chain in Vault PKI secrets engine. Resolution: forced cert rotation via `vault write pki/issue/mesh-role common_name=payment-service` and restarted sidecar.", + "metadata": {"severity": "error", "service": "payment-service", "tags": ["istio", "mtls", "vault", "certificate"]}, + "retrieval_queries": [ + {"query": "What caused the payment service outage on May 15?", "golden_answer": "Istio sidecar proxy failed mTLS connection due to expired certificate chain in Vault PKI secrets engine"}, + {"query": "How was the certificate issue resolved?", "golden_answer": "Forced cert rotation via vault write pki/issue/mesh-role and restarted sidecar"} + ] + }, + { + "id": "log_002", + "content": "2026-05-15 10:45:33 [WARN] Database connection pool exhaustion detected on `user-db-primary`. Pool size: 200/200. 47 connections idle > 300s. Root cause: connection leak in ORM batch insert path — Session objects not properly closed in try/finally blocks in `UserService.bulk_create_users()`. 
Hotfix: added session.close() in finally clause and reduced pool_timeout from 30s to 10s.", + "metadata": {"severity": "warning", "service": "user-service", "tags": ["database", "connection-pool", "orm"]}, + "retrieval_queries": [ + {"query": "What was the database connection pool issue?", "golden_answer": "Connection pool exhaustion at 200/200 with 47 idle connections due to ORM session leak in bulk_create_users()"}, + {"query": "What was the hotfix for the connection leak?", "golden_answer": "Added session.close() in finally clause and reduced pool_timeout from 30s to 10s"} + ] + }, + { + "id": "log_003", + "content": "2026-05-16 14:12:05 [CRITICAL] Kafka consumer lag spike: consumer group `order-processor` lag reached 2.3M messages on topic `orders.created`. Partition rebalancing triggered by consumer restart cascade. Cause: JVM heap pressure (GC pause 12s) on consumer instances due to unbounded in-memory aggregation in `OrderEnrichmentProcessor`. Fix: switched to RocksDB-backed state store and added max.poll.records=100 limit.", + "metadata": {"severity": "critical", "service": "order-processor", "tags": ["kafka", "consumer-lag", "jvm", "gc"]}, + "retrieval_queries": [ + {"query": "What caused the Kafka consumer lag spike?", "golden_answer": "JVM heap pressure with 12s GC pauses from unbounded in-memory aggregation in OrderEnrichmentProcessor"}, + {"query": "How was the Kafka lag issue fixed?", "golden_answer": "Switched to RocksDB-backed state store and added max.poll.records=100 limit"} + ] + }, + { + "id": "log_004", + "content": "2026-05-17 08:33:22 [INFO] Performance regression detected in search API: p99 latency increased from 120ms to 890ms after deploying commit `a3f7c2d`. Profiling revealed N+1 query pattern in `SearchService.hybrid_search()` — each semantic result triggered an individual SQL lookup for metadata enrichment. 
Fix: batch metadata fetch using `WHERE id IN (...)` with chunked queries (500 IDs per batch).", + "metadata": {"severity": "info", "service": "search-api", "tags": ["performance", "n-plus-1", "sql", "latency"]}, + "retrieval_queries": [ + {"query": "Why did the search API latency spike?", "golden_answer": "N+1 query pattern in hybrid_search() where each semantic result triggered individual SQL lookups for metadata"}, + {"query": "What was the search API optimization?", "golden_answer": "Batch metadata fetch using WHERE id IN with chunked queries of 500 IDs per batch"} + ] + }, + { + "id": "log_005", + "content": "2026-05-18 16:55:44 [ERROR] Redis cluster failover: node `redis-3` OOM killed at 98% memory. Sentinel promoted replica but 12s failover window caused 340 timeout errors in rate limiter. Root cause: unbounded key growth in session cache — no TTL set on `session:*` keys after migration from Memcached. Fix: added EXPIRE 3600 on all session keys and implemented LRU eviction policy with maxmemory 8gb.", + "metadata": {"severity": "error", "service": "rate-limiter", "tags": ["redis", "oom", "failover", "session-cache"]}, + "retrieval_queries": [ + {"query": "What caused the Redis OOM and failover?", "golden_answer": "Unbounded key growth in session cache with no TTL on session:* keys after migration from Memcached"}, + {"query": "How was the Redis memory issue resolved?", "golden_answer": "Added EXPIRE 3600 on session keys and implemented LRU eviction with maxmemory 8gb"} + ] + }, + { + "id": "log_006", + "content": "2026-05-19 11:20:01 [WARN] Kubernetes HPA misconfiguration: deployment `recommendation-engine` scaled from 3 to 48 pods in 2 minutes due to CPU metric stuck at 95%. Root cause: custom metrics adapter caching stale values from Prometheus. The adapter was querying a 5-minute window but HPA evaluated every 15s. 
Fix: reduced metricsWindow to 30s and added stabilizationWindowSeconds: 60 to HPA spec.", + "metadata": {"severity": "warning", "service": "recommendation-engine", "tags": ["kubernetes", "hpa", "autoscaling", "prometheus"]}, + "retrieval_queries": [ + {"query": "Why did recommendation-engine scale to 48 pods?", "golden_answer": "Custom metrics adapter cached stale Prometheus values — 5-minute query window vs 15s HPA evaluation"}, + {"query": "What was the HPA scaling fix?", "golden_answer": "Reduced metricsWindow to 30s and added stabilizationWindowSeconds: 60"} + ] + }, + { + "id": "log_007", + "content": "2026-05-20 03:44:17 [CRITICAL] Data pipeline corruption: Spark job `feature-store-backfill` wrote NaN values to feature store after upstream schema change in `clickstream-events` topic. New field `engagement_score` (nullable float) was cast without null-check. 2.1M corrupted feature vectors propagated to ML serving layer before circuit breaker tripped. Fix: added schema validation with Avro schema registry and null-coalescing defaults.", + "metadata": {"severity": "critical", "service": "feature-store", "tags": ["spark", "data-corruption", "schema", "ml"]}, + "retrieval_queries": [ + {"query": "What caused the feature store data corruption?", "golden_answer": "Spark job wrote NaN values after upstream schema change — new nullable float field cast without null-check"}, + {"query": "How was the data pipeline corruption prevented from recurring?", "golden_answer": "Added Avro schema registry validation and null-coalescing defaults"} + ] + }, + { + "id": "log_008", + "content": "2026-05-21 19:08:55 [ERROR] API gateway rate limit bypass: malicious client using rotating IP addresses from AWS us-east-1 exploited per-IP rate limiting. Traffic spike: 45K RPS to `/api/v2/auth/login` endpoint. Bot detection failed because requests mimicked valid browser fingerprints. 
Fix: implemented sliding window rate limit on user-agent+accept-language hash, added Cloudflare Bot Management, and deployed WAF rules for credential stuffing patterns.", + "metadata": {"severity": "error", "service": "api-gateway", "tags": ["security", "rate-limit", "bot", "waf"]}, + "retrieval_queries": [ + {"query": "How was the API gateway rate limit bypassed?", "golden_answer": "Malicious client used rotating IPs from AWS and mimicked valid browser fingerprints to bypass per-IP rate limiting"}, + {"query": "What security measures were added to prevent the attack?", "golden_answer": "Sliding window rate limit on user-agent+accept-language hash, Cloudflare Bot Management, and WAF rules for credential stuffing"} + ] + }, + { + "id": "log_009", + "content": "2026-05-22 07:15:33 [INFO] Microservice mesh circular dependency detected: Service A (auth) → Service B (user-profile) → Service C (notification) → Service A (auth). During Service C deployment, cascading timeout caused 503s across all three services for 8 minutes. Root cause: synchronous gRPC calls in deployment readiness path. Fix: introduced async event bus for notification triggers and added circuit breakers with 3s timeout on all inter-service gRPC calls.", + "metadata": {"severity": "info", "service": "auth", "tags": ["microservices", "circular-dependency", "grpc", "circuit-breaker"]}, + "retrieval_queries": [ + {"query": "What circular dependency caused the cascading 503s?", "golden_answer": "Auth → user-profile → notification → auth circular dependency with synchronous gRPC calls"}, + {"query": "How was the circular dependency resolved?", "golden_answer": "Async event bus for notifications and circuit breakers with 3s timeout on inter-service gRPC"} + ] + }, + { + "id": "log_010", + "content": "2026-05-23 22:30:09 [WARN] Observability gap: distributed tracing spans missing for 60% of requests through `inventory-service`. 
Root cause: OpenTelemetry SDK sampling rate set to 0.4 in production config (copy-paste from staging). Additionally, trace context propagation headers dropped by custom HTTP middleware that didn't forward `traceparent` and `tracestate` headers. Fix: set sampling to parentbased_always_on and patched middleware to propagate W3C trace context.", + "metadata": {"severity": "warning", "service": "inventory-service", "tags": ["observability", "tracing", "opentelemetry", "middleware"]}, + "retrieval_queries": [ + {"query": "Why were distributed tracing spans missing for inventory-service?", "golden_answer": "Sampling rate 0.4 from staging config and custom middleware dropping traceparent/tracestate headers"}, + {"query": "How was the tracing gap fixed?", "golden_answer": "Set sampling to parentbased_always_on and patched middleware for W3C trace context propagation"} + ] + } +] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..2c81689c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +memanto>=0.2.0 +mem0ai>=2.0.0b2 +openai>=1.30.0 +python-dotenv>=1.0.0 +numpy>=1.24.0 +tabulate>=0.9.0 +jinja2>=3.1.0 +pytest>=8.0.0 diff --git a/run_benchmark.py b/run_benchmark.py new file mode 100644 index 00000000..0d854116 --- /dev/null +++ b/run_benchmark.py @@ -0,0 +1,375 @@ +#!/usr/bin/env python3 +""" +The Great Agentic Memory Showdown — Main Benchmark Runner + +Compares Memanto vs Mem0 across two scenarios: + A) Context-Overhead & Latency Sprint (technical logs) + B) Shifting Persona & Temporal Tracking (evolving preferences) + +Usage: + python run_benchmark.py # Full benchmark (requires API keys) + python run_benchmark.py --dry-run # Dry run with mock data (no API keys needed) + python run_benchmark.py --scenario a # Run only Scenario A + python run_benchmark.py --scenario b # Run only Scenario B +""" + +import argparse +import json +import os +import sys +import time +from datetime import datetime, timezone +from pathlib import Path + +from 
dotenv import load_dotenv + +# Add project root to path +sys.path.insert(0, str(Path(__file__).parent)) + +from benchmarks.base import BenchmarkMetric +from benchmarks.memanto_adapter import MemantoAdapter +from benchmarks.mem0_adapter import Mem0Adapter +from benchmarks.evaluator import LLMEvaluator +from benchmarks.scenario_a import run_scenario_a +from benchmarks.scenario_b import run_scenario_b + + +REPORTS_DIR = Path(__file__).parent / "reports" + + +def run_benchmark( + scenario: str = "all", + num_runs: int = 3, + dry_run: bool = False, +) -> list[dict]: + """Run the full benchmark suite. + + Args: + scenario: "a", "b", or "all" + num_runs: Number of runs per scenario for statistical aggregation + dry_run: If True, use mock data + + Returns: + List of result dicts for all runs. + """ + evaluator = LLMEvaluator() + adapters = [MemantoAdapter(), Mem0Adapter()] + + scenarios = [] + if scenario in ("a", "all"): + scenarios.append(("a", run_scenario_a)) + if scenario in ("b", "all"): + scenarios.append(("b", run_scenario_b)) + + all_results = [] + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + + for run_idx in range(num_runs): + for scenario_key, run_fn in scenarios: + for adapter in adapters: + user_id = f"bench_{adapter.name}_{scenario_key}_run{run_idx}" + print( + f" [{run_idx+1}/{num_runs}] {adapter.name} — " + f"Scenario {scenario_key.upper()}...", + end=" ", + flush=True, + ) + + start = time.perf_counter() + try: + metrics = run_fn(adapter, evaluator, user_id, dry_run=dry_run) + elapsed = time.perf_counter() - start + print(f"✓ ({elapsed:.1f}s)") + except Exception as e: + print(f"✗ Error: {e}") + metrics = BenchmarkMetric( + framework=adapter.name, + scenario=f"{scenario_key}: Error", + ) + metrics.errors += 1 + + result = metrics.to_dict() + result["run_index"] = run_idx + result["timestamp"] = timestamp + all_results.append(result) + + return all_results + + +def generate_console_report(results: list[dict]) -> None: + """Print a 
comparison table to the console.""" + from tabulate import tabulate + + # Group by scenario, aggregate across runs + aggregated = {} + for r in results: + key = (r["framework"], r["scenario"]) + if key not in aggregated: + aggregated[key] = { + "framework": r["framework"], + "scenario": r["scenario"], + "store_tokens": [], + "retrieve_tokens": [], + "store_p95": [], + "retrieve_p95": [], + "accuracy": [], + "errors": [], + } + agg = aggregated[key] + agg["store_tokens"].append(r["total_store_tokens"]) + agg["retrieve_tokens"].append(r["total_retrieve_tokens"]) + agg["store_p95"].append(r["store_p95_latency_ms"]) + agg["retrieve_p95"].append(r["retrieve_p95_latency_ms"]) + agg["accuracy"].append(r["retrieval_accuracy"]) + agg["errors"].append(r["errors"]) + + import numpy as np + + table = [] + for key, agg in aggregated.items(): + table.append([ + agg["framework"], + agg["scenario"], + f"{int(np.mean(agg['store_tokens'])):,}", + f"{int(np.mean(agg['retrieve_tokens'])):,}", + f"{np.mean(agg['store_p95']):.1f}", + f"{np.mean(agg['retrieve_p95']):.1f}", + f"{np.mean(agg['accuracy']):.3f}", + sum(agg["errors"]), + ]) + + headers = [ + "Framework", "Scenario", "Store Tokens", "Retrieve Tokens", + "Store p95 (ms)", "Retrieve p95 (ms)", "Accuracy", "Errors", + ] + print("\n" + "=" * 100) + print(" THE GREAT AGENTIC MEMORY SHOWDOWN — BENCHMARK RESULTS") + print("=" * 100) + print(tabulate(table, headers=headers, tablefmt="grid")) + print() + + +def generate_json_report(results: list[dict], output_path: Path) -> None: + """Save results to JSON.""" + report = { + "title": "The Great Agentic Memory Showdown — Benchmark Results", + "generated_at": datetime.now(timezone.utc).isoformat(), + "environment": { + "python_version": sys.version, + "platform": sys.platform, + "judge_model": os.environ.get("JUDGE_MODEL", "gpt-4o"), + "benchmark_runs": len(set(r["run_index"] for r in results)), + }, + "results": results, + } + with open(output_path, "w") as f: + json.dump(report, f, 
indent=2) + print(f"JSON report saved: {output_path}") + + +def generate_html_report(results: list[dict], output_path: Path) -> None: + """Generate an HTML report with embedded charts.""" + from jinja2 import Template + + template = Template(""" + + + +{{ title }} + + + + +

🐜 The Great Agentic Memory Showdown

+

Generated: {{ generated_at }} | Judge Model: {{ judge_model }}

+ +

📊 Results Summary

+
+ + + +{% for r in summary %} + + + + +{% endfor %} +
FrameworkScenarioStore TokensRetrieve TokensStore p95 (ms)Retrieve p95 (ms)AccuracyErrors
{{ r.framework }}{{ r.scenario }}{{ r.store_tokens }}{{ r.retrieve_tokens }}{{ r.store_p95 }}{{ r.retrieve_p95 }}{{ r.accuracy }}{{ r.errors }}
+
+ +

📈 Token Efficiency

+
+ +
+ +

⏱️ Latency (p95)

+
+ +
+ +

🎯 Retrieval Accuracy

+
+ +
+ + +""") + + # Aggregate results + import numpy as np + aggregated = {} + for r in results: + key = (r["framework"], r["scenario"]) + if key not in aggregated: + aggregated[key] = {"store_t": [], "retrieve_t": [], "store_p": [], + "retrieve_p": [], "acc": [], "err": []} + a = aggregated[key] + a["store_t"].append(r["total_store_tokens"]) + a["retrieve_t"].append(r["total_retrieve_tokens"]) + a["store_p"].append(r["store_p95_latency_ms"]) + a["retrieve_p"].append(r["retrieve_p95_latency_ms"]) + a["acc"].append(r["retrieval_accuracy"]) + a["err"].append(r["errors"]) + + summary = [] + labels = [] + store_tokens = [] + retrieve_tokens = [] + store_p95 = [] + retrieve_p95 = [] + accuracy = [] + + for (fw, sc), a in aggregated.items(): + label = f"{fw} — {sc[:20]}" + labels.append(label) + st = int(np.mean(a["store_t"])) + rt = int(np.mean(a["retrieve_t"])) + sp = round(np.mean(a["store_p"]), 1) + rp = round(np.mean(a["retrieve_p"]), 1) + ac = round(np.mean(a["acc"]), 3) + er = sum(a["err"]) + store_tokens.append(st) + retrieve_tokens.append(rt) + store_p95.append(sp) + retrieve_p95.append(rp) + accuracy.append(ac) + summary.append({ + "framework": fw, "scenario": sc, + "store_tokens": f"{st:,}", "retrieve_tokens": f"{rt:,}", + "store_p95": str(sp), "retrieve_p95": str(rp), + "accuracy": str(ac), "errors": er, + }) + + html = template.render( + title="The Great Agentic Memory Showdown", + generated_at=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC"), + judge_model=os.environ.get("JUDGE_MODEL", "gpt-4o"), + summary=summary, labels=labels, store_tokens=store_tokens, + retrieve_tokens=retrieve_tokens, store_p95=store_p95, + retrieve_p95=retrieve_p95, accuracy=accuracy, + ) + with open(output_path, "w") as f: + f.write(html) + print(f"HTML report saved: {output_path}") + + +def main(): + parser = argparse.ArgumentParser( + description="The Great Agentic Memory Showdown — Benchmark Runner" + ) + parser.add_argument( + "--scenario", choices=["a", "b", "all"], 
default="all", + help="Which scenario to run (default: all)", + ) + parser.add_argument( + "--runs", type=int, default=3, + help="Number of runs per scenario (default: 3)", + ) + parser.add_argument( + "--dry-run", action="store_true", + help="Run with mock data (no API keys needed)", + ) + args = parser.parse_args() + + load_dotenv() + + print("=" * 60) + print(" 🐜 THE GREAT AGENTIC MEMORY SHOWDOWN") + print("=" * 60) + print(f" Scenario: {args.scenario.upper()}") + print(f" Runs: {args.runs}") + print(f" Dry Run: {args.dry_run}") + print(f" Judge Model: {os.environ.get('JUDGE_MODEL', 'gpt-4o')}") + print() + + REPORTS_DIR.mkdir(exist_ok=True) + + results = run_benchmark( + scenario=args.scenario, + num_runs=args.runs, + dry_run=args.dry_run, + ) + + generate_console_report(results) + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + json_path = REPORTS_DIR / f"benchmark_results_{timestamp}.json" + html_path = REPORTS_DIR / f"benchmark_report_{timestamp}.html" + + generate_json_report(results, json_path) + generate_html_report(results, html_path) + + print("\n✅ Benchmark complete!") + print(f" JSON: {json_path}") + print(f" HTML: {html_path}") + + +if __name__ == "__main__": + main() diff --git a/tests/test_adapters.py b/tests/test_adapters.py new file mode 100644 index 00000000..9c977a43 --- /dev/null +++ b/tests/test_adapters.py @@ -0,0 +1,240 @@ +"""Tests for the benchmark adapter interfaces and evaluator.""" + +import pytest +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from benchmarks.base import MemoryAdapter, MemoryResult, BenchmarkMetric +from benchmarks.evaluator import LLMEvaluator + + +# -- MemoryResult tests -- + +class TestMemoryResult: + """Tests for the MemoryResult dataclass.""" + + def test_success_result(self): + """Test that a successful MemoryResult stores fields correctly.""" + r = MemoryResult(success=True, latency_ms=42.5, tokens_used=100, data="ok") + assert r.success is 
True + assert r.latency_ms == 42.5 + assert r.tokens_used == 100 + + def test_failure_result(self): + """Test that a failed MemoryResult stores the error message.""" + r = MemoryResult(success=False, latency_ms=0, error="connection refused") + assert r.success is False + assert r.error == "connection refused" + + +# -- BenchmarkMetric tests -- + +class TestBenchmarkMetric: + """Tests for the BenchmarkMetric dataclass.""" + + def test_empty_metric(self): + """Test that a new BenchmarkMetric has zero defaults.""" + m = BenchmarkMetric(framework="Test", scenario="S1") + assert m.store_p95_latency == 0.0 + assert m.retrieve_p95_latency == 0.0 + assert m.mean_retrieval_accuracy == 0.0 + + def test_p95_latency(self): + """Test p95 latency calculation from a list of latencies.""" + m = BenchmarkMetric(framework="Test", scenario="S1") + m.store_latencies = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100] + assert m.store_p95_latency == 100.0 # 95th percentile of 10 items + + def test_mean_accuracy(self): + """Test mean retrieval accuracy calculation.""" + m = BenchmarkMetric(framework="Test", scenario="S1") + m.retrieval_scores = [0.8, 0.9, 0.7] + assert abs(m.mean_retrieval_accuracy - 0.8) < 0.01 + + def test_to_dict(self): + """Test conversion of BenchmarkMetric to a dictionary.""" + m = BenchmarkMetric(framework="Test", scenario="S1") + m.store_latencies = [10, 20] + d = m.to_dict() + assert d["framework"] == "Test" + assert d["scenario"] == "S1" + assert "store_p95_latency_ms" in d + assert "retrieval_accuracy" in d + + +# -- LLMEvaluator tests -- + +class TestLLMEvaluator: + """Tests for the LLMEvaluator scoring logic.""" + + def test_keyword_score_perfect_match(self): + """Test keyword scoring returns 1.0 for full overlap.""" + score, reasoning = LLMEvaluator._keyword_score( + "the cat sat on the mat", + ["the cat sat on the mat and looked around"], + ) + assert score == 1.0 + assert "overlap" in reasoning.lower() + + def test_keyword_score_partial_match(self): + """Test 
keyword scoring returns a value between 0 and 1 for partial overlap.""" + score, reasoning = LLMEvaluator._keyword_score( + "the quick brown fox jumps over the lazy dog", + ["the fox jumped over a dog"], + ) + assert 0.0 < score < 1.0 + + def test_keyword_score_no_match(self): + """Test keyword scoring returns 0.0 when there is no overlap.""" + score, reasoning = LLMEvaluator._keyword_score( + "machine learning neural network", + ["pizza pasta carbonara recipe"], + ) + assert score == 0.0 + + def test_keyword_score_empty_golden(self): + """Test keyword scoring returns 0.0 for an empty golden answer.""" + score, _ = LLMEvaluator._keyword_score("", ["some text"]) + assert score == 0.0 + + def test_evaluator_without_api_key(self, monkeypatch): + """Test that the evaluator falls back to keyword scoring without an API key.""" + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + evaluator = LLMEvaluator() + assert evaluator.client is None + # Should fall back to keyword scoring + score, _ = evaluator.score_retrieval( + "test query", "test answer", ["test answer is here"] + ) + assert score > 0 + + +# -- Integration: dry-run scenario A -- + +class TestDryRunScenarioA: + """Integration tests for Scenario A in dry-run mode.""" + + def test_dry_run_loads_dataset(self): + """Test that the Scenario A dataset loads with expected structure.""" + from benchmarks.scenario_a import load_dataset + dataset = load_dataset() + assert len(dataset) == 10 + assert "content" in dataset[0] + assert "retrieval_queries" in dataset[0] + + def test_dry_run_scenario_a(self): + """Run Scenario A in dry-run mode with a mock adapter.""" + class MockAdapter(MemoryAdapter): + """Mock adapter that returns canned responses for testing.""" + + @property + def name(self): + """Return the mock adapter name.""" + return "Mock" + + def setup(self, uid): + """No-op setup for the mock adapter.""" + + def store(self, content, metadata=None): + """Return a successful store result with fake latency.""" + + 
return MemoryResult(True, 5.0, len(content.split())*2) + + def retrieve(self, query, limit=5): + """Return a successful retrieve result with a mock memory.""" + + return MemoryResult(True, 3.0, 100, ["mock memory"]) + + def update(self, mid, content): + """Return a successful update result.""" + + return MemoryResult(True, 5.0) + + def delete(self, mid): + """Return a successful delete result.""" + + return MemoryResult(True, 2.0) + + def get_all(self): + """Return an empty list of memories.""" + + return MemoryResult(True, 1.0, data=[]) + + def cleanup(self): + """No-op cleanup for the mock adapter.""" + + from benchmarks.scenario_a import run_scenario_a + from benchmarks.evaluator import LLMEvaluator + import os + monkeypatch_env = {"OPENAI_API_KEY": ""} + evaluator = LLMEvaluator(api_key="") + adapter = MockAdapter() + metrics = run_scenario_a(adapter, evaluator, dry_run=True) + assert metrics.total_store_calls == 10 + assert metrics.total_retrieve_calls > 0 + assert metrics.store_p95_latency > 0 + + +# -- Integration: dry-run scenario B -- + +class TestDryRunScenarioB: + """Integration tests for Scenario B in dry-run mode.""" + + def test_dry_run_loads_dataset(self): + """Test that the Scenario B dataset loads with expected structure.""" + from benchmarks.scenario_b import load_dataset + dataset = load_dataset() + assert len(dataset) == 4 + assert "session_id" in dataset[0] + assert "evaluation_queries" in dataset[0] + + def test_dry_run_scenario_b(self): + """Run Scenario B in dry-run mode with a mock adapter.""" + from benchmarks.scenario_b import run_scenario_b + + class MockAdapter(MemoryAdapter): + """Mock adapter that returns canned responses for testing.""" + + @property + def name(self): + """Return the mock adapter name.""" + return "Mock" + + def setup(self, uid): + """No-op setup for the mock adapter.""" + + def store(self, content, metadata=None): + """Return a successful store result with fake latency.""" + + return MemoryResult(True, 5.0, 
len(content.split())*2) + + def retrieve(self, query, limit=5): + """Return a successful retrieve result with a mock memory.""" + + return MemoryResult(True, 3.0, 100, ["mock"]) + + def update(self, mid, content): + """Return a successful update result.""" + + return MemoryResult(True, 5.0) + + def delete(self, mid): + """Return a successful delete result.""" + + return MemoryResult(True, 2.0) + + def get_all(self): + """Return an empty list of memories.""" + + return MemoryResult(True, 1.0, data=[]) + + def cleanup(self): + """No-op cleanup for the mock adapter.""" + + evaluator = LLMEvaluator(api_key="") + adapter = MockAdapter() + metrics = run_scenario_b(adapter, evaluator, dry_run=True) + assert metrics.total_store_calls > 0 + assert metrics.total_retrieve_calls > 0