diff --git a/dependencies.yaml b/dependencies.yaml index 744e4d9227..16aba99374 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -214,6 +214,14 @@ files: table: project includes: - bench_python + py_elastic_cuvs_bench: + output: pyproject + pyproject_dir: python/cuvs_bench + extras: + table: project.optional-dependencies + key: elastic + includes: + - bench_elastic py_opensearch_cuvs_bench: output: pyproject pyproject_dir: python/cuvs_bench @@ -585,6 +593,11 @@ dependencies: - output_types: [requirements, pyproject] packages: - matplotlib>=3.9 + bench_elastic: + common: + - output_types: [conda, pyproject, requirements] + packages: + - cuvs-bench-elastic>=26.4.0 bench_python_opensearch: common: - output_types: [conda, pyproject] diff --git a/python/cuvs_bench/cuvs_bench/backends/elasticsearch.py b/python/cuvs_bench/cuvs_bench/backends/elasticsearch.py new file mode 100644 index 0000000000..dc4974a7a5 --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/backends/elasticsearch.py @@ -0,0 +1,921 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +""" +Elasticsearch GPU HTTP backend for cuvs-bench. + +Install with: pip install cuvs-bench[elastic] + +Build params (index_options): type, m, ef_construction. + type: hnsw, int8_hnsw, int4_hnsw, bbq_hnsw (per ES-GPU-API-REFERENCE.md) + similarity: l2_norm, cosine, max_inner_product (overrides dataset distance) +Index settings: number_of_shards, number_of_replicas, vector_field. + GPU indexing is configured at the node level (vectors.indexing.use_gpu in elasticsearch.yml). +Search params (knn): num_candidates, vector_field. 
+""" + +import os +import time +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple + +import numpy as np + +from .base import BenchmarkBackend, BuildResult, Dataset, SearchResult +from .registry import register_backend, register_config_loader +from ._utils import load_vectors +from ..orchestrator.config_loaders import ( + BenchmarkConfig, + ConfigLoader, + DatasetConfig, + IndexConfig, +) + +if TYPE_CHECKING: + from elasticsearch import Elasticsearch + + +def _load_fbin(path: Path) -> np.ndarray: + """Load big-ann-bench fbin format via shared vector loader.""" + return load_vectors(os.fspath(path)) + + +def _load_ibin(path: Path) -> np.ndarray: + """Load big-ann-bench ibin format via shared vector loader.""" + return load_vectors(os.fspath(path)) + + +def _distance_to_similarity(distance: str) -> str: + """Map cuvs-bench distance metric to ES dense_vector similarity.""" + m = { + "euclidean": "l2_norm", + "inner_product": "max_inner_product", + "cosine": "cosine", + } + return m.get(distance, "l2_norm") + + +def _supported_elastic_algorithms_message() -> str: + """Return a readable list of supported elastic algorithm names.""" + return ", ".join(f"'{algo}'" for algo in _SUPPORTED_ALGOS) + + +def _validate_elastic_index_type(index_type: str) -> str: + """Validate an Elasticsearch dense_vector index_options type.""" + if index_type not in _SUPPORTED_INDEX_TYPES: + raise ValueError( + "Received params for unsupported Elasticsearch index type " + f"{index_type!r}. Supported index types are: " + f"{', '.join(_SUPPORTED_INDEX_TYPES)}. " + "Please check the benchmark configuration." + ) + return index_type + + +def _validate_elastic_similarity(similarity: str) -> str: + """Validate an Elasticsearch dense_vector similarity value.""" + if similarity not in _SUPPORTED_SIMILARITIES: + raise ValueError( + "Received unsupported Elasticsearch similarity " + f"{similarity!r}. 
Supported similarities are: " + f"{', '.join(_SUPPORTED_SIMILARITIES)}. " + "Please check the benchmark configuration or dataset distance metric." + ) + return similarity + + +def _validate_elastic_algorithm(algo_name: str) -> str: + """Validate a cuvs-bench elastic algorithm name.""" + if algo_name not in _SUPPORTED_ALGOS: + raise ValueError( + "Received unsupported algorithm " + f"{algo_name!r} for the Elasticsearch backend. " + "Supported algorithms are: " + f"{_supported_elastic_algorithms_message()}." + ) + return algo_name + + +# Defaults for index creation when not specified in config +_DEFAULT_INDEX_TYPE = "hnsw" +_DEFAULT_M = 16 +_DEFAULT_EF_CONSTRUCTION = 100 +_DEFAULT_NUM_SHARDS = 1 +_DEFAULT_NUM_REPLICAS = 0 + +_DEFAULT_VECTOR_FIELD = "embedding" +_DEFAULT_NUM_CANDIDATES = 100 + +_SUPPORTED_INDEX_TYPES = ("hnsw", "int8_hnsw", "int4_hnsw", "bbq_hnsw") +_SUPPORTED_SIMILARITIES = ("l2_norm", "cosine", "max_inner_product") +_SUPPORTED_ALGOS = tuple( + f"elastic_{index_type}" for index_type in _SUPPORTED_INDEX_TYPES +) + +_BUILD_PARAM_KEYS = ( + "type", + "m", + "ef_construction", + "similarity", + "number_of_shards", + "number_of_replicas", + "vector_field", +) +_SEARCH_PARAM_KEYS = ("num_candidates", "vector_field") + + +class ElasticBackend(BenchmarkBackend): + """Elasticsearch GPU backend for vector benchmarking.""" + + def __init__(self, config: Dict[str, Any]): + super().__init__(config) + self._client: Optional["Elasticsearch"] = None + + @property + def algo(self) -> str: + """Algorithm name from config (e.g. elastic_hnsw, elastic_int8_hnsw).""" + index_type = self.config.get("type", _DEFAULT_INDEX_TYPE) + return f"elastic_{index_type}" + + def _get_client(self) -> "Elasticsearch": + if self._client is None: + try: + from elasticsearch import Elasticsearch + except ImportError as e: + raise ImportError( + "`elasticsearch` is required for the Elasticsearch backend. 
" + "Install with: pip install cuvs-bench[elastic]" + ) from e + host = self.config.get("host", "localhost") + port = self.config.get("port", 9200) + scheme = self.config.get("scheme", "http") + kwargs: Dict[str, Any] = { + "hosts": [f"{scheme}://{host}:{port}"], + "request_timeout": 300, # 5 min for slow bulk ops / remote ES + } + basic_auth = self.config.get("basic_auth") + if basic_auth is not None: + if ( + isinstance(basic_auth, (list, tuple)) + and len(basic_auth) >= 2 + ): + kwargs["basic_auth"] = ( + str(basic_auth[0]), + str(basic_auth[1]), + ) + elif isinstance(basic_auth, str) and ":" in basic_auth: + user, _, passwd = basic_auth.partition(":") + kwargs["basic_auth"] = (user, passwd) + self._client = Elasticsearch(**kwargs) + return self._client + + def cleanup(self) -> None: + """Close Elasticsearch client connection.""" + if self._client is not None: + self._client.close() + self._client = None + + @property + def requires_network(self) -> bool: + """Elasticsearch backend requires network connectivity.""" + return True + + def _check_network_available(self) -> bool: + """Verify Elasticsearch is reachable.""" + try: + return self._get_client().ping() + except Exception: + return False + + def build( + self, + dataset: Dataset, + indexes: List[IndexConfig], + force: bool = False, + dry_run: bool = False, + ) -> BuildResult: + """Build ES index from dataset vectors (fbin).""" + if dry_run: + return BuildResult( + index_path="", + build_time_seconds=0, + index_size_bytes=0, + algorithm=self.algo, + build_params={}, + success=True, + ) + + skip_reason = self._pre_flight_check() + if skip_reason: + return BuildResult( + index_path="", + build_time_seconds=0.0, + index_size_bytes=0, + algorithm=self.algo, + build_params={}, + success=False, + error_message=f"pre-flight check failed: {skip_reason}", + ) + + index_name = self.config.get("index_name", "cuvs_bench_vectors") + idx = indexes[0] if indexes else None + build_params = dict(idx.build_param or {}) if 
idx else {} + for k, v in self.config.items(): + if k not in build_params and k in _BUILD_PARAM_KEYS: + build_params[k] = v + + try: + client = self._get_client() + if client.indices.exists(index=index_name): + if not force: + stats = client.indices.stats(index=index_name) + index_size = stats["_all"]["primaries"]["store"][ + "size_in_bytes" + ] + return BuildResult( + index_path=index_name, + build_time_seconds=0, + index_size_bytes=index_size, + algorithm=self.algo, + build_params=build_params, + success=True, + ) + client.indices.delete(index=index_name) + except Exception as e: + return BuildResult( + index_path="", + build_time_seconds=0, + index_size_bytes=0, + algorithm=self.algo, + build_params={}, + success=False, + error_message=str(e), + ) + + vectors = dataset.training_vectors + if vectors.size == 0: + return BuildResult( + index_path="", + build_time_seconds=0.0, + index_size_bytes=0, + algorithm=self.algo, + build_params={}, + success=False, + error_message=( + "training_vectors are required for Elasticsearch backend " + "(directly or via dataset.base_file)" + ), + ) + + # similarity: from config, or derive from dataset distance + similarity = build_params.get("similarity") or _distance_to_similarity( + getattr(dataset, "distance_metric", None) or "euclidean" + ) + + index_type = build_params.get("type", _DEFAULT_INDEX_TYPE) + try: + _validate_elastic_index_type(index_type) + _validate_elastic_similarity(similarity) + except ValueError as e: + return BuildResult( + index_path="", + build_time_seconds=0.0, + index_size_bytes=0, + algorithm=self.algo, + build_params=build_params, + success=False, + error_message=str(e), + ) + + try: + n_vectors = len(vectors) + dims = vectors.shape[1] + client = self._get_client() + + vector_field = build_params.get( + "vector_field", _DEFAULT_VECTOR_FIELD + ) + m = build_params.get("m", _DEFAULT_M) + ef_construction = build_params.get( + "ef_construction", _DEFAULT_EF_CONSTRUCTION + ) + num_shards = build_params.get( + 
"number_of_shards", _DEFAULT_NUM_SHARDS + ) + num_replicas = build_params.get( + "number_of_replicas", _DEFAULT_NUM_REPLICAS + ) + settings: Dict[str, Any] = { + "number_of_shards": num_shards, + "number_of_replicas": num_replicas, + } + # Note: GPU indexing is controlled at the node level via + # vectors.indexing.use_gpu in elasticsearch.yml, not per-index. + + index_options: Dict[str, Any] = { + "type": index_type, + "m": m, + "ef_construction": ef_construction, + } + + index_config: Dict[str, Any] = { + "settings": settings, + "mappings": { + "properties": { + vector_field: { + "type": "dense_vector", + "dims": dims, + "index": True, + "similarity": similarity, + "index_options": index_options, + }, + }, + }, + } + + t0 = time.perf_counter() + client.indices.create(index=index_name, body=index_config) + + from elasticsearch.helpers import bulk + + chunk_size = 1000 + progress_interval = max( + 50, n_vectors // (chunk_size * 20) + ) # ~20 progress lines + for i in range(0, n_vectors, chunk_size): + chunk = vectors[i : i + chunk_size] + actions = [ + { + "_index": index_name, + "_id": str(i + j), + vector_field: vec.tolist(), + } + for j, vec in enumerate(chunk) + ] + bulk(client, actions, raise_on_error=True) + if ( + progress_interval + and (i // chunk_size) % progress_interval == 0 + ): + print( + f" Indexed {min(i + chunk_size, n_vectors):,}/{n_vectors:,} vectors" + ) + + client.indices.refresh(index=index_name) + build_time = time.perf_counter() - t0 + + stats = client.indices.stats(index=index_name) + index_size = stats["_all"]["primaries"]["store"]["size_in_bytes"] + + return BuildResult( + index_path=index_name, + build_time_seconds=build_time, + index_size_bytes=index_size, + algorithm=self.algo, + build_params=build_params, + success=True, + ) + except Exception as e: + return BuildResult( + index_path="", + build_time_seconds=0, + index_size_bytes=0, + algorithm=self.algo, + build_params={}, + success=False, + error_message=str(e), + ) + + def search( + 
self, + dataset: Dataset, + indexes: List[IndexConfig], + k: int = 10, + batch_size: int = 10000, + mode: str = "latency", + force: bool = False, + search_threads: Optional[int] = None, + dry_run: bool = False, + ) -> SearchResult: + """Run kNN search over all search-param combinations and compute recall.""" + if dry_run: + return SearchResult( + neighbors=np.empty((0, k), dtype=np.int64), + distances=np.empty((0, k), dtype=np.float32), + search_time_ms=0, + queries_per_second=0, + recall=0, + algorithm=self.algo, + search_params=[], + success=True, + ) + + skip_reason = self._pre_flight_check() + if skip_reason: + return SearchResult( + neighbors=np.empty((0, k), dtype=np.int64), + distances=np.empty((0, k), dtype=np.float32), + search_time_ms=0, + queries_per_second=0, + recall=0, + algorithm=self.algo, + search_params=[], + success=False, + error_message=f"pre-flight check failed: {skip_reason}", + ) + + if not indexes: + return SearchResult( + neighbors=np.empty((0, k), dtype=np.int64), + distances=np.empty((0, k), dtype=np.float32), + search_time_ms=0, + queries_per_second=0, + recall=0, + algorithm=self.algo, + search_params=[], + success=False, + error_message="No indexes provided", + ) + + query_vectors = dataset.query_vectors + if query_vectors.size == 0: + return SearchResult( + neighbors=np.empty((0, k), dtype=np.int64), + distances=np.empty((0, k), dtype=np.float32), + search_time_ms=0, + queries_per_second=0, + recall=0, + algorithm=self.algo, + search_params=[], + success=False, + error_message=( + "query_vectors are required for Elasticsearch backend " + "(directly or via dataset.query_file)" + ), + ) + + try: + n_queries = len(query_vectors) + + groundtruth = dataset.groundtruth_neighbors + + index_name = self.config.get("index_name", "cuvs_bench_vectors") + index_cfg = indexes[0] + search_params_list = index_cfg.search_params or [{}] + + per_param_results: List[Dict[str, Any]] = [] + last_neighbors = np.full((n_queries, k), -1, dtype=np.int64) + 
last_distances = np.zeros((n_queries, k), dtype=np.float32) + + for sp in search_params_list: + num_candidates = sp.get( + "num_candidates", _DEFAULT_NUM_CANDIDATES + ) + vector_field = sp.get("vector_field", _DEFAULT_VECTOR_FIELD) + + neighbors = np.full((n_queries, k), -1, dtype=np.int64) + distances = np.zeros((n_queries, k), dtype=np.float32) + latencies: List[float] = [] + + t0 = time.perf_counter() + for i, qv in enumerate(query_vectors): + body = { + "knn": { + "field": vector_field, + "query_vector": qv.tolist(), + "k": k, + "num_candidates": num_candidates, + } + } + t_q = time.perf_counter() + resp = self._get_client().search( + index=index_name, body=body, size=k + ) + latencies.append((time.perf_counter() - t_q) * 1000) + hits = resp.get("hits", {}).get("hits", []) + for j, hit in enumerate(hits[:k]): + neighbors[i, j] = int(hit["_id"]) + distances[i, j] = float(hit["_score"]) + + elapsed_ms = (time.perf_counter() - t0) * 1000 + qps = ( + n_queries / (elapsed_ms / 1000) if elapsed_ms > 0 else 0.0 + ) + + recall = 0.0 + if groundtruth is not None: + gt_k = min(k, groundtruth.shape[1]) + n_correct = sum( + len( + set(neighbors[i, :k].tolist()) + & set(groundtruth[i, :gt_k].tolist()) + ) + for i in range(n_queries) + ) + recall = ( + n_correct / (n_queries * gt_k) if gt_k > 0 else 0.0 + ) + + per_param_results.append( + { + "search_params": sp, + "search_time_ms": elapsed_ms, + "queries_per_second": qps, + "recall": recall, + "p50_ms": float(np.percentile(latencies, 50)), + "p95_ms": float(np.percentile(latencies, 95)), + "p99_ms": float(np.percentile(latencies, 99)), + } + ) + last_neighbors = neighbors + last_distances = distances + + avg_recall = float( + np.mean([r["recall"] for r in per_param_results]) + ) + avg_qps = float( + np.mean([r["queries_per_second"] for r in per_param_results]) + ) + total_ms = float( + sum(r["search_time_ms"] for r in per_param_results) + ) + + return SearchResult( + neighbors=last_neighbors, + distances=last_distances, + 
search_time_ms=total_ms, + queries_per_second=avg_qps, + recall=avg_recall, + algorithm=self.algo, + search_params=search_params_list, + latency_percentiles={ + "p50_ms": per_param_results[-1]["p50_ms"], + "p95_ms": per_param_results[-1]["p95_ms"], + "p99_ms": per_param_results[-1]["p99_ms"], + }, + metadata={ + "per_search_param_results": per_param_results, + "recall_is_authoritative": True, + }, + success=True, + ) + except Exception as e: + return SearchResult( + neighbors=np.empty((0, k), dtype=np.int64), + distances=np.empty((0, k), dtype=np.float32), + search_time_ms=0, + queries_per_second=0, + recall=0, + algorithm=self.algo, + search_params=[], + success=False, + error_message=str(e), + ) + + +def _get_cuvs_bench_config_path() -> str: + """Get cuvs_bench config directory for shared datasets.yaml.""" + import cuvs_bench.orchestrator.config_loaders as _config_loaders + + mod_file = getattr(_config_loaders, "__file__", None) + if mod_file: + # config_loaders is at cuvs_bench/orchestrator/config_loaders.py + # config is at cuvs_bench/config + pkg_dir = Path(mod_file).resolve().parent.parent + return str(pkg_dir / "config") + import cuvs_bench + + if cuvs_bench.__file__: + return os.path.join(os.path.dirname(cuvs_bench.__file__), "config") + raise RuntimeError( + "Cannot determine cuvs_bench config path. " + "Ensure cuvs_bench is properly installed or on PYTHONPATH." 
+ ) + + +def _get_elastic_config_path() -> str: + """Get the config directory for elastic.yaml.""" + return os.path.join(os.path.dirname(__file__), "../config") + + +class ElasticConfigLoader(ConfigLoader): + """Config loader for Elasticsearch backend.""" + + def __init__(self, config_path: Optional[str] = None): + self.config_path = config_path or _get_cuvs_bench_config_path() + + @property + def backend_type(self) -> str: + return "elastic" + + def load( + self, + dataset: str = "", + dataset_path: str = "", + **kwargs, + ) -> Tuple[DatasetConfig, List[BenchmarkConfig]]: + """Load Elasticsearch benchmark configuration via shared ConfigLoader flow.""" + return super().load( + dataset=dataset, dataset_path=dataset_path, **kwargs + ) + + def _discover_algo_groups( + self, + dataset_conf, + dataset, + dataset_path, + **kwargs, + ): + """Discover elastic algorithm groups using shared loader semantics.""" + algorithm_configuration = kwargs.get("algorithm_configuration") + algorithms_arg = kwargs.get("algorithms") + groups_arg = kwargs.get("groups") + algo_groups_arg = kwargs.get("algo_groups") + + algos_conf_fs = self.gather_algorithm_configs( + self.config_path, algorithm_configuration + ) + + elastic_algos = [] + for algo_f in algos_conf_fs: + try: + algo_conf = self.load_yaml_file(algo_f) + except Exception: + continue + if not isinstance(algo_conf, dict): + continue + algo_name = algo_conf.get("name", "") + if not algo_name.startswith("elastic_"): + continue + elastic_algos.append(algo_conf) + + if not elastic_algos: + default_grp = { + "build": {"m": [16], "ef_construction": [100]}, + "search": {"num_candidates": [100]}, + } + elastic_algos = [ + {"name": "elastic_hnsw", "groups": {"base": default_grp}} + ] + + allowed_algos = ( + [a.strip() for a in algorithms_arg.split(",") if a.strip()] + if algorithms_arg + else None + ) + allowed_groups = ( + [g.strip() for g in groups_arg.split(",") if g.strip()] + if groups_arg + else None + ) + algo_group_map: Dict[str, 
set] = {} + if algo_groups_arg: + for item in algo_groups_arg.split(","): + item = item.strip() + if not item or "." not in item: + continue + algo_name, group_name = item.split(".", 1) + algo_group_map.setdefault(algo_name, set()).add(group_name) + + # Backward-compatible fallback for older elastic usage where `algorithms` + # was used as a group selector (e.g. algorithms="test"). + if allowed_algos and not allowed_groups and not algo_group_map: + known_groups = { + group_name + for algo_conf in elastic_algos + for group_name in algo_conf.get("groups", {}) + } + if all(name in known_groups for name in allowed_algos): + allowed_groups = allowed_algos + allowed_algos = None + + if allowed_groups is None and not algo_group_map: + allowed_groups = ["base"] + + result = [] + for algo_conf in elastic_algos: + algo_name = algo_conf["name"] + if allowed_algos and algo_name not in allowed_algos: + continue + + groups = dict(algo_conf.get("groups", {})) + if allowed_groups is not None: + groups = { + group_name: group_conf + for group_name, group_conf in groups.items() + if group_name in allowed_groups + } + if algo_name in algo_group_map: + groups = { + group_name: group_conf + for group_name, group_conf in groups.items() + if group_name in algo_group_map[algo_name] + } + + for group_name, group_conf in groups.items(): + result.append((algo_name, group_name, group_conf, {})) + + if not result and allowed_groups: + raise ValueError( + f"Could not find elastic groups {allowed_groups} in elastic configs" + ) + if not result and allowed_algos: + raise ValueError( + f"Could not find elastic algorithms {allowed_algos} in elastic configs" + ) + + return result + + def _build_benchmark_configs( + self, + dataset_config, + dataset_conf, + dataset, + dataset_path, + expanded_groups, + **kwargs, + ): + """Build BenchmarkConfigs from shared expanded elastic parameter groups.""" + host = kwargs.get("host", "localhost") + port = kwargs.get("port", 9200) + scheme = kwargs.get("scheme", 
"http") + basic_auth = kwargs.get("basic_auth") + username = kwargs.get("username") + password = kwargs.get("password") + if basic_auth is None and username and password: + basic_auth = (username, password) + + tune_mode = kwargs.get("_tune_mode", False) + tune_build_params = kwargs.get("_tune_build_params") + tune_search_params = kwargs.get("_tune_search_params") + + benchmark_configs = [] + for ( + algo_name, + group_name, + _group_conf, + build_combos, + search_combos, + _group_meta, + ) in expanded_groups: + _validate_elastic_algorithm(algo_name) + if tune_mode and tune_build_params is not None: + actual_build = [dict(tune_build_params)] + actual_search = ( + [dict(tune_search_params)] if tune_search_params else [{}] + ) + else: + actual_build = build_combos + actual_search = search_combos + + for build_param in actual_build: + build_param = dict(build_param) + if "type" not in build_param and algo_name.startswith( + "elastic_" + ): + build_param["type"] = algo_name.replace("elastic_", "", 1) + _validate_elastic_index_type(build_param["type"]) + if "similarity" in build_param: + _validate_elastic_similarity(build_param["similarity"]) + + if tune_mode: + label_prefix = f"{algo_name}_tune" + elif group_name != "base": + label_prefix = f"{algo_name}_{group_name}" + else: + label_prefix = algo_name + + name_parts = [ + f"{k}{v}" + for k, v in build_param.items() + if k in ("m", "ef_construction") + ] + index_label = ( + "_".join([label_prefix] + name_parts) + if name_parts + else label_prefix + ) + es_index_name = index_label.lower().replace(".", "_") + + index_config = IndexConfig( + name=index_label, + algo=algo_name, + build_param=build_param, + search_params=[dict(sp) for sp in actual_search], + file="", + ) + benchmark_configs.append( + BenchmarkConfig( + indexes=[index_config], + backend_config={ + "name": index_label, + "host": host, + "port": port, + "scheme": scheme, + "index_name": es_index_name, + "basic_auth": basic_auth, + **build_param, + }, + ) + ) + 
+ return benchmark_configs + + +def register() -> None: + """Register Elasticsearch backend and config loader (idempotent).""" + from cuvs_bench.backends.registry import ( + _CONFIG_LOADER_REGISTRY, + get_registry, + ) + + reg = get_registry() + if not reg.is_registered("elastic"): + register_backend("elastic", ElasticBackend) + if "elastic" not in _CONFIG_LOADER_REGISTRY: + register_config_loader("elastic", ElasticConfigLoader) + + +# ── Convenience API ─────────────────────────────────────────────────────────── + + +def run_build( + dataset: str = "test-data", + dataset_path: str = "./datasets", + host: str = "localhost", + port: int = 9200, + algorithms: str = "test", + force: bool = False, + **kwargs, +): + """Build an Elasticsearch vector index. Returns list of BuildResult.""" + register() + from cuvs_bench.orchestrator import BenchmarkOrchestrator + + orch = BenchmarkOrchestrator(backend_type="elastic") + return orch.run_benchmark( + build=True, + search=False, + dataset=dataset, + dataset_path=dataset_path, + host=host, + port=port, + algorithms=algorithms, + force=force, + **kwargs, + ) + + +def run_search( + dataset: str = "test-data", + dataset_path: str = "./datasets", + host: str = "localhost", + port: int = 9200, + algorithms: str = "test", + **kwargs, +): + """Run kNN search against an existing Elasticsearch index. Returns list of SearchResult.""" + register() + from cuvs_bench.orchestrator import BenchmarkOrchestrator + + orch = BenchmarkOrchestrator(backend_type="elastic") + return orch.run_benchmark( + build=False, + search=True, + dataset=dataset, + dataset_path=dataset_path, + host=host, + port=port, + algorithms=algorithms, + **kwargs, + ) + + +def run_benchmark( + dataset: str = "test-data", + dataset_path: str = "./datasets", + host: str = "localhost", + port: int = 9200, + algorithms: str = "test", + build: bool = True, + search: bool = True, + force: bool = False, + **kwargs, +): + """Run build and/or search. 
Returns list of results.""" + register() + from cuvs_bench.orchestrator import BenchmarkOrchestrator + + orch = BenchmarkOrchestrator(backend_type="elastic") + return orch.run_benchmark( + build=build, + search=search, + dataset=dataset, + dataset_path=dataset_path, + host=host, + port=port, + algorithms=algorithms, + force=force, + **kwargs, + ) diff --git a/python/cuvs_bench/cuvs_bench/backends/opensearch.py b/python/cuvs_bench/cuvs_bench/backends/opensearch.py index 026fe98fd6..3f7e1c04ac 100644 --- a/python/cuvs_bench/cuvs_bench/backends/opensearch.py +++ b/python/cuvs_bench/cuvs_bench/backends/opensearch.py @@ -559,8 +559,8 @@ def _failed_search_result( search_params: Optional[List[Dict[str, Any]]] = None, ) -> SearchResult: return SearchResult( - neighbors=np.zeros((0, k), dtype=np.int64), - distances=np.zeros((0, k), dtype=np.float32), + neighbors=np.empty((0, k), dtype=np.int64), + distances=np.empty((0, k), dtype=np.float32), search_time_ms=0.0, queries_per_second=0.0, recall=0.0, @@ -871,8 +871,8 @@ def search( ) return SearchResult( - neighbors=np.zeros((0, k), dtype=np.int64), - distances=np.zeros((0, k), dtype=np.float32), + neighbors=np.empty((0, k), dtype=np.int64), + distances=np.empty((0, k), dtype=np.float32), search_time_ms=0.0, queries_per_second=0.0, recall=0.0, diff --git a/python/cuvs_bench/cuvs_bench/backends/registry.py b/python/cuvs_bench/cuvs_bench/backends/registry.py index 1a4dd88dde..b397dfc9d6 100644 --- a/python/cuvs_bench/cuvs_bench/backends/registry.py +++ b/python/cuvs_bench/cuvs_bench/backends/registry.py @@ -12,10 +12,47 @@ from typing import Dict, Type, Optional from pathlib import Path import importlib +import importlib.metadata import yaml from .base import BenchmarkBackend +# Entry point group names for plugin discovery +_BACKENDS_GROUP = "cuvs_bench.backends" +_CONFIG_LOADERS_GROUP = "cuvs_bench.config_loaders" + +_OPTIONAL_BACKEND_EXTRAS = { + "elastic": ("elasticsearch", "pip install cuvs-bench[elastic]"), + "opensearch": 
("opensearchpy", "pip install cuvs-bench[opensearch]"), +} + + +def _optional_backend_install_hint(name: str) -> str: + """Return an install hint for optional backends.""" + extra = _OPTIONAL_BACKEND_EXTRAS.get(name) + if extra is None: + return "" + return f" Install with: {extra[1]}" + + +def _rewrite_optional_backend_import_error( + name: str, error: ImportError +) -> ImportError | None: + """Rewrite ImportError with an install hint for known optional backends.""" + extra = _OPTIONAL_BACKEND_EXTRAS.get(name) + if extra is None: + return None + + module_name, install_cmd = extra + message = str(error).lower() + if module_name.lower() in message: + backend_name = name.capitalize() + return ImportError( + f"{backend_name} backend requires the '{name}' extra. " + f"Install with: {install_cmd}" + ) + return None + class BackendRegistry: """ @@ -375,10 +412,41 @@ def get_backend(name: str, config: Dict) -> BenchmarkBackend: return registry.get_backend(name, config) +def _try_load_plugin(name: str) -> None: + """ + Try to load backend and config loader from entry points for the given name. + + Plugins register themselves when their entry point is loaded. + Raises ImportError with install instructions if the plugin requires + an optional dependency that is not installed. + """ + for group in (_BACKENDS_GROUP, _CONFIG_LOADERS_GROUP): + try: + eps = importlib.metadata.entry_points(group=group) + except TypeError: + eps = importlib.metadata.entry_points().get(group, []) + if hasattr(eps, "select"): # Python 3.10+ + eps = list(eps.select(name=name)) + else: + eps = [e for e in eps if e.name == name] + for ep in eps: + try: + ep.load()() + except ImportError as e: + rewritten = _rewrite_optional_backend_import_error(name, e) + if rewritten is not None: + raise rewritten from e + raise + return # Plugin loaded successfully + + def get_backend_class(name: str) -> Type[BenchmarkBackend]: """ Get the backend class (not instance) from the global registry. 
+ If the backend is not registered, attempts to load it from entry points + (e.g., optional plugins like elastic). + Parameters ---------- name : str @@ -390,10 +458,13 @@ def get_backend_class(name: str) -> Type[BenchmarkBackend]: Backend class """ registry = get_registry() + if name not in registry._backends: + _try_load_plugin(name) if name not in registry._backends: available = ", ".join(registry._backends.keys()) + hint = _optional_backend_install_hint(name) raise ValueError( - f"Backend '{name}' not found. Available backends: {available or '(none)'}" + f"Backend '{name}' not found. Available backends: {available or '(none)'}.{hint}" ) return registry._backends[name] @@ -440,6 +511,9 @@ def get_config_loader(name: str) -> Type: """ Get a registered config loader class by name. + If the config loader is not registered, attempts to load it from entry points + (e.g., optional plugins like elastic). + Parameters ---------- name : str @@ -455,11 +529,13 @@ def get_config_loader(name: str) -> Type: ValueError If config loader is not registered """ - # _CONFIG_LOADER_REGISTRY is a dictionary that maps backend names to config loader classes + if name not in _CONFIG_LOADER_REGISTRY: + _try_load_plugin(name) if name not in _CONFIG_LOADER_REGISTRY: available = ", ".join(_CONFIG_LOADER_REGISTRY.keys()) or "none" + hint = _optional_backend_install_hint(name) raise ValueError( - f"Unknown config loader for backend: '{name}'. Available: {available}" + f"Unknown config loader for backend: '{name}'. Available: {available}.{hint}" ) return _CONFIG_LOADER_REGISTRY[name] @@ -467,3 +543,16 @@ def get_config_loader(name: str) -> Type: def list_config_loaders() -> Dict[str, Type]: """Return all registered config loaders.""" return dict(_CONFIG_LOADER_REGISTRY) + + +def unregister_config_loader(name: str) -> None: + """ + Unregister a config loader by name (primarily for testing). 
+ + Parameters + ---------- + name : str + Backend name to unregister + """ + if name in _CONFIG_LOADER_REGISTRY: + del _CONFIG_LOADER_REGISTRY[name] diff --git a/python/cuvs_bench/cuvs_bench/backends/search_spaces.py b/python/cuvs_bench/cuvs_bench/backends/search_spaces.py index 57949eb86e..a0aba3e52e 100644 --- a/python/cuvs_bench/cuvs_bench/backends/search_spaces.py +++ b/python/cuvs_bench/cuvs_bench/backends/search_spaces.py @@ -159,6 +159,47 @@ }, }, # ========================================================================= + # Elasticsearch GPU HNSW (hnsw, int8_hnsw, int4_hnsw, bbq_hnsw) + # Per ES-GPU-API-REFERENCE.md: index_options (m, ef_construction), + # knn (num_candidates) + # ========================================================================= + "elastic_hnsw": { + "build": { + "m": {"type": "int", "min": 8, "max": 64}, + "ef_construction": {"type": "int", "min": 50, "max": 500}, + }, + "search": { + "num_candidates": {"type": "int", "min": 50, "max": 500}, + }, + }, + "elastic_int8_hnsw": { + "build": { + "m": {"type": "int", "min": 8, "max": 64}, + "ef_construction": {"type": "int", "min": 50, "max": 500}, + }, + "search": { + "num_candidates": {"type": "int", "min": 50, "max": 500}, + }, + }, + "elastic_int4_hnsw": { + "build": { + "m": {"type": "int", "min": 8, "max": 64}, + "ef_construction": {"type": "int", "min": 50, "max": 500}, + }, + "search": { + "num_candidates": {"type": "int", "min": 50, "max": 500}, + }, + }, + "elastic_bbq_hnsw": { + "build": { + "m": {"type": "int", "min": 8, "max": 64}, + "ef_construction": {"type": "int", "min": 50, "max": 500}, + }, + "search": { + "num_candidates": {"type": "int", "min": 50, "max": 500}, + }, + }, + # ========================================================================= # OpenSearch HNSW (faiss engine) # ========================================================================= "opensearch_faiss_hnsw": { diff --git a/python/cuvs_bench/cuvs_bench/config/algos/elastic.yaml 
b/python/cuvs_bench/cuvs_bench/config/algos/elastic.yaml new file mode 100644 index 0000000000..daa4fa22ab --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/config/algos/elastic.yaml @@ -0,0 +1,52 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +# Elasticsearch GPU HNSW: index_options (build) and knn (search) +# Per ES-GPU-API-REFERENCE.md and DOCKER-INSPECT-FINDINGS.md +# +# Build (index_options): type, m, ef_construction +# type: hnsw (float), int8_hnsw (4x quantized), int4_hnsw (8x), bbq_hnsw (32x) +# similarity: l2_norm, cosine, max_inner_product (overrides dataset distance) +# Index settings: number_of_shards, number_of_replicas, use_gpu, vector_field +# Search (knn): num_candidates, vector_field +name: elastic_hnsw +groups: + base: + build: + type: [hnsw] + m: [16, 32] + ef_construction: [100, 200] + similarity: [l2_norm, cosine] + number_of_shards: [1] + number_of_replicas: [0] + + vector_field: [embedding] + search: + num_candidates: [100, 200] + vector_field: [embedding] + quantized: + build: + type: [hnsw, int8_hnsw] + m: [16, 32] + ef_construction: [100] + similarity: [l2_norm] + number_of_shards: [1] + number_of_replicas: [0] + + vector_field: [embedding] + search: + num_candidates: [100, 200] + vector_field: [embedding] + test: + build: + type: [hnsw] + m: [16] + ef_construction: [100] + similarity: [l2_norm] + number_of_shards: [1] + number_of_replicas: [0] + + vector_field: [embedding] + search: + num_candidates: [100] + vector_field: [embedding] diff --git a/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py b/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py index 580291c2f0..4d000ed232 100644 --- a/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py +++ b/python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py @@ -24,6 +24,15 @@ from .config_loaders import DatasetConfig +def _should_compute_recall(result: SearchResult) -> bool: + """Return True when 
orchestrator should derive recall from neighbors.""" + return ( + result.success + and result.neighbors.size > 0 + and not result.metadata.get("recall_is_authoritative", False) + ) + + class BenchmarkOrchestrator: """ Orchestrator for running benchmarks using the pluggable backend system. @@ -260,13 +269,7 @@ def _run_sweep( # Compute recall for backends that return actual neighbors. # The C++ backend computes recall in the subprocess and returns # empty neighbors, so this is skipped for it. - # Empty neighbors or nonzero recall indicate that the backend - # already handled recall itself. - if ( - search_result.success - and search_result.neighbors.size > 0 - and search_result.recall == 0.0 - ): + if _should_compute_recall(search_result): gt = bench_dataset.groundtruth_neighbors if gt is not None: search_result.recall = compute_recall( @@ -572,11 +575,10 @@ def _run_trial( "append_results": append_results, } backend = self.backend_class(backend_config) + result = None try: backend.initialize() - result = None - if build: result = backend.build( dataset=bench_dataset, @@ -600,13 +602,7 @@ def _run_trial( ) # Compute recall for backends that return actual neighbors. - # Empty neighbors or nonzero recall indicate that the backend - # already handled recall itself. - if ( - result.success - and result.neighbors.size > 0 - and result.recall == 0.0 - ): + if _should_compute_recall(result): gt = bench_dataset.groundtruth_neighbors if gt is not None: result.recall = compute_recall( diff --git a/python/cuvs_bench/cuvs_bench/tests/conftest.py b/python/cuvs_bench/cuvs_bench/tests/conftest.py new file mode 100644 index 0000000000..c003df9b3a --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/tests/conftest.py @@ -0,0 +1,23 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. 
+# SPDX-License-Identifier: Apache-2.0 +# +"""Pytest configuration for cuvs_bench tests.""" + + +def pytest_configure(config): + """Register elastic plugin when elasticsearch is available. + + Ensures elastic tests run when elasticsearch is installed, even if + cuvs-bench-elastic was not installed via pip (e.g. using PYTHONPATH). + """ + try: + import elasticsearch # noqa: F401 + except ImportError: + return + + try: + from cuvs_bench_elastic import register + register() + except ImportError: + pass diff --git a/python/cuvs_bench/cuvs_bench/tests/test_modularization.py b/python/cuvs_bench/cuvs_bench/tests/test_modularization.py new file mode 100644 index 0000000000..f82f2c69ba --- /dev/null +++ b/python/cuvs_bench/cuvs_bench/tests/test_modularization.py @@ -0,0 +1,1002 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. +# SPDX-License-Identifier: Apache-2.0 +# +""" +Smoke tests for cuvs-bench modularization (optional deps, entry points, lazy loading). + +These tests verify the plugin infrastructure without requiring full backend implementations. 
+""" + +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +from cuvs_bench.backends.base import ( + BenchmarkBackend, + BuildResult, + Dataset, + SearchResult, +) +from cuvs_bench.backends.registry import ( + get_backend_class, + get_config_loader, + get_registry, + list_backends, + list_config_loaders, + register_backend, + register_config_loader, + unregister_config_loader, +) +from cuvs_bench.orchestrator.config_loaders import ConfigLoader, IndexConfig + + +class TestModularizationSmoke: + """Smoke tests for optional backend loading and error handling.""" + + def test_cpp_gbench_available(self): + """cpp_gbench (built-in) should always be available.""" + backends = list_backends() + assert "cpp_gbench" in backends + cls = get_backend_class("cpp_gbench") + assert cls is not None + + def test_cpp_gbench_config_loader_available(self): + """cpp_gbench config loader should always be registered.""" + loaders = list_config_loaders() + assert "cpp_gbench" in loaders + loader_cls = get_config_loader("cpp_gbench") + assert loader_cls is not None + + def test_elastic_without_extra_raises_clear_error(self): + """Requesting elastic without [elastic] installed raises helpful error.""" + try: + import elasticsearch # noqa: F401 + + pytest.skip( + "elasticsearch is installed; cannot test missing-plugin path" + ) + except ImportError: + pass + import importlib.metadata + + all_eps = importlib.metadata.entry_points() + if hasattr(all_eps, "select"): + eps = list( + all_eps.select(group="cuvs_bench.backends", name="elastic") + ) + else: + eps = [ + e + for e in all_eps.get("cuvs_bench.backends", []) + if e.name == "elastic" + ] + if eps: + pytest.skip( + "cuvs-bench-elastic is installed; cannot test missing-plugin path" + ) + + with pytest.raises((ImportError, ValueError)) as exc_info: + get_backend_class("elastic") + + msg = str(exc_info.value) + assert "elastic" in msg.lower() + + def 
test_elastic_config_loader_without_extra_raises_clear_error(self): + """Requesting elastic config loader without [elastic] raises helpful error.""" + try: + import elasticsearch # noqa: F401 + + pytest.skip( + "elasticsearch is installed; cannot test missing-plugin path" + ) + except ImportError: + pass + import importlib.metadata + + all_eps = importlib.metadata.entry_points() + if hasattr(all_eps, "select"): + eps = list( + all_eps.select( + group="cuvs_bench.config_loaders", name="elastic" + ) + ) + else: + eps = [ + e + for e in all_eps.get("cuvs_bench.config_loaders", []) + if e.name == "elastic" + ] + if eps: + pytest.skip( + "cuvs-bench-elastic is installed; cannot test missing-plugin path" + ) + + with pytest.raises((ImportError, ValueError)) as exc_info: + get_config_loader("elastic") + + msg = str(exc_info.value) + assert "elastic" in msg.lower() + + def test_unknown_backend_raises_value_error(self): + """Requesting unknown backend raises ValueError with available backends.""" + with pytest.raises(ValueError) as exc_info: + get_backend_class("nonexistent_backend_xyz") + + msg = str(exc_info.value) + assert "nonexistent_backend_xyz" in msg + assert "cpp_gbench" in msg + + def test_unknown_config_loader_raises_value_error(self): + """Requesting unknown config loader raises ValueError.""" + with pytest.raises(ValueError) as exc_info: + get_config_loader("nonexistent_loader_xyz") + + msg = str(exc_info.value) + assert "nonexistent_loader_xyz" in msg + + def test_orchestrator_cpp_gbench_no_regression(self): + """Initializing BenchmarkOrchestrator with cpp_gbench should work.""" + from cuvs_bench.orchestrator import BenchmarkOrchestrator + + assert "cpp_gbench" in BenchmarkOrchestrator.available_backends() + orch = BenchmarkOrchestrator(backend_type="cpp_gbench") + assert orch.backend_type == "cpp_gbench" + assert orch.backend_class is not None + assert orch.config_loader is not None + + def test_orchestrator_respects_authoritative_backend_recall(self): + 
"""Backends can mark their own recall as authoritative.""" + from cuvs_bench.orchestrator.orchestrator import _should_compute_recall + + result = SearchResult( + neighbors=np.array([[1, 2, 3]], dtype=np.int64), + distances=np.array([[0.1, 0.2, 0.3]], dtype=np.float32), + search_time_ms=1.0, + queries_per_second=1.0, + recall=0.5, + algorithm="elastic_hnsw", + search_params=[{"num_candidates": 100}], + metadata={"recall_is_authoritative": True}, + success=True, + ) + + assert not _should_compute_recall(result) + + +class TestPluginLoaderMocked: + """ + Tests using mocked entry points (NAT-style). + + These tests do not require elasticsearch or real plugins. + """ + + _MOCK_PLUGIN_NAME = "mock_plugin_test" + + @staticmethod + def _make_mock_register(): + """Create a register function that registers a minimal stub backend and loader.""" + + class StubBackend(BenchmarkBackend): + def build(self, dataset, indexes, force=False, dry_run=False): + return BuildResult( + index_path="", + build_time_seconds=0, + index_size_bytes=0, + algorithm="stub", + build_params={}, + success=True, + ) + + def search( + self, + dataset, + indexes, + k=10, + batch_size=10000, + mode="latency", + force=False, + search_threads=None, + dry_run=False, + ): + import numpy as np + + return SearchResult( + neighbors=np.empty((0, k)), + distances=np.empty((0, k)), + search_time_ms=0, + queries_per_second=0, + recall=0, + algorithm="stub", + search_params=[], + success=True, + ) + + class StubConfigLoader(ConfigLoader): + @property + def backend_type(self): + return TestPluginLoaderMocked._MOCK_PLUGIN_NAME + + def load(self, **kwargs): + raise NotImplementedError("Stub loader") + + def register(): + register_backend( + TestPluginLoaderMocked._MOCK_PLUGIN_NAME, StubBackend + ) + register_config_loader( + TestPluginLoaderMocked._MOCK_PLUGIN_NAME, StubConfigLoader + ) + + return register + + def test_valid_plugin_loads_via_mock_entry_point(self): + """Mock entry point: valid plugin registers and is 
discoverable.""" + mock_register = self._make_mock_register() + mock_ep = MagicMock() + mock_ep.name = self._MOCK_PLUGIN_NAME + mock_ep.load.return_value = mock_register + + mock_eps = MagicMock() + mock_eps.select.return_value = [mock_ep] + + with patch( + "cuvs_bench.backends.registry.importlib.metadata.entry_points", + return_value=mock_eps, + ): + cls = get_backend_class(self._MOCK_PLUGIN_NAME) + assert cls is not None + + loader_cls = get_config_loader(self._MOCK_PLUGIN_NAME) + assert loader_cls is not None + + # Cleanup + get_registry().unregister(self._MOCK_PLUGIN_NAME) + unregister_config_loader(self._MOCK_PLUGIN_NAME) + + def test_import_error_with_elasticsearch_message_raises_helpful_error( + self, + ): + """Mock entry point raising ImportError(elasticsearch) -> our install message.""" + # Ensure elastic is not in registry (e.g. from TestElasticWithExtraInstalled) + registry = get_registry() + if "elastic" in registry._backends: + registry.unregister("elastic") + unregister_config_loader("elastic") + + mock_ep = MagicMock() + mock_ep.name = "elastic" + mock_ep.load.side_effect = ImportError( + "No module named 'elasticsearch'" + ) + + mock_eps = MagicMock() + mock_eps.select.return_value = [mock_ep] + + with patch( + "cuvs_bench.backends.registry.importlib.metadata.entry_points", + return_value=mock_eps, + ): + with pytest.raises(ImportError) as exc_info: + get_backend_class("elastic") + + msg = str(exc_info.value) + assert "pip install cuvs-bench[elastic]" in msg + + def test_import_error_with_opensearch_message_raises_helpful_error( + self, + ): + """Mock entry point raising ImportError(opensearchpy) -> our install message.""" + registry = get_registry() + if "opensearch" in registry._backends: + registry.unregister("opensearch") + unregister_config_loader("opensearch") + + mock_ep = MagicMock() + mock_ep.name = "opensearch" + mock_ep.load.side_effect = ImportError( + "No module named 'opensearchpy'" + ) + + mock_eps = MagicMock() + 
mock_eps.select.return_value = [mock_ep] + + with patch( + "cuvs_bench.backends.registry.importlib.metadata.entry_points", + return_value=mock_eps, + ): + with pytest.raises(ImportError) as exc_info: + get_backend_class("opensearch") + + msg = str(exc_info.value) + assert "pip install cuvs-bench[opensearch]" in msg + + def test_import_error_unrelated_propagates(self): + """Mock entry point: unrelated ImportError propagates unchanged.""" + mock_ep = MagicMock() + mock_ep.name = "other_plugin" + mock_ep.load.side_effect = ImportError( + "No module named 'something_else'" + ) + + mock_eps = MagicMock() + mock_eps.select.return_value = [mock_ep] + + with patch( + "cuvs_bench.backends.registry.importlib.metadata.entry_points", + return_value=mock_eps, + ): + with pytest.raises(ImportError) as exc_info: + get_backend_class("other_plugin") + + assert "something_else" in str(exc_info.value) + + def test_unexpected_error_propagates(self): + """Mock entry point: RuntimeError propagates.""" + mock_ep = MagicMock() + mock_ep.name = "broken_plugin" + mock_ep.load.side_effect = RuntimeError("Plugin crashed") + + mock_eps = MagicMock() + mock_eps.select.return_value = [mock_ep] + + with patch( + "cuvs_bench.backends.registry.importlib.metadata.entry_points", + return_value=mock_eps, + ): + with pytest.raises(RuntimeError) as exc_info: + get_backend_class("broken_plugin") + + assert "Plugin crashed" in str(exc_info.value) + + def test_no_entry_point_for_name_raises_value_error(self): + """Mock entry point: no plugin for requested name -> ValueError.""" + mock_eps = MagicMock() + mock_eps.select.return_value = [] # No matching entry points + + with patch( + "cuvs_bench.backends.registry.importlib.metadata.entry_points", + return_value=mock_eps, + ): + with pytest.raises(ValueError) as exc_info: + get_backend_class("nonexistent_mock_xyz") + + msg = str(exc_info.value) + assert "nonexistent_mock_xyz" in msg + assert "cpp_gbench" in msg + + def 
test_unknown_optional_backend_includes_install_hint(self): + """Missing optional backend names include install hints in the error.""" + mock_eps = MagicMock() + mock_eps.select.return_value = [] + + with patch( + "cuvs_bench.backends.registry.importlib.metadata.entry_points", + return_value=mock_eps, + ): + with pytest.raises(ValueError) as exc_info: + get_backend_class("opensearch") + + assert "pip install cuvs-bench[opensearch]" in str(exc_info.value) + + +def _elasticsearch_installed(): + try: + import elasticsearch # noqa: F401 + + return True + except ImportError: + return False + + +@pytest.mark.skipif( + not _elasticsearch_installed(), + reason="Requires pip install cuvs-bench[elastic]", +) +class TestElasticWithExtraInstalled: + """Tests that run only when [elastic] extra is installed.""" + + @pytest.fixture(autouse=True) + def _ensure_elastic_registered(self): + """Re-register elastic (may have been unregistered by other tests).""" + registry = get_registry() + if "elastic" not in registry._backends: + try: + from cuvs_bench.backends.elasticsearch import register + + register() + except ImportError: + pass + yield + + def test_elastic_plugin_loads(self): + """Elastic backend and config loader load when elasticsearch is installed.""" + assert get_backend_class("elastic") is not None + assert get_config_loader("elastic") is not None + + def test_elastic_config_loader_tune_mode_returns_single_config(self): + """Tune mode produces one BenchmarkConfig with Optuna-suggested params.""" + loader_cls = get_config_loader("elastic") + loader = loader_cls() + dataset_config, benchmark_configs = loader.load( + dataset="glove-50-angular", + dataset_path="", + algorithms="elastic_hnsw", + _tune_mode=True, + _tune_build_params={"m": 24, "ef_construction": 150}, + _tune_search_params={"num_candidates": 120}, + ) + assert len(benchmark_configs) == 1 + config = benchmark_configs[0] + assert config.indexes[0].algo == "elastic_hnsw" + assert config.indexes[0].build_param["m"] 
== 24 + assert config.indexes[0].build_param["ef_construction"] == 150 + assert config.indexes[0].build_param["type"] == "hnsw" + assert config.indexes[0].search_params[0]["num_candidates"] == 120 + assert config.backend_config["name"].startswith("elastic_hnsw_tune") + + def test_elastic_config_loader_sweep_mode_returns_multiple_configs(self): + """Sweep mode produces multiple BenchmarkConfigs from param cartesian product.""" + loader_cls = get_config_loader("elastic") + loader = loader_cls() + dataset_config, benchmark_configs = loader.load( + dataset="glove-50-angular", + dataset_path="", + algorithms="elastic_hnsw", + groups="test", + ) + assert len(benchmark_configs) >= 1 + config = benchmark_configs[0] + assert config.indexes[0].algo == "elastic_hnsw" + assert "m" in config.indexes[0].build_param + assert "num_candidates" in config.indexes[0].search_params[0] + + def test_elastic_config_loader_dataset_not_found_raises(self): + """Config loader raises ValueError for unknown dataset.""" + loader_cls = get_config_loader("elastic") + loader = loader_cls() + with pytest.raises(ValueError, match="not found"): + loader.load(dataset="nonexistent_dataset_xyz", dataset_path="") + + def test_elastic_config_loader_group_not_found_raises(self): + """Config loader raises ValueError for unknown algorithm group.""" + loader_cls = get_config_loader("elastic") + loader = loader_cls() + with pytest.raises(ValueError, match="find elastic groups"): + loader.load( + dataset="glove-50-angular", + dataset_path="", + algorithms="elastic_hnsw", + groups="nonexistent_group_xyz", + ) + + def test_elastic_dry_run_build(self): + """ElasticBackend.build(dry_run=True) returns synthetic result without ES.""" + cls = get_backend_class("elastic") + backend = cls( + config={"name": "test", "host": "localhost", "port": 9200} + ) + + base = np.random.rand(100, 32).astype(np.float32) + queries = np.random.rand(10, 32).astype(np.float32) + dataset = Dataset( + name="test", + training_vectors=base, + 
query_vectors=queries, + distance_metric="euclidean", + ) + indexes = [ + IndexConfig( + name="elastic_hnsw_test", + algo="elastic_hnsw", + build_param={"m": 16, "ef_construction": 100}, + search_params=[{"num_candidates": 100}], + file="", + ) + ] + + result = backend.build(dataset=dataset, indexes=indexes, dry_run=True) + + assert result.success + assert result.algorithm == "elastic_hnsw" + assert result.build_time_seconds == 0 + + def test_elastic_dry_run_search(self): + """ElasticBackend.search(dry_run=True) returns synthetic result without ES.""" + cls = get_backend_class("elastic") + backend = cls( + config={"name": "test", "host": "localhost", "port": 9200} + ) + + base = np.random.rand(100, 32).astype(np.float32) + queries = np.random.rand(10, 32).astype(np.float32) + dataset = Dataset( + name="test", + training_vectors=base, + query_vectors=queries, + distance_metric="euclidean", + ) + indexes = [ + IndexConfig( + name="elastic_hnsw_test", + algo="elastic_hnsw", + build_param={}, + search_params=[{"num_candidates": 100}], + file="", + ) + ] + + result = backend.search( + dataset=dataset, indexes=indexes, k=10, dry_run=True + ) + + assert result.success + assert result.algorithm == "elastic_hnsw" + assert result.search_time_ms == 0 + + def test_elastic_build_requires_training_vectors(self): + """ElasticBackend.build returns error when no training vectors are available.""" + cls = get_backend_class("elastic") + backend = cls( + config={"name": "test", "host": "localhost", "port": 9200} + ) + + queries = np.random.rand(10, 32).astype(np.float32) + dataset = Dataset( + name="test", + query_vectors=queries, + base_file=None, + query_file=None, + ) + indexes = [ + IndexConfig( + name="elastic_hnsw_test", + algo="elastic_hnsw", + build_param={}, + search_params=[{}], + file="", + ) + ] + + with patch.object( + backend, "_check_network_available", return_value=True + ): + result = backend.build( + dataset=dataset, indexes=indexes, dry_run=False + ) + + assert not 
result.success + assert "training_vectors" in (result.error_message or "") + + def test_elastic_preflight_fails_when_no_network(self): + """ElasticBackend.build returns success=False when network is unavailable.""" + cls = get_backend_class("elastic") + backend = cls( + config={"name": "test", "host": "localhost", "port": 9200} + ) + + base = np.random.rand(100, 32).astype(np.float32) + queries = np.random.rand(10, 32).astype(np.float32) + dataset = Dataset( + name="test", + training_vectors=base, + query_vectors=queries, + base_file="dummy/base.fbin", + query_file="dummy/query.fbin", + ) + indexes = [ + IndexConfig( + name="elastic_hnsw_test", + algo="elastic_hnsw", + build_param={}, + search_params=[{}], + file="", + ) + ] + + with patch.object( + backend, "_check_network_available", return_value=False + ): + result = backend.build(dataset=dataset, indexes=indexes) + + assert not result.success + assert "pre-flight" in (result.error_message or "").lower() + + def test_elastic_search_preflight_fails_when_no_network(self): + """ElasticBackend.search returns success=False when network is unavailable.""" + cls = get_backend_class("elastic") + backend = cls( + config={"name": "test", "host": "localhost", "port": 9200} + ) + + dataset = Dataset( + name="test", + training_vectors=np.random.rand(100, 32).astype(np.float32), + query_vectors=np.random.rand(10, 32).astype(np.float32), + query_file="dummy/query.fbin", + ) + indexes = [ + IndexConfig( + name="elastic_hnsw_test", + algo="elastic_hnsw", + build_param={}, + search_params=[{"num_candidates": 100}], + file="", + ) + ] + + with patch.object( + backend, "_check_network_available", return_value=False + ): + result = backend.search(dataset=dataset, indexes=indexes, k=10) + + assert not result.success + assert "pre-flight" in (result.error_message or "").lower() + + def test_elastic_build_skips_existing_index_when_force_false(self): + """build(force=False) returns success=True immediately when index already exists.""" 
+        cls = get_backend_class("elastic")
+        backend = cls(
+            config={
+                "name": "test",
+                "host": "localhost",
+                "port": 9200,
+                "index_name": "test_index",
+            }
+        )
+
+        mock_client = MagicMock()
+        mock_client.indices.exists.return_value = True
+        mock_client.indices.stats.return_value = {
+            "_all": {"primaries": {"store": {"size_in_bytes": 1024}}}
+        }
+
+        dataset = Dataset(
+            name="test",
+            training_vectors=np.random.rand(100, 32).astype(np.float32),
+            query_vectors=np.random.rand(10, 32).astype(np.float32),
+            base_file="dummy/base.fbin",
+        )
+        indexes = [
+            IndexConfig(
+                name="test_index",
+                algo="elastic_hnsw",
+                build_param={
+                    "type": "hnsw",
+                    "m": 16,
+                    "ef_construction": 100,
+                    "similarity": "l2_norm",
+                    "number_of_shards": 1,
+                    "number_of_replicas": 0,
+                    "vector_field": "embedding",
+                },
+                search_params=[{"num_candidates": 100}],
+                file="",
+            )
+        ]
+
+        with patch.object(
+            backend, "_check_network_available", return_value=True
+        ):
+            with patch.object(
+                backend, "_get_client", return_value=mock_client
+            ):
+                result = backend.build(
+                    dataset=dataset, indexes=indexes, force=False
+                )
+
+        assert result.success
+        assert result.index_size_bytes == 1024
+        mock_client.indices.delete.assert_not_called()
+
+    def test_elastic_build_uses_lazy_loaded_training_vectors(self):
+        """Build succeeds for a Dataset that has ``base_file`` only.
+
+        ``training_vectors`` is omitted from the Dataset, and
+        ``load_vectors`` in the elasticsearch module is patched to return
+        a random float32 array of shape (100, 32).
+        """
+        backend_cls = get_backend_class("elastic")
+        backend = backend_cls(
+            config={"name": "test", "host": "localhost", "port": 9200}
+        )
+
+        es_client = MagicMock()
+        es_client.indices.exists.return_value = True
+        es_client.indices.stats.return_value = {
+            "_all": {"primaries": {"store": {"size_in_bytes": 2048}}}
+        }
+
+        lazy_dataset = Dataset(
+            name="test",
+            query_vectors=np.random.rand(10, 32).astype(np.float32),
+            base_file="dummy/base.fbin",
+        )
+        hnsw_index = IndexConfig(
+            name="test_index",
+            algo="elastic_hnsw",
+            build_param={"m": 16, "ef_construction": 100},
+            search_params=[{"num_candidates": 100}],
+            file="",
+        )
+
+        network_up = patch.object(
+            backend, "_check_network_available", return_value=True
+        )
+        client_stub = patch.object(
+            backend, "_get_client", return_value=es_client
+        )
+        vectors_stub = patch(
+            "cuvs_bench.backends.elasticsearch.load_vectors",
+            return_value=np.random.rand(100, 32).astype(np.float32),
+        )
+        # Patchers are entered left to right, as with nested ``with``.
+        with network_up, client_stub, vectors_stub:
+            result = backend.build(
+                dataset=lazy_dataset, indexes=[hnsw_index], force=False
+            )
+
+        assert result.success
+        assert result.index_size_bytes == 2048
+
+    def test_elastic_search_uses_lazy_loaded_query_vectors(self):
+        """Search succeeds for a Dataset that has ``query_file`` only.
+
+        ``query_vectors`` is omitted from the Dataset, and ``load_vectors``
+        in the elasticsearch module is patched to return a single random
+        float32 query of dimension 32.
+        """
+        backend_cls = get_backend_class("elastic")
+        backend = backend_cls(
+            config={"name": "test", "host": "localhost", "port": 9200}
+        )
+
+        es_client = MagicMock()
+        es_client.search.return_value = {
+            "hits": {"hits": [{"_id": "0", "_score": 1.0}]}
+        }
+
+        lazy_dataset = Dataset(
+            name="test",
+            training_vectors=np.random.rand(100, 32).astype(np.float32),
+            query_file="dummy/query.fbin",
+            groundtruth_neighbors=np.array([[0]], dtype=np.int32),
+        )
+        hnsw_index = IndexConfig(
+            name="elastic_hnsw_test",
+            algo="elastic_hnsw",
+            build_param={},
+            search_params=[{"num_candidates": 100}],
+            file="",
+        )
+
+        network_up = patch.object(
+            backend, "_check_network_available", return_value=True
+        )
+        client_stub = patch.object(
+            backend, "_get_client", return_value=es_client
+        )
+        vectors_stub = patch(
+            "cuvs_bench.backends.elasticsearch.load_vectors",
+            return_value=np.random.rand(1, 32).astype(np.float32),
+        )
+        # Patchers are entered left to right, as with nested ``with``.
+        with network_up, client_stub, vectors_stub:
+            result = backend.search(
+                dataset=lazy_dataset, indexes=[hnsw_index], k=1
+            )
+
+        assert result.success
+        assert result.neighbors.shape == (1, 1)
+
+    def test_elastic_algo_from_config(self):
+        """The ``type`` config entry determines ``ElasticBackend.algo``."""
+        backend_cls = get_backend_class("elastic")
+        expected_algos = {
+            "hnsw": "elastic_hnsw",
+            "int8_hnsw": "elastic_int8_hnsw",
+        }
+        for index_type, algo in expected_algos.items():
+            backend = backend_cls(
+                config={"name": "test", "type": index_type}
+            )
+            assert backend.algo == algo
+
+    def test_elastic_build_rejects_unsupported_index_type(self):
+        """Build fails with an error naming the unsupported index type."""
+        backend_cls = get_backend_class("elastic")
+        backend = backend_cls(
+            config={"name": "test", "host": "localhost", "port": 9200}
+        )
+
+        tiny_dataset = Dataset(
+            name="test",
+            training_vectors=np.random.rand(8, 4).astype(np.float32),
+            query_vectors=np.random.rand(2, 4).astype(np.float32),
+            distance_metric="euclidean",
+        )
+        scann_index = IndexConfig(
+            name="elastic_scann_test",
+            algo="elastic_scann",
+            build_param={"type": "scann"},
+            search_params=[{"num_candidates": 10}],
+            file="",
+        )
+
+        network_up = patch.object(
+            backend, "_check_network_available", return_value=True
+        )
+        with network_up:
+            result = backend.build(
+                dataset=tiny_dataset, indexes=[scann_index]
+            )
+
+        assert not result.success
+        message = result.error_message or ""
+        assert "unsupported Elasticsearch index type" in message
+        assert "scann" in message
+
+    def test_elastic_build_rejects_unsupported_similarity(self):
+        """Build fails with an error naming the unsupported similarity."""
+        backend_cls = get_backend_class("elastic")
+        backend = backend_cls(
+            config={"name": "test", "host": "localhost", "port": 9200}
+        )
+
+        tiny_dataset = Dataset(
+            name="test",
+            training_vectors=np.random.rand(8, 4).astype(np.float32),
+            query_vectors=np.random.rand(2, 4).astype(np.float32),
+            distance_metric="euclidean",
+        )
+        dot_product_index = IndexConfig(
+            name="elastic_hnsw_test",
+            algo="elastic_hnsw",
+            build_param={"type": "hnsw", "similarity": "dot_product"},
+            search_params=[{"num_candidates": 10}],
+            file="",
+        )
+
+        network_up = patch.object(
+            backend, "_check_network_available", return_value=True
+        )
+        with network_up:
+            result = backend.build(
+                dataset=tiny_dataset, indexes=[dot_product_index]
+            )
+
+        assert not result.success
+        message = result.error_message or ""
+        assert "unsupported Elasticsearch similarity" in message
+        assert "dot_product" in message
+
+    def test_elastic_cleanup_closes_client(self):
+        """cleanup() closes the attached client and resets ``_client``."""
+        backend = get_backend_class("elastic")(
+            config={"name": "test", "host": "localhost", "port": 9200}
+        )
+        es_client = MagicMock()
+        backend._client = es_client
+
+        backend.cleanup()
+
+        es_client.close.assert_called_once()
+        assert backend._client is None
+
+    def test_orchestrator_elastic_dry_run(self):
+        """Dry-run benchmark via the elastic orchestrator returns results."""
+        from cuvs_bench.orchestrator import BenchmarkOrchestrator
+
+        orchestrator = BenchmarkOrchestrator(backend_type="elastic")
+        run_options = {
+            "dataset": "glove-50-angular",
+            "dataset_path": "/nonexistent",
+            "host": "localhost",
+            "port": 9200,
+            "algorithms": "elastic_hnsw",
+            "groups": "test",
+            "build": True,
+            "search": True,
+            "dry_run": True,
+            "count": 10,
+            "batch_size": 100,
+        }
+        results = orchestrator.run_benchmark(**run_options)
+        assert results is not None
+        assert len(results) >= 1
+
+
+@pytest.mark.skipif(
+    not _elasticsearch_installed(),
+    reason="Requires pip install cuvs-bench[elastic]",
+)
+class TestElasticHelpers:
+    """Unit tests for module-level helpers of the elastic backend."""
+
+    @pytest.fixture(autouse=True)
+    def _ensure_elastic_registered(self):
+        """Register the elastic backend before each test if it is missing.
+
+        An ImportError raised while importing or calling ``register`` is
+        ignored, so the test body still runs.
+        """
+        if "elastic" not in get_registry()._backends:
+            try:
+                from cuvs_bench.backends.elasticsearch import register
+
+                register()
+            except ImportError:
+                pass
+        yield
+
+    def test_distance_to_similarity(self):
+        """Known metrics map to ES similarities; others give l2_norm."""
+        from cuvs_bench.backends.elasticsearch import _distance_to_similarity
+
+        expected = {
+            "euclidean": "l2_norm",
+            "inner_product": "max_inner_product",
+            "cosine": "cosine",
+            "unknown": "l2_norm",
+        }
+        for distance, similarity in expected.items():
+            assert _distance_to_similarity(distance) == similarity
+
+    def test_validate_elastic_algorithm_rejects_unknown(self):
+        """An unknown algorithm raises ValueError naming elastic_hnsw."""
+        from cuvs_bench.backends.elasticsearch import (
+            _validate_elastic_algorithm,
+        )
+
+        expected_error = pytest.raises(
+            ValueError, match="unsupported algorithm"
+        )
+        with expected_error as raised:
+            _validate_elastic_algorithm("elastic_scann")
+
+        assert "elastic_hnsw" in str(raised.value)
+
+    def test_validate_elastic_index_type_rejects_unknown(self):
+        """An unknown index type raises ValueError mentioning hnsw."""
+        from cuvs_bench.backends.elasticsearch import (
+            _validate_elastic_index_type,
+        )
+
+        expected_error = pytest.raises(
+            ValueError, match="unsupported Elasticsearch index type"
+        )
+        with expected_error as raised:
+            _validate_elastic_index_type("scann")
+
+        assert "hnsw" in str(raised.value)
+
+    def test_validate_elastic_similarity_rejects_unknown(self):
+        """Unknown similarity raises ValueError naming max_inner_product."""
+        from cuvs_bench.backends.elasticsearch import (
+            _validate_elastic_similarity,
+        )
+
+        expected_error = pytest.raises(
+            ValueError, match="unsupported Elasticsearch similarity"
+        )
+        with expected_error as raised:
+            _validate_elastic_similarity("dot_product")
+
+        assert "max_inner_product" in str(raised.value)
+
+    def test_load_fbin(self):
+        """_load_fbin reads back a 2x2 float32 matrix written to disk."""
+        import tempfile
+        from pathlib import Path
+
+        from cuvs_bench.backends.elasticsearch import _load_fbin
+
+        expected = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32)
+        with tempfile.NamedTemporaryFile(suffix=".fbin", delete=False) as tmp:
+            fbin_path = Path(tmp.name)
+        try:
+            # A [2, 2] uint32 header is written first, then the raw values.
+            with open(fbin_path, "wb") as out:
+                np.array([2, 2], dtype=np.uint32).tofile(out)
+                expected.tofile(out)
+            actual = _load_fbin(fbin_path)
+            np.testing.assert_array_equal(actual, expected)
+        finally:
+            fbin_path.unlink(missing_ok=True)
+
+    def test_load_ibin(self):
+        """_load_ibin reads back a 2x2 int32 matrix written to disk."""
+        import tempfile
+        from pathlib import Path
+
+        from cuvs_bench.backends.elasticsearch import _load_ibin
+
+        expected = np.array([[1, 2], [3, 4]], dtype=np.int32)
+        with tempfile.NamedTemporaryFile(suffix=".ibin", delete=False) as tmp:
+            ibin_path = Path(tmp.name)
+        try:
+            # A [2, 2] uint32 header is written first, then the raw values.
+            with open(ibin_path, "wb") as out:
+                np.array([2, 2], dtype=np.uint32).tofile(out)
+                expected.tofile(out)
+            actual = _load_ibin(ibin_path)
+            np.testing.assert_array_equal(actual, expected)
+        finally:
+            ibin_path.unlink(missing_ok=True)
diff --git a/python/cuvs_bench/pyproject.toml b/python/cuvs_bench/pyproject.toml
index 8df6955978..7878c03374 100644
--- a/python/cuvs_bench/pyproject.toml
+++ b/python/cuvs_bench/pyproject.toml
@@ -42,10 +42,17 @@ classifiers = [
 opensearch = [
     "opensearch-py>=2.4.0",
 ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
+elastic = ["elasticsearch>=8.0"]
 
 [project.urls]
 Homepage = "https://github.com/rapidsai/cuvs"
 
+[project.entry-points."cuvs_bench.backends"]
+elastic = "cuvs_bench.backends.elasticsearch:register"
+
+[project.entry-points."cuvs_bench.config_loaders"]
+elastic = "cuvs_bench.backends.elasticsearch:register"
+
 [tool.setuptools.package-data]
 "*" = ["*.*", "VERSION"]