Add cuvs-bench-elastic: HTTP backend for Elasticsearch GPU vector search#1907
Add cuvs-bench-elastic: HTTP backend for Elasticsearch GPU vector search#1907afourniernv wants to merge 17 commits into
Conversation
Introduce cuvs-bench-elastic as an optional plugin for cuvs-bench that provides an Elasticsearch backend. The backend communicates with Elasticsearch via HTTP and supports HNSW indexing with optional GPU acceleration when using the Elasticsearch GPU image (cuVS-accelerated vector search). - Add cuvs_bench_elastic package with backend and config loader entry points - Extend cuvs_bench registry and search spaces for pluggable backends - Add elastic and integration optional dependencies to cuvs-bench - Add modularization tests and integration test scaffolding (disabled until CI has ES GPU image, cuVS libs, and GPU runner) Signed-off-by: Alex Fournier <afournier@nvidia.com>
Use single-doc format (_index, _id, vector_field) instead of two-part NDJSON (index action + source) so ES accepts the bulk request. Signed-off-by: Alex Fournier <afournier@nvidia.com>
Expose ELASTIC constant and convenience wrappers for build-only, search-only, or full benchmark runs. Signed-off-by: Alex Fournier <afournier@nvidia.com>
ec0b3e3 to
7b25521
Compare
- Document run_build, run_search, run_benchmark convenience API - Document ELASTIC constant and orchestrator usage - Add username/password support in config loader (converts to basic_auth) Signed-off-by: Alex Fournier <afournier@nvidia.com>
- New `cuvs_bench_elastic` package with HTTP backend for Elasticsearch GPU vector search (HNSW, int8_hnsw, int4_hnsw, bbq_hnsw index types) - Supports `pip install cuvs-bench[elastic]` without a separate PyPI publish: `cuvs_bench` bundles the plugin via setuptools packages.find - Plugin registers via entry points (`cuvs_bench.backends` / `cuvs_bench.config_loaders`) — no changes to core cuvs-bench required - `ElasticConfigLoader` reads shared `datasets.yaml` from cuvs_bench and `elastic.yaml` from the plugin config; supports sweep and tune modes - `build()` checks index existence before file validation so `force=False` returns immediately without requiring the base file on disk - Removed testcontainers-based integration tests; added unit tests for pre-flight failure, force=False skip, dry-run, helper functions - `elasticsearch` client is an optional dep (`cuvs-bench[elastic]` extra)
dca9c6f to
087289d
Compare
The separate cuvs_bench_elastic package required bundling via packages.find and complicated the build. Instead, keep the backend inside cuvs_bench and use entry points pointing back into the same package so the backend only registers when elasticsearch is installed. - git mv backend.py to cuvs_bench/backends/elasticsearch.py - git mv elastic.yaml to cuvs_bench/config/algos/ - Fix imports to relative paths - Fix _get_elastic_config_path() to use ../config from backends/ - Update pyproject.toml: entry points -> cuvs_bench.backends.elasticsearch:register - Remove packages.find (no longer needed) - Remove cuvs_bench_elastic/ package entirely DX unchanged: pip install cuvs-bench[elastic] One package, one publish pipeline. Signed-off-by: Alex Fournier <afournier@nvidia.com>
…ch.py Restores the high-level API that was previously in cuvs_bench_elastic/__init__.py so existing demo scripts continue to work after the module consolidation. Signed-off-by: Alex Fournier <afournier@nvidia.com>
| "Install with: pip install cuvs-bench[elastic]" | ||
| ) from e | ||
| host = self.config.get("host", "localhost") | ||
| port = self.config.get("port", 9200) |
There was a problem hiding this comment.
Just want to confirm- are there any other protocols to interact with Elasticsearch or is HTTP/Rest pretty much it? Mostly asking to make sure it's the best (fastest) way to comunicate so we aren't adding additional overheads in the mix.
There was a problem hiding this comment.
Confirmed. For Elasticsearch the supported client path here is the official HTTP/REST interface; the backend does not add another protocol layer on top of that. We re-ran the backend live against the Brev deployment after the recent fixes. Leaving this open since it is more an interface tradeoff than a code bug.
| index_type = build_params.get("type", _DEFAULT_INDEX_TYPE) | ||
| m = build_params.get("m", _DEFAULT_M) | ||
| ef_construction = build_params.get( | ||
| "ef_construction", _DEFAULT_EF_CONSTRUCTION |
There was a problem hiding this comment.
We should decouple the index type from the backend type. User should be able to specify the backend and the algorithm they want to run in that backend (or maybe the two are coupled, but the backend should not assume the algorithm).
So for example, what if the index type is not supported? What happens when a user wants to test diskbbq against hnsw/cagra? We should decouple these from the beginning, ideally with separate functions for each (for e.g. _parse_cagra_params(), _parse_hnsw_params())
There was a problem hiding this comment.
It's also important that we throw a proper (descriptive) error when the backend doesn't support an index type. For example, if someone passes in "SCaNN" to the Elasticsearch backend, they should get a message to the effect of "Received params for SCaNN index type, but Elasticsearch backend does not support this index type. Please check configuration."
There was a problem hiding this comment.
Partially addressed. The backend now validates supported algorithm names, index types, and similarities up front with descriptive errors instead of failing later in Elasticsearch. It still models the benchmark family as elastic_* plus build-param type, so this is better than before but not a fully generic backend/index abstraction yet.
| # SPDX-License-Identifier: Apache-2.0 | ||
| # | ||
| """ | ||
| Smoke tests for cuvs-bench modularization (optional deps, entry points, lazy loading). |
There was a problem hiding this comment.
It's great to see improved test coverage!
|
@afourniernv this is a really big PR so I'm reviewing in stages. Thank you for bearing with me. I finally had some time to look through the implementation for comprehensively. Mostly minor, but important things. |
# Conflicts: # python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py # python/cuvs_bench/cuvs_bench/tests/test_registry.py
Signed-off-by: Alex Fournier <afournier@nvidia.com>
Would still like to refine a bit but pushing up for early feedback cc @singhmanas1 @janakivamaraju @afourniernv xref #1907 which does something similar but for Elastic EDIT: Okay, this is now ready for review (cc @jnke2016) Authors: - James Bourbeau (https://github.com/jrbourbeau) - Corey J. Nolet (https://github.com/cjnolet) Approvers: - Joseph Nke (https://github.com/jnke2016) - Corey J. Nolet (https://github.com/cjnolet) - James Lamb (https://github.com/jameslamb) URL: #2012
Signed-off-by: Alex Fournier <afournier@nvidia.com> # Conflicts: # dependencies.yaml # python/cuvs_bench/cuvs_bench/backends/search_spaces.py # python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py
Signed-off-by: Alex Fournier <afournier@nvidia.com>
Signed-off-by: Alex Fournier <afournier@nvidia.com>
📝 WalkthroughSummary by CodeRabbitRelease Notes
WalkthroughAdds a complete Elasticsearch GPU HNSW backend to cuvs-bench with pluggable entry-point discovery, centralized recall computation, configuration loading, search-space parameters, and comprehensive tests. Also modernizes OpenSearch array initialization and extends the registry to support optional backend dependencies with installation hints. ChangesElasticsearch GPU Backend with Plugin Discovery
🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@dependencies.yaml`:
- Around line 596-600: The pyproject extra for "elastic" is drifting because
dependencies.yaml defines bench_elastic -> cuvs-bench-elastic>=26.4.0 while
python/cuvs_bench/pyproject.toml currently lists elastic =
["elasticsearch>=8.0"]; update the source-of-truth to keep them in sync by
changing dependencies.yaml's bench_elastic entry to include the same package(s)
as the pyproject extra (or update python/cuvs_bench/pyproject.toml to match
dependencies.yaml) so that project.optional-dependencies.elastic (via
py_elastic_cuvs_bench / bench_elastic) consistently maps to the intended package
list and version constraint.
In `@python/cuvs_bench/cuvs_bench/backends/elasticsearch.py`:
- Around line 256-269: The current checks return a BuildResult immediately when
dataset.training_vectors (and similarly dataset.query_vectors) are zero-length,
but never attempt to load file-backed data from dataset.base_file /
dataset.query_file; update the logic in the Elasticsearch backend where
dataset.training_vectors is checked (and the analogous block for query_vectors)
to: if vectors.size == 0 and dataset.base_file (or dataset.query_file) is set,
attempt to load the vectors from that file (e.g., via the project’s dataset
loading utility or numpy.load) and re-check size; only return the BuildResult
error (preserving algorithm=self.algo and other fields) if no file is present or
loading still yields an empty array. Ensure you reference
dataset.training_vectors, dataset.base_file, dataset.query_vectors,
dataset.query_file, and BuildResult when making the change so the file-backed
path is executed before failing.
In `@python/cuvs_bench/cuvs_bench/backends/registry.py`:
- Around line 415-440: The function _try_load_plugin currently returns after
loading the first matching entry point, which causes it to short-circuit and
skip checking the other registry group (_BACKENDS_GROUP vs
_CONFIG_LOADERS_GROUP) for the same plugin name; change the control flow so that
after a successful ep.load()() you do not return from _try_load_plugin but
instead continue scanning remaining groups/entry-points (i.e., remove/relocate
the unconditional return and only exit once all groups/entry-points have been
iterated or explicitly determine both backend and config-loader are loaded),
ensuring get_config_loader() can find same-named plugins that split
registrations across groups.
In `@python/cuvs_bench/cuvs_bench/tests/conftest.py`:
- Around line 19-23: pytest_configure currently only attempts "from
cuvs_bench_elastic import register" and swallows ImportError, which prevents the
in-tree backend's register() from being used; update pytest_configure in
conftest.py to try the external import first, and if that raises ImportError,
import the in-tree backend's register (e.g. from
cuvs_bench.backends.elasticsearch import register) as a fallback, and ensure any
remaining ImportError is not silently suppressed (log or re-raise) so
registration failures are visible; reference the pytest_configure function and
the register() symbol when making the change.
In `@python/cuvs_bench/cuvs_bench/tests/test_modularization.py`:
- Around line 35-1002: The tests use class-based pytest suites
(TestModularizationSmoke, TestPluginLoaderMocked, TestElasticWithExtraInstalled,
TestElasticHelpers) which diverges from the project's preferred standalone test
function style; refactor each test method (e.g., test_cpp_gbench_available,
test_import_error_with_elasticsearch_message_raises_helpful_error,
test_elastic_config_loader_tune_mode_returns_single_config,
test_distance_to_similarity, etc.) into top-level test_* functions, move any
per-class fixtures like _ensure_elastic_registered into module-level fixtures or
helper functions, convert helper methods such as _make_mock_register into plain
module helpers, and ensure cleanup/patching logic (registry.unregister,
unregister_config_loader, patch context managers) is preserved in the new
functions so behavior remains identical.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 0a62f800-07f3-46e8-9700-760e774b2c2a
📒 Files selected for processing (10)
dependencies.yamlpython/cuvs_bench/cuvs_bench/backends/elasticsearch.pypython/cuvs_bench/cuvs_bench/backends/opensearch.pypython/cuvs_bench/cuvs_bench/backends/registry.pypython/cuvs_bench/cuvs_bench/backends/search_spaces.pypython/cuvs_bench/cuvs_bench/config/algos/elastic.yamlpython/cuvs_bench/cuvs_bench/orchestrator/orchestrator.pypython/cuvs_bench/cuvs_bench/tests/conftest.pypython/cuvs_bench/cuvs_bench/tests/test_modularization.pypython/cuvs_bench/pyproject.toml
| bench_elastic: | ||
| common: | ||
| - output_types: [conda, pyproject, requirements] | ||
| packages: | ||
| - cuvs-bench-elastic>=26.4.0 |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== pyproject elastic extra ==="
sed -n '41,56p' python/cuvs_bench/pyproject.toml
echo
echo "=== dependencies.yaml elastic source-of-truth ==="
rg -n "py_elastic_cuvs_bench|bench_elastic|cuvs-bench-elastic|elasticsearch>=8.0" dependencies.yaml -A4 -B3Repository: rapidsai/cuvs
Length of output: 1500
Fix elastic extra source-of-truth drift between dependencies.yaml and generated pyproject.toml
dependencies.yaml maps project.optional-dependencies.elastic (via py_elastic_cuvs_bench → bench_elastic) to cuvs-bench-elastic>=26.4.0, but python/cuvs_bench/pyproject.toml currently declares elastic = ["elasticsearch>=8.0"]—regenerating from dependencies.yaml will overwrite the committed pyproject extra and create install-hint/metadata drift.
Suggested fix
bench_elastic:
common:
- output_types: [conda, pyproject, requirements]
packages:
- - cuvs-bench-elastic>=26.4.0
+ - elasticsearch>=8.0🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@dependencies.yaml` around lines 596 - 600, The pyproject extra for "elastic"
is drifting because dependencies.yaml defines bench_elastic ->
cuvs-bench-elastic>=26.4.0 while python/cuvs_bench/pyproject.toml currently
lists elastic = ["elasticsearch>=8.0"]; update the source-of-truth to keep them
in sync by changing dependencies.yaml's bench_elastic entry to include the same
package(s) as the pyproject extra (or update python/cuvs_bench/pyproject.toml to
match dependencies.yaml) so that project.optional-dependencies.elastic (via
py_elastic_cuvs_bench / bench_elastic) consistently maps to the intended package
list and version constraint.
Source: Coding guidelines
| _SUPPORTED_INDEX_TYPES = ("hnsw", "int8_hnsw", "int4_hnsw", "bbq_hnsw") | ||
| _SUPPORTED_SIMILARITIES = ("l2_norm", "cosine", "max_inner_product") | ||
| _SUPPORTED_ALGOS = tuple( | ||
| f"elastic_{index_type}" for index_type in _SUPPORTED_INDEX_TYPES | ||
| ) |
There was a problem hiding this comment.
HIGH: advertised elastic_* algorithms are not all selectable through the loader.
Line 110-114 exposes four supported algorithms, but Line 699-700 only accepts exact YAML name matches; with elastic.yaml naming only elastic_hnsw, requests like algorithms="elastic_int8_hnsw" are rejected.
Impact: tune/sweep paths cannot run several public algorithm names that are already exposed in search spaces and validation.
Have you considered normalizing elastic_int8_hnsw / elastic_int4_hnsw / elastic_bbq_hnsw to the elastic_hnsw config with an injected build.type, or adding dedicated YAML algorithm entries for each name?
As per coding guidelines, “Prevent API breaking changes to public Python interfaces, including removing or renaming public methods/attributes without proper deprecation (minimum one release cycle).”
Also applies to: 699-700, 719-726
Source: Coding guidelines
| vectors = dataset.training_vectors | ||
| if vectors.size == 0: | ||
| return BuildResult( | ||
| index_path="", | ||
| build_time_seconds=0.0, | ||
| index_size_bytes=0, | ||
| algorithm=self.algo, | ||
| build_params={}, | ||
| success=False, | ||
| error_message=( | ||
| "training_vectors are required for Elasticsearch backend " | ||
| "(directly or via dataset.base_file)" | ||
| ), | ||
| ) |
There was a problem hiding this comment.
CRITICAL: file-backed dataset path is declared but never executed.
Line 256 and Line 441 fail immediately on empty in-memory arrays, but the code never loads from dataset.base_file / dataset.query_file even though the error text says it does.
Impact: with BenchmarkOrchestrator._create_dataset (python/cuvs_bench/cuvs_bench/orchestrator/orchestrator.py Line 633-634), normal backend runs fail before index/search work starts.
Proposed fix
- vectors = dataset.training_vectors
- if vectors.size == 0:
+ vectors = dataset.training_vectors
+ if vectors.size == 0 and dataset.base_file:
+ vectors = load_vectors(dataset.base_file)
+ if vectors.size == 0:
return BuildResult(
index_path="",
build_time_seconds=0.0,
index_size_bytes=0,
@@
- query_vectors = dataset.query_vectors
- if query_vectors.size == 0:
+ query_vectors = dataset.query_vectors
+ if query_vectors.size == 0 and dataset.query_file:
+ query_vectors = load_vectors(dataset.query_file)
+ if query_vectors.size == 0:
return SearchResult(
neighbors=np.empty((0, k), dtype=np.int64),
distances=np.empty((0, k), dtype=np.float32),As per coding guidelines, “Ensure missing validation does not cause crashes on invalid input through proper size/type checks” and “Handle edge cases in memory resource code including zero-size arrays and alignment requirements.”
Also applies to: 441-455
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/cuvs_bench/cuvs_bench/backends/elasticsearch.py` around lines 256 -
269, The current checks return a BuildResult immediately when
dataset.training_vectors (and similarly dataset.query_vectors) are zero-length,
but never attempt to load file-backed data from dataset.base_file /
dataset.query_file; update the logic in the Elasticsearch backend where
dataset.training_vectors is checked (and the analogous block for query_vectors)
to: if vectors.size == 0 and dataset.base_file (or dataset.query_file) is set,
attempt to load the vectors from that file (e.g., via the project’s dataset
loading utility or numpy.load) and re-check size; only return the BuildResult
error (preserving algorithm=self.algo and other fields) if no file is present or
loading still yields an empty array. Ensure you reference
dataset.training_vectors, dataset.base_file, dataset.query_vectors,
dataset.query_file, and BuildResult when making the change so the file-backed
path is executed before failing.
Source: Coding guidelines
| def _try_load_plugin(name: str) -> None: | ||
| """ | ||
| Try to load backend and config loader from entry points for the given name. | ||
|
|
||
| Plugins register themselves when their entry point is loaded. | ||
| Raises ImportError with install instructions if the plugin requires | ||
| an optional dependency that is not installed. | ||
| """ | ||
| for group in (_BACKENDS_GROUP, _CONFIG_LOADERS_GROUP): | ||
| try: | ||
| eps = importlib.metadata.entry_points(group=group) | ||
| except TypeError: | ||
| eps = importlib.metadata.entry_points().get(group, []) | ||
| if hasattr(eps, "select"): # Python 3.10+ | ||
| eps = list(eps.select(name=name)) | ||
| else: | ||
| eps = [e for e in eps if e.name == name] | ||
| for ep in eps: | ||
| try: | ||
| ep.load()() | ||
| except ImportError as e: | ||
| rewritten = _rewrite_optional_backend_import_error(name, e) | ||
| if rewritten is not None: | ||
| raise rewritten from e | ||
| raise | ||
| return # Plugin loaded successfully |
There was a problem hiding this comment.
HIGH: _try_load_plugin() can short-circuit config-loader discovery.
At Line 440, the function returns after the first successful load in _BACKENDS_GROUP, so get_config_loader() (Line 533) may never check _CONFIG_LOADERS_GROUP for same-named plugins that split registrations.
Suggested fix
-def _try_load_plugin(name: str) -> None:
+def _try_load_plugin(
+ name: str,
+ groups: tuple[str, ...] = (_BACKENDS_GROUP, _CONFIG_LOADERS_GROUP),
+) -> None:
@@
- for group in (_BACKENDS_GROUP, _CONFIG_LOADERS_GROUP):
+ for group in groups:
@@
- return # Plugin loaded successfully
+ return # Plugin loaded successfully
@@
def get_config_loader(name: str) -> Type:
@@
if name not in _CONFIG_LOADER_REGISTRY:
- _try_load_plugin(name)
+ _try_load_plugin(
+ name,
+ groups=(_CONFIG_LOADERS_GROUP, _BACKENDS_GROUP),
+ )As per coding guidelines, integration errors are HIGH-priority review targets.
Also applies to: 532-534
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/cuvs_bench/cuvs_bench/backends/registry.py` around lines 415 - 440,
The function _try_load_plugin currently returns after loading the first matching
entry point, which causes it to short-circuit and skip checking the other
registry group (_BACKENDS_GROUP vs _CONFIG_LOADERS_GROUP) for the same plugin
name; change the control flow so that after a successful ep.load()() you do not
return from _try_load_plugin but instead continue scanning remaining
groups/entry-points (i.e., remove/relocate the unconditional return and only
exit once all groups/entry-points have been iterated or explicitly determine
both backend and config-loader are loaded), ensuring get_config_loader() can
find same-named plugins that split registrations across groups.
Source: Coding guidelines
| try: | ||
| from cuvs_bench_elastic import register | ||
| register() | ||
| except ImportError: | ||
| pass |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== conftest registration block ==="
sed -n '8,24p' python/cuvs_bench/cuvs_bench/tests/conftest.py
echo
echo "=== in-tree elastic register symbol ==="
rg -n "def register\\(" python/cuvs_bench/cuvs_bench/backends/elasticsearch.py -C2 || true
echo
echo "=== legacy cuvs_bench_elastic module paths in repo ==="
fd -HI "cuvs_bench_elastic" python || trueRepository: rapidsai/cuvs
Length of output: 809
conftest elastic plugin registration ignores the in-tree backend and silently drops failures
python/cuvs_bench/cuvs_bench/tests/conftest.py’s pytest_configure only tries from cuvs_bench_elastic import register and swallows ImportError; the in-tree entry point python/cuvs_bench/cuvs_bench/backends/elasticsearch.py defines register(), and the legacy cuvs_bench_elastic module doesn’t exist in this repo—so source-tree runs may not register the elastic backend as the docstring claims.
Suggested fix
- try:
- from cuvs_bench_elastic import register
- register()
- except ImportError:
- pass
+ try:
+ from cuvs_bench.backends.elasticsearch import register
+ except ImportError:
+ try:
+ from cuvs_bench_elastic import register # legacy fallback
+ except ImportError:
+ return
+ register()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/cuvs_bench/cuvs_bench/tests/conftest.py` around lines 19 - 23,
pytest_configure currently only attempts "from cuvs_bench_elastic import
register" and swallows ImportError, which prevents the in-tree backend's
register() from being used; update pytest_configure in conftest.py to try the
external import first, and if that raises ImportError, import the in-tree
backend's register (e.g. from cuvs_bench.backends.elasticsearch import register)
as a fallback, and ensure any remaining ImportError is not silently suppressed
(log or re-raise) so registration failures are visible; reference the
pytest_configure function and the register() symbol when making the change.
Source: Coding guidelines
| class TestModularizationSmoke: | ||
| """Smoke tests for optional backend loading and error handling.""" | ||
|
|
||
| def test_cpp_gbench_available(self): | ||
| """cpp_gbench (built-in) should always be available.""" | ||
| backends = list_backends() | ||
| assert "cpp_gbench" in backends | ||
| cls = get_backend_class("cpp_gbench") | ||
| assert cls is not None | ||
|
|
||
| def test_cpp_gbench_config_loader_available(self): | ||
| """cpp_gbench config loader should always be registered.""" | ||
| loaders = list_config_loaders() | ||
| assert "cpp_gbench" in loaders | ||
| loader_cls = get_config_loader("cpp_gbench") | ||
| assert loader_cls is not None | ||
|
|
||
| def test_elastic_without_extra_raises_clear_error(self): | ||
| """Requesting elastic without [elastic] installed raises helpful error.""" | ||
| try: | ||
| import elasticsearch # noqa: F401 | ||
|
|
||
| pytest.skip( | ||
| "elasticsearch is installed; cannot test missing-plugin path" | ||
| ) | ||
| except ImportError: | ||
| pass | ||
| import importlib.metadata | ||
|
|
||
| all_eps = importlib.metadata.entry_points() | ||
| if hasattr(all_eps, "select"): | ||
| eps = list( | ||
| all_eps.select(group="cuvs_bench.backends", name="elastic") | ||
| ) | ||
| else: | ||
| eps = [ | ||
| e | ||
| for e in all_eps.get("cuvs_bench.backends", []) | ||
| if e.name == "elastic" | ||
| ] | ||
| if eps: | ||
| pytest.skip( | ||
| "cuvs-bench-elastic is installed; cannot test missing-plugin path" | ||
| ) | ||
|
|
||
| with pytest.raises((ImportError, ValueError)) as exc_info: | ||
| get_backend_class("elastic") | ||
|
|
||
| msg = str(exc_info.value) | ||
| assert "elastic" in msg.lower() | ||
|
|
||
| def test_elastic_config_loader_without_extra_raises_clear_error(self): | ||
| """Requesting elastic config loader without [elastic] raises helpful error.""" | ||
| try: | ||
| import elasticsearch # noqa: F401 | ||
|
|
||
| pytest.skip( | ||
| "elasticsearch is installed; cannot test missing-plugin path" | ||
| ) | ||
| except ImportError: | ||
| pass | ||
| import importlib.metadata | ||
|
|
||
| all_eps = importlib.metadata.entry_points() | ||
| if hasattr(all_eps, "select"): | ||
| eps = list( | ||
| all_eps.select( | ||
| group="cuvs_bench.config_loaders", name="elastic" | ||
| ) | ||
| ) | ||
| else: | ||
| eps = [ | ||
| e | ||
| for e in all_eps.get("cuvs_bench.config_loaders", []) | ||
| if e.name == "elastic" | ||
| ] | ||
| if eps: | ||
| pytest.skip( | ||
| "cuvs-bench-elastic is installed; cannot test missing-plugin path" | ||
| ) | ||
|
|
||
| with pytest.raises((ImportError, ValueError)) as exc_info: | ||
| get_config_loader("elastic") | ||
|
|
||
| msg = str(exc_info.value) | ||
| assert "elastic" in msg.lower() | ||
|
|
||
| def test_unknown_backend_raises_value_error(self): | ||
| """Requesting unknown backend raises ValueError with available backends.""" | ||
| with pytest.raises(ValueError) as exc_info: | ||
| get_backend_class("nonexistent_backend_xyz") | ||
|
|
||
| msg = str(exc_info.value) | ||
| assert "nonexistent_backend_xyz" in msg | ||
| assert "cpp_gbench" in msg | ||
|
|
||
| def test_unknown_config_loader_raises_value_error(self): | ||
| """Requesting unknown config loader raises ValueError.""" | ||
| with pytest.raises(ValueError) as exc_info: | ||
| get_config_loader("nonexistent_loader_xyz") | ||
|
|
||
| msg = str(exc_info.value) | ||
| assert "nonexistent_loader_xyz" in msg | ||
|
|
||
| def test_orchestrator_cpp_gbench_no_regression(self): | ||
| """Initializing BenchmarkOrchestrator with cpp_gbench should work.""" | ||
| from cuvs_bench.orchestrator import BenchmarkOrchestrator | ||
|
|
||
| assert "cpp_gbench" in BenchmarkOrchestrator.available_backends() | ||
| orch = BenchmarkOrchestrator(backend_type="cpp_gbench") | ||
| assert orch.backend_type == "cpp_gbench" | ||
| assert orch.backend_class is not None | ||
| assert orch.config_loader is not None | ||
|
|
||
| def test_orchestrator_respects_authoritative_backend_recall(self): | ||
| """Backends can mark their own recall as authoritative.""" | ||
| from cuvs_bench.orchestrator.orchestrator import _should_compute_recall | ||
|
|
||
| result = SearchResult( | ||
| neighbors=np.array([[1, 2, 3]], dtype=np.int64), | ||
| distances=np.array([[0.1, 0.2, 0.3]], dtype=np.float32), | ||
| search_time_ms=1.0, | ||
| queries_per_second=1.0, | ||
| recall=0.5, | ||
| algorithm="elastic_hnsw", | ||
| search_params=[{"num_candidates": 100}], | ||
| metadata={"recall_is_authoritative": True}, | ||
| success=True, | ||
| ) | ||
|
|
||
| assert not _should_compute_recall(result) | ||
|
|
||
|
|
||
| class TestPluginLoaderMocked: | ||
| """ | ||
| Tests using mocked entry points (NAT-style). | ||
|
|
||
| These tests do not require elasticsearch or real plugins. | ||
| """ | ||
|
|
||
| _MOCK_PLUGIN_NAME = "mock_plugin_test" | ||
|
|
||
| @staticmethod | ||
| def _make_mock_register(): | ||
| """Create a register function that registers a minimal stub backend and loader.""" | ||
|
|
||
| class StubBackend(BenchmarkBackend): | ||
| def build(self, dataset, indexes, force=False, dry_run=False): | ||
| return BuildResult( | ||
| index_path="", | ||
| build_time_seconds=0, | ||
| index_size_bytes=0, | ||
| algorithm="stub", | ||
| build_params={}, | ||
| success=True, | ||
| ) | ||
|
|
||
| def search( | ||
| self, | ||
| dataset, | ||
| indexes, | ||
| k=10, | ||
| batch_size=10000, | ||
| mode="latency", | ||
| force=False, | ||
| search_threads=None, | ||
| dry_run=False, | ||
| ): | ||
| import numpy as np | ||
|
|
||
| return SearchResult( | ||
| neighbors=np.empty((0, k)), | ||
| distances=np.empty((0, k)), | ||
| search_time_ms=0, | ||
| queries_per_second=0, | ||
| recall=0, | ||
| algorithm="stub", | ||
| search_params=[], | ||
| success=True, | ||
| ) | ||
|
|
||
| class StubConfigLoader(ConfigLoader): | ||
| @property | ||
| def backend_type(self): | ||
| return TestPluginLoaderMocked._MOCK_PLUGIN_NAME | ||
|
|
||
| def load(self, **kwargs): | ||
| raise NotImplementedError("Stub loader") | ||
|
|
||
| def register(): | ||
| register_backend( | ||
| TestPluginLoaderMocked._MOCK_PLUGIN_NAME, StubBackend | ||
| ) | ||
| register_config_loader( | ||
| TestPluginLoaderMocked._MOCK_PLUGIN_NAME, StubConfigLoader | ||
| ) | ||
|
|
||
| return register | ||
|
|
||
| def test_valid_plugin_loads_via_mock_entry_point(self): | ||
| """Mock entry point: valid plugin registers and is discoverable.""" | ||
| mock_register = self._make_mock_register() | ||
| mock_ep = MagicMock() | ||
| mock_ep.name = self._MOCK_PLUGIN_NAME | ||
| mock_ep.load.return_value = mock_register | ||
|
|
||
| mock_eps = MagicMock() | ||
| mock_eps.select.return_value = [mock_ep] | ||
|
|
||
| with patch( | ||
| "cuvs_bench.backends.registry.importlib.metadata.entry_points", | ||
| return_value=mock_eps, | ||
| ): | ||
| cls = get_backend_class(self._MOCK_PLUGIN_NAME) | ||
| assert cls is not None | ||
|
|
||
| loader_cls = get_config_loader(self._MOCK_PLUGIN_NAME) | ||
| assert loader_cls is not None | ||
|
|
||
| # Cleanup | ||
| get_registry().unregister(self._MOCK_PLUGIN_NAME) | ||
| unregister_config_loader(self._MOCK_PLUGIN_NAME) | ||
|
|
||
| def test_import_error_with_elasticsearch_message_raises_helpful_error( | ||
| self, | ||
| ): | ||
| """Mock entry point raising ImportError(elasticsearch) -> our install message.""" | ||
| # Ensure elastic is not in registry (e.g. from TestElasticWithExtraInstalled) | ||
| registry = get_registry() | ||
| if "elastic" in registry._backends: | ||
| registry.unregister("elastic") | ||
| unregister_config_loader("elastic") | ||
|
|
||
| mock_ep = MagicMock() | ||
| mock_ep.name = "elastic" | ||
| mock_ep.load.side_effect = ImportError( | ||
| "No module named 'elasticsearch'" | ||
| ) | ||
|
|
||
| mock_eps = MagicMock() | ||
| mock_eps.select.return_value = [mock_ep] | ||
|
|
||
| with patch( | ||
| "cuvs_bench.backends.registry.importlib.metadata.entry_points", | ||
| return_value=mock_eps, | ||
| ): | ||
| with pytest.raises(ImportError) as exc_info: | ||
| get_backend_class("elastic") | ||
|
|
||
| msg = str(exc_info.value) | ||
| assert "pip install cuvs-bench[elastic]" in msg | ||
|
|
||
| def test_import_error_with_opensearch_message_raises_helpful_error( | ||
| self, | ||
| ): | ||
| """Mock entry point raising ImportError(opensearchpy) -> our install message.""" | ||
| registry = get_registry() | ||
| if "opensearch" in registry._backends: | ||
| registry.unregister("opensearch") | ||
| unregister_config_loader("opensearch") | ||
|
|
||
| mock_ep = MagicMock() | ||
| mock_ep.name = "opensearch" | ||
| mock_ep.load.side_effect = ImportError( | ||
| "No module named 'opensearchpy'" | ||
| ) | ||
|
|
||
| mock_eps = MagicMock() | ||
| mock_eps.select.return_value = [mock_ep] | ||
|
|
||
| with patch( | ||
| "cuvs_bench.backends.registry.importlib.metadata.entry_points", | ||
| return_value=mock_eps, | ||
| ): | ||
| with pytest.raises(ImportError) as exc_info: | ||
| get_backend_class("opensearch") | ||
|
|
||
| msg = str(exc_info.value) | ||
| assert "pip install cuvs-bench[opensearch]" in msg | ||
|
|
||
| def test_import_error_unrelated_propagates(self): | ||
| """Mock entry point: unrelated ImportError propagates unchanged.""" | ||
| mock_ep = MagicMock() | ||
| mock_ep.name = "other_plugin" | ||
| mock_ep.load.side_effect = ImportError( | ||
| "No module named 'something_else'" | ||
| ) | ||
|
|
||
| mock_eps = MagicMock() | ||
| mock_eps.select.return_value = [mock_ep] | ||
|
|
||
| with patch( | ||
| "cuvs_bench.backends.registry.importlib.metadata.entry_points", | ||
| return_value=mock_eps, | ||
| ): | ||
| with pytest.raises(ImportError) as exc_info: | ||
| get_backend_class("other_plugin") | ||
|
|
||
| assert "something_else" in str(exc_info.value) | ||
|
|
||
| def test_unexpected_error_propagates(self): | ||
| """Mock entry point: RuntimeError propagates.""" | ||
| mock_ep = MagicMock() | ||
| mock_ep.name = "broken_plugin" | ||
| mock_ep.load.side_effect = RuntimeError("Plugin crashed") | ||
|
|
||
| mock_eps = MagicMock() | ||
| mock_eps.select.return_value = [mock_ep] | ||
|
|
||
| with patch( | ||
| "cuvs_bench.backends.registry.importlib.metadata.entry_points", | ||
| return_value=mock_eps, | ||
| ): | ||
| with pytest.raises(RuntimeError) as exc_info: | ||
| get_backend_class("broken_plugin") | ||
|
|
||
| assert "Plugin crashed" in str(exc_info.value) | ||
|
|
||
| def test_no_entry_point_for_name_raises_value_error(self): | ||
| """Mock entry point: no plugin for requested name -> ValueError.""" | ||
| mock_eps = MagicMock() | ||
| mock_eps.select.return_value = [] # No matching entry points | ||
|
|
||
| with patch( | ||
| "cuvs_bench.backends.registry.importlib.metadata.entry_points", | ||
| return_value=mock_eps, | ||
| ): | ||
| with pytest.raises(ValueError) as exc_info: | ||
| get_backend_class("nonexistent_mock_xyz") | ||
|
|
||
| msg = str(exc_info.value) | ||
| assert "nonexistent_mock_xyz" in msg | ||
| assert "cpp_gbench" in msg | ||
|
|
||
| def test_unknown_optional_backend_includes_install_hint(self): | ||
| """Missing optional backend names include install hints in the error.""" | ||
| mock_eps = MagicMock() | ||
| mock_eps.select.return_value = [] | ||
|
|
||
| with patch( | ||
| "cuvs_bench.backends.registry.importlib.metadata.entry_points", | ||
| return_value=mock_eps, | ||
| ): | ||
| with pytest.raises(ValueError) as exc_info: | ||
| get_backend_class("opensearch") | ||
|
|
||
| assert "pip install cuvs-bench[opensearch]" in str(exc_info.value) | ||
|
|
||
|
|
||
| def _elasticsearch_installed(): | ||
| try: | ||
| import elasticsearch # noqa: F401 | ||
|
|
||
| return True | ||
| except ImportError: | ||
| return False | ||
|
|
||
|
|
||
| @pytest.mark.skipif( | ||
| not _elasticsearch_installed(), | ||
| reason="Requires pip install cuvs-bench[elastic]", | ||
| ) | ||
| class TestElasticWithExtraInstalled: | ||
| """Tests that run only when [elastic] extra is installed.""" | ||
|
|
||
| @pytest.fixture(autouse=True) | ||
| def _ensure_elastic_registered(self): | ||
| """Re-register elastic (may have been unregistered by other tests).""" | ||
| registry = get_registry() | ||
| if "elastic" not in registry._backends: | ||
| try: | ||
| from cuvs_bench.backends.elasticsearch import register | ||
|
|
||
| register() | ||
| except ImportError: | ||
| pass | ||
| yield | ||
|
|
||
| def test_elastic_plugin_loads(self): | ||
| """Elastic backend and config loader load when elasticsearch is installed.""" | ||
| assert get_backend_class("elastic") is not None | ||
| assert get_config_loader("elastic") is not None | ||
|
|
||
| def test_elastic_config_loader_tune_mode_returns_single_config(self): | ||
| """Tune mode produces one BenchmarkConfig with Optuna-suggested params.""" | ||
| loader_cls = get_config_loader("elastic") | ||
| loader = loader_cls() | ||
| dataset_config, benchmark_configs = loader.load( | ||
| dataset="glove-50-angular", | ||
| dataset_path="", | ||
| algorithms="elastic_hnsw", | ||
| _tune_mode=True, | ||
| _tune_build_params={"m": 24, "ef_construction": 150}, | ||
| _tune_search_params={"num_candidates": 120}, | ||
| ) | ||
| assert len(benchmark_configs) == 1 | ||
| config = benchmark_configs[0] | ||
| assert config.indexes[0].algo == "elastic_hnsw" | ||
| assert config.indexes[0].build_param["m"] == 24 | ||
| assert config.indexes[0].build_param["ef_construction"] == 150 | ||
| assert config.indexes[0].build_param["type"] == "hnsw" | ||
| assert config.indexes[0].search_params[0]["num_candidates"] == 120 | ||
| assert config.backend_config["name"].startswith("elastic_hnsw_tune") | ||
|
|
||
| def test_elastic_config_loader_sweep_mode_returns_multiple_configs(self): | ||
| """Sweep mode produces multiple BenchmarkConfigs from param cartesian product.""" | ||
| loader_cls = get_config_loader("elastic") | ||
| loader = loader_cls() | ||
| dataset_config, benchmark_configs = loader.load( | ||
| dataset="glove-50-angular", | ||
| dataset_path="", | ||
| algorithms="elastic_hnsw", | ||
| groups="test", | ||
| ) | ||
| assert len(benchmark_configs) >= 1 | ||
| config = benchmark_configs[0] | ||
| assert config.indexes[0].algo == "elastic_hnsw" | ||
| assert "m" in config.indexes[0].build_param | ||
| assert "num_candidates" in config.indexes[0].search_params[0] | ||
|
|
||
| def test_elastic_config_loader_dataset_not_found_raises(self): | ||
| """Config loader raises ValueError for unknown dataset.""" | ||
| loader_cls = get_config_loader("elastic") | ||
| loader = loader_cls() | ||
| with pytest.raises(ValueError, match="not found"): | ||
| loader.load(dataset="nonexistent_dataset_xyz", dataset_path="") | ||
|
|
||
| def test_elastic_config_loader_group_not_found_raises(self): | ||
| """Config loader raises ValueError for unknown algorithm group.""" | ||
| loader_cls = get_config_loader("elastic") | ||
| loader = loader_cls() | ||
| with pytest.raises(ValueError, match="find elastic groups"): | ||
| loader.load( | ||
| dataset="glove-50-angular", | ||
| dataset_path="", | ||
| algorithms="elastic_hnsw", | ||
| groups="nonexistent_group_xyz", | ||
| ) | ||
|
|
||
| def test_elastic_dry_run_build(self): | ||
| """ElasticBackend.build(dry_run=True) returns synthetic result without ES.""" | ||
| cls = get_backend_class("elastic") | ||
| backend = cls( | ||
| config={"name": "test", "host": "localhost", "port": 9200} | ||
| ) | ||
|
|
||
| base = np.random.rand(100, 32).astype(np.float32) | ||
| queries = np.random.rand(10, 32).astype(np.float32) | ||
| dataset = Dataset( | ||
| name="test", | ||
| training_vectors=base, | ||
| query_vectors=queries, | ||
| distance_metric="euclidean", | ||
| ) | ||
| indexes = [ | ||
| IndexConfig( | ||
| name="elastic_hnsw_test", | ||
| algo="elastic_hnsw", | ||
| build_param={"m": 16, "ef_construction": 100}, | ||
| search_params=[{"num_candidates": 100}], | ||
| file="", | ||
| ) | ||
| ] | ||
|
|
||
| result = backend.build(dataset=dataset, indexes=indexes, dry_run=True) | ||
|
|
||
| assert result.success | ||
| assert result.algorithm == "elastic_hnsw" | ||
| assert result.build_time_seconds == 0 | ||
|
|
||
| def test_elastic_dry_run_search(self): | ||
| """ElasticBackend.search(dry_run=True) returns synthetic result without ES.""" | ||
| cls = get_backend_class("elastic") | ||
| backend = cls( | ||
| config={"name": "test", "host": "localhost", "port": 9200} | ||
| ) | ||
|
|
||
| base = np.random.rand(100, 32).astype(np.float32) | ||
| queries = np.random.rand(10, 32).astype(np.float32) | ||
| dataset = Dataset( | ||
| name="test", | ||
| training_vectors=base, | ||
| query_vectors=queries, | ||
| distance_metric="euclidean", | ||
| ) | ||
| indexes = [ | ||
| IndexConfig( | ||
| name="elastic_hnsw_test", | ||
| algo="elastic_hnsw", | ||
| build_param={}, | ||
| search_params=[{"num_candidates": 100}], | ||
| file="", | ||
| ) | ||
| ] | ||
|
|
||
| result = backend.search( | ||
| dataset=dataset, indexes=indexes, k=10, dry_run=True | ||
| ) | ||
|
|
||
| assert result.success | ||
| assert result.algorithm == "elastic_hnsw" | ||
| assert result.search_time_ms == 0 | ||
|
|
||
| def test_elastic_build_requires_training_vectors(self): | ||
| """ElasticBackend.build returns error when no training vectors are available.""" | ||
| cls = get_backend_class("elastic") | ||
| backend = cls( | ||
| config={"name": "test", "host": "localhost", "port": 9200} | ||
| ) | ||
|
|
||
| queries = np.random.rand(10, 32).astype(np.float32) | ||
| dataset = Dataset( | ||
| name="test", | ||
| query_vectors=queries, | ||
| base_file=None, | ||
| query_file=None, | ||
| ) | ||
| indexes = [ | ||
| IndexConfig( | ||
| name="elastic_hnsw_test", | ||
| algo="elastic_hnsw", | ||
| build_param={}, | ||
| search_params=[{}], | ||
| file="", | ||
| ) | ||
| ] | ||
|
|
||
| with patch.object( | ||
| backend, "_check_network_available", return_value=True | ||
| ): | ||
| result = backend.build( | ||
| dataset=dataset, indexes=indexes, dry_run=False | ||
| ) | ||
|
|
||
| assert not result.success | ||
| assert "training_vectors" in (result.error_message or "") | ||
|
|
||
| def test_elastic_preflight_fails_when_no_network(self): | ||
| """ElasticBackend.build returns success=False when network is unavailable.""" | ||
| cls = get_backend_class("elastic") | ||
| backend = cls( | ||
| config={"name": "test", "host": "localhost", "port": 9200} | ||
| ) | ||
|
|
||
| base = np.random.rand(100, 32).astype(np.float32) | ||
| queries = np.random.rand(10, 32).astype(np.float32) | ||
| dataset = Dataset( | ||
| name="test", | ||
| training_vectors=base, | ||
| query_vectors=queries, | ||
| base_file="dummy/base.fbin", | ||
| query_file="dummy/query.fbin", | ||
| ) | ||
| indexes = [ | ||
| IndexConfig( | ||
| name="elastic_hnsw_test", | ||
| algo="elastic_hnsw", | ||
| build_param={}, | ||
| search_params=[{}], | ||
| file="", | ||
| ) | ||
| ] | ||
|
|
||
| with patch.object( | ||
| backend, "_check_network_available", return_value=False | ||
| ): | ||
| result = backend.build(dataset=dataset, indexes=indexes) | ||
|
|
||
| assert not result.success | ||
| assert "pre-flight" in (result.error_message or "").lower() | ||
|
|
||
| def test_elastic_search_preflight_fails_when_no_network(self): | ||
| """ElasticBackend.search returns success=False when network is unavailable.""" | ||
| cls = get_backend_class("elastic") | ||
| backend = cls( | ||
| config={"name": "test", "host": "localhost", "port": 9200} | ||
| ) | ||
|
|
||
| dataset = Dataset( | ||
| name="test", | ||
| training_vectors=np.random.rand(100, 32).astype(np.float32), | ||
| query_vectors=np.random.rand(10, 32).astype(np.float32), | ||
| query_file="dummy/query.fbin", | ||
| ) | ||
| indexes = [ | ||
| IndexConfig( | ||
| name="elastic_hnsw_test", | ||
| algo="elastic_hnsw", | ||
| build_param={}, | ||
| search_params=[{"num_candidates": 100}], | ||
| file="", | ||
| ) | ||
| ] | ||
|
|
||
| with patch.object( | ||
| backend, "_check_network_available", return_value=False | ||
| ): | ||
| result = backend.search(dataset=dataset, indexes=indexes, k=10) | ||
|
|
||
| assert not result.success | ||
| assert "pre-flight" in (result.error_message or "").lower() | ||
|
|
||
| def test_elastic_build_skips_existing_index_when_force_false(self): | ||
| """build(force=False) returns success=True immediately when index already exists.""" | ||
| cls = get_backend_class("elastic") | ||
| backend = cls( | ||
| config={ | ||
| "name": "test", | ||
| "host": "localhost", | ||
| "port": 9200, | ||
| "index_name": "test_index", | ||
| } | ||
| ) | ||
|
|
||
| mock_client = MagicMock() | ||
| mock_client.indices.exists.return_value = True | ||
| mock_client.indices.stats.return_value = { | ||
| "_all": {"primaries": {"store": {"size_in_bytes": 1024}}} | ||
| } | ||
|
|
||
| dataset = Dataset( | ||
| name="test", | ||
| training_vectors=np.random.rand(100, 32).astype(np.float32), | ||
| query_vectors=np.random.rand(10, 32).astype(np.float32), | ||
| base_file="dummy/base.fbin", | ||
| ) | ||
| indexes = [ | ||
| IndexConfig( | ||
| name="test_index", | ||
| algo="elastic_hnsw", | ||
| build_param={ | ||
| "type": "hnsw", | ||
| "m": 16, | ||
| "ef_construction": 100, | ||
| "similarity": "l2_norm", | ||
| "number_of_shards": 1, | ||
| "number_of_replicas": 0, | ||
| "vector_field": "embedding", | ||
| }, | ||
| search_params=[{"num_candidates": 100}], | ||
| file="", | ||
| ) | ||
| ] | ||
|
|
||
| with patch.object( | ||
| backend, "_check_network_available", return_value=True | ||
| ): | ||
| with patch.object( | ||
| backend, "_get_client", return_value=mock_client | ||
| ): | ||
| result = backend.build( | ||
| dataset=dataset, indexes=indexes, force=False | ||
| ) | ||
|
|
||
| assert result.success | ||
| assert result.index_size_bytes == 1024 | ||
| mock_client.indices.delete.assert_not_called() | ||
|
|
||
| def test_elastic_build_uses_lazy_loaded_training_vectors(self): | ||
| """ElasticBackend.build works with Dataset lazy-loading via base_file.""" | ||
| cls = get_backend_class("elastic") | ||
| backend = cls( | ||
| config={"name": "test", "host": "localhost", "port": 9200} | ||
| ) | ||
|
|
||
| mock_client = MagicMock() | ||
| mock_client.indices.exists.return_value = True | ||
| mock_client.indices.stats.return_value = { | ||
| "_all": {"primaries": {"store": {"size_in_bytes": 2048}}} | ||
| } | ||
|
|
||
| dataset = Dataset( | ||
| name="test", | ||
| query_vectors=np.random.rand(10, 32).astype(np.float32), | ||
| base_file="dummy/base.fbin", | ||
| ) | ||
| indexes = [ | ||
| IndexConfig( | ||
| name="test_index", | ||
| algo="elastic_hnsw", | ||
| build_param={"m": 16, "ef_construction": 100}, | ||
| search_params=[{"num_candidates": 100}], | ||
| file="", | ||
| ) | ||
| ] | ||
|
|
||
| with patch.object( | ||
| backend, "_check_network_available", return_value=True | ||
| ): | ||
| with patch.object( | ||
| backend, "_get_client", return_value=mock_client | ||
| ): | ||
| with patch( | ||
| "cuvs_bench.backends.elasticsearch.load_vectors", | ||
| return_value=np.random.rand(100, 32).astype(np.float32), | ||
| ): | ||
| result = backend.build( | ||
| dataset=dataset, indexes=indexes, force=False | ||
| ) | ||
|
|
||
| assert result.success | ||
| assert result.index_size_bytes == 2048 | ||
|
|
||
| def test_elastic_search_uses_lazy_loaded_query_vectors(self): | ||
| """ElasticBackend.search works with Dataset lazy-loading via query_file.""" | ||
| cls = get_backend_class("elastic") | ||
| backend = cls( | ||
| config={"name": "test", "host": "localhost", "port": 9200} | ||
| ) | ||
|
|
||
| mock_client = MagicMock() | ||
| mock_client.search.return_value = { | ||
| "hits": {"hits": [{"_id": "0", "_score": 1.0}]} | ||
| } | ||
|
|
||
| dataset = Dataset( | ||
| name="test", | ||
| training_vectors=np.random.rand(100, 32).astype(np.float32), | ||
| query_file="dummy/query.fbin", | ||
| groundtruth_neighbors=np.array([[0]], dtype=np.int32), | ||
| ) | ||
| indexes = [ | ||
| IndexConfig( | ||
| name="elastic_hnsw_test", | ||
| algo="elastic_hnsw", | ||
| build_param={}, | ||
| search_params=[{"num_candidates": 100}], | ||
| file="", | ||
| ) | ||
| ] | ||
|
|
||
| with patch.object( | ||
| backend, "_check_network_available", return_value=True | ||
| ): | ||
| with patch.object( | ||
| backend, "_get_client", return_value=mock_client | ||
| ): | ||
| with patch( | ||
| "cuvs_bench.backends.elasticsearch.load_vectors", | ||
| return_value=np.random.rand(1, 32).astype(np.float32), | ||
| ): | ||
| result = backend.search( | ||
| dataset=dataset, indexes=indexes, k=1 | ||
| ) | ||
|
|
||
| assert result.success | ||
| assert result.neighbors.shape == (1, 1) | ||
|
|
||
| def test_elastic_algo_from_config(self): | ||
| """ElasticBackend.algo derives from config type (elastic_hnsw, elastic_int8_hnsw).""" | ||
| cls = get_backend_class("elastic") | ||
| backend_hnsw = cls(config={"name": "test", "type": "hnsw"}) | ||
| assert backend_hnsw.algo == "elastic_hnsw" | ||
|
|
||
| backend_int8 = cls(config={"name": "test", "type": "int8_hnsw"}) | ||
| assert backend_int8.algo == "elastic_int8_hnsw" | ||
|
|
||
| def test_elastic_build_rejects_unsupported_index_type(self): | ||
| """Elastic build fails early with a descriptive unsupported-index error.""" | ||
| cls = get_backend_class("elastic") | ||
| backend = cls( | ||
| config={"name": "test", "host": "localhost", "port": 9200} | ||
| ) | ||
|
|
||
| dataset = Dataset( | ||
| name="test", | ||
| training_vectors=np.random.rand(8, 4).astype(np.float32), | ||
| query_vectors=np.random.rand(2, 4).astype(np.float32), | ||
| distance_metric="euclidean", | ||
| ) | ||
| indexes = [ | ||
| IndexConfig( | ||
| name="elastic_scann_test", | ||
| algo="elastic_scann", | ||
| build_param={"type": "scann"}, | ||
| search_params=[{"num_candidates": 10}], | ||
| file="", | ||
| ) | ||
| ] | ||
|
|
||
| with patch.object( | ||
| backend, "_check_network_available", return_value=True | ||
| ): | ||
| result = backend.build(dataset=dataset, indexes=indexes) | ||
|
|
||
| assert not result.success | ||
| assert "unsupported Elasticsearch index type" in ( | ||
| result.error_message or "" | ||
| ) | ||
| assert "scann" in (result.error_message or "") | ||
|
|
||
| def test_elastic_build_rejects_unsupported_similarity(self): | ||
| """Elastic build fails early with a descriptive unsupported-similarity error.""" | ||
| cls = get_backend_class("elastic") | ||
| backend = cls( | ||
| config={"name": "test", "host": "localhost", "port": 9200} | ||
| ) | ||
|
|
||
| dataset = Dataset( | ||
| name="test", | ||
| training_vectors=np.random.rand(8, 4).astype(np.float32), | ||
| query_vectors=np.random.rand(2, 4).astype(np.float32), | ||
| distance_metric="euclidean", | ||
| ) | ||
| indexes = [ | ||
| IndexConfig( | ||
| name="elastic_hnsw_test", | ||
| algo="elastic_hnsw", | ||
| build_param={"type": "hnsw", "similarity": "dot_product"}, | ||
| search_params=[{"num_candidates": 10}], | ||
| file="", | ||
| ) | ||
| ] | ||
|
|
||
| with patch.object( | ||
| backend, "_check_network_available", return_value=True | ||
| ): | ||
| result = backend.build(dataset=dataset, indexes=indexes) | ||
|
|
||
| assert not result.success | ||
| assert "unsupported Elasticsearch similarity" in ( | ||
| result.error_message or "" | ||
| ) | ||
| assert "dot_product" in (result.error_message or "") | ||
|
|
||
| def test_elastic_cleanup_closes_client(self): | ||
| """ElasticBackend.cleanup() closes client and sets _client to None.""" | ||
| cls = get_backend_class("elastic") | ||
| backend = cls( | ||
| config={"name": "test", "host": "localhost", "port": 9200} | ||
| ) | ||
| mock_client = MagicMock() | ||
| backend._client = mock_client | ||
|
|
||
| backend.cleanup() | ||
|
|
||
| mock_client.close.assert_called_once() | ||
| assert backend._client is None | ||
|
|
||
| def test_orchestrator_elastic_dry_run(self): | ||
| """Initializing the elastic orchestrator should support dry_run.""" | ||
| from cuvs_bench.orchestrator import BenchmarkOrchestrator | ||
|
|
||
| orch = BenchmarkOrchestrator(backend_type="elastic") | ||
| results = orch.run_benchmark( | ||
| dataset="glove-50-angular", | ||
| dataset_path="/nonexistent", | ||
| host="localhost", | ||
| port=9200, | ||
| algorithms="elastic_hnsw", | ||
| groups="test", | ||
| build=True, | ||
| search=True, | ||
| dry_run=True, | ||
| count=10, | ||
| batch_size=100, | ||
| ) | ||
| assert results is not None | ||
| assert len(results) >= 1 | ||
|
|
||
|
|
||
| @pytest.mark.skipif( | ||
| not _elasticsearch_installed(), | ||
| reason="Requires pip install cuvs-bench[elastic]", | ||
| ) | ||
| class TestElasticHelpers: | ||
| """Tests for elastic backend helper functions.""" | ||
|
|
||
| @pytest.fixture(autouse=True) | ||
| def _ensure_elastic_registered(self): | ||
| """Re-register elastic (may have been unregistered by other tests).""" | ||
| registry = get_registry() | ||
| if "elastic" not in registry._backends: | ||
| try: | ||
| from cuvs_bench.backends.elasticsearch import register | ||
|
|
||
| register() | ||
| except ImportError: | ||
| pass | ||
| yield | ||
|
|
||
| def test_distance_to_similarity(self): | ||
| """_distance_to_similarity maps cuvs distance to ES similarity.""" | ||
| from cuvs_bench.backends.elasticsearch import _distance_to_similarity | ||
|
|
||
| assert _distance_to_similarity("euclidean") == "l2_norm" | ||
| assert _distance_to_similarity("inner_product") == "max_inner_product" | ||
| assert _distance_to_similarity("cosine") == "cosine" | ||
| assert _distance_to_similarity("unknown") == "l2_norm" | ||
|
|
||
| def test_validate_elastic_algorithm_rejects_unknown(self): | ||
| """Unknown elastic algorithm names fail with a descriptive error.""" | ||
| from cuvs_bench.backends.elasticsearch import ( | ||
| _validate_elastic_algorithm, | ||
| ) | ||
|
|
||
| with pytest.raises( | ||
| ValueError, match="unsupported algorithm" | ||
| ) as exc_info: | ||
| _validate_elastic_algorithm("elastic_scann") | ||
|
|
||
| assert "elastic_hnsw" in str(exc_info.value) | ||
|
|
||
| def test_validate_elastic_index_type_rejects_unknown(self): | ||
| """Unknown elastic index types fail with a descriptive error.""" | ||
| from cuvs_bench.backends.elasticsearch import ( | ||
| _validate_elastic_index_type, | ||
| ) | ||
|
|
||
| with pytest.raises( | ||
| ValueError, | ||
| match="unsupported Elasticsearch index type", | ||
| ) as exc_info: | ||
| _validate_elastic_index_type("scann") | ||
|
|
||
| assert "hnsw" in str(exc_info.value) | ||
|
|
||
| def test_validate_elastic_similarity_rejects_unknown(self): | ||
| """Unknown elastic similarities fail with a descriptive error.""" | ||
| from cuvs_bench.backends.elasticsearch import ( | ||
| _validate_elastic_similarity, | ||
| ) | ||
|
|
||
| with pytest.raises( | ||
| ValueError, | ||
| match="unsupported Elasticsearch similarity", | ||
| ) as exc_info: | ||
| _validate_elastic_similarity("dot_product") | ||
|
|
||
| assert "max_inner_product" in str(exc_info.value) | ||
|
|
||
| def test_load_fbin(self): | ||
| """_load_fbin loads big-ann-bench fbin format.""" | ||
| import tempfile | ||
| from pathlib import Path | ||
|
|
||
| from cuvs_bench.backends.elasticsearch import _load_fbin | ||
|
|
||
| data = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32) | ||
| with tempfile.NamedTemporaryFile(suffix=".fbin", delete=False) as f: | ||
| path = Path(f.name) | ||
| try: | ||
| with open(path, "wb") as f: | ||
| np.array([2, 2], dtype=np.uint32).tofile(f) | ||
| data.tofile(f) | ||
| loaded = _load_fbin(path) | ||
| np.testing.assert_array_equal(loaded, data) | ||
| finally: | ||
| path.unlink(missing_ok=True) | ||
|
|
||
| def test_load_ibin(self): | ||
| """_load_ibin loads big-ann-bench ibin format.""" | ||
| import tempfile | ||
| from pathlib import Path | ||
|
|
||
| from cuvs_bench.backends.elasticsearch import _load_ibin | ||
|
|
||
| data = np.array([[1, 2], [3, 4]], dtype=np.int32) | ||
| with tempfile.NamedTemporaryFile(suffix=".ibin", delete=False) as f: | ||
| path = Path(f.name) | ||
| try: | ||
| with open(path, "wb") as f: | ||
| np.array([2, 2], dtype=np.uint32).tofile(f) | ||
| data.tofile(f) | ||
| loaded = _load_ibin(path) | ||
| np.testing.assert_array_equal(loaded, data) | ||
| finally: | ||
| path.unlink(missing_ok=True) |
There was a problem hiding this comment.
HIGH: Test suite structure diverges from cuVS test contract (class-based tests instead of standalone functions).
Have you considered converting these class-based pytest suites into standalone test_* functions? This pattern can block consistency with cuVS test conventions and future test maintenance in this module.
As per coding guidelines, “Using test classes instead of standalone functions (cuVS prefers test_foo_bar() functions over class TestFoo)” should be treated as a HIGH test-quality issue.
🧰 Tools
🪛 Ruff (0.15.15)
[warning] 422-422: Unpacked variable dataset_config is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
[warning] 443-443: Unpacked variable dataset_config is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/cuvs_bench/cuvs_bench/tests/test_modularization.py` around lines 35 -
1002, The tests use class-based pytest suites (TestModularizationSmoke,
TestPluginLoaderMocked, TestElasticWithExtraInstalled, TestElasticHelpers) which
diverges from the project's preferred standalone test function style; refactor
each test method (e.g., test_cpp_gbench_available,
test_import_error_with_elasticsearch_message_raises_helpful_error,
test_elastic_config_loader_tune_mode_returns_single_config,
test_distance_to_similarity, etc.) into top-level test_* functions, move any
per-class fixtures like _ensure_elastic_registered into module-level fixtures or
helper functions, convert helper methods such as _make_mock_register into plain
module helpers, and ensure cleanup/patching logic (registry.unregister,
unregister_config_loader, patch context managers) is preserved in the new
functions so behavior remains identical.
Source: Coding guidelines
| return load_vectors(os.fspath(path)) | ||
|
|
||
|
|
||
| def _load_ibin(path: Path) -> np.ndarray: |
There was a problem hiding this comment.
One small cleanup: _load_fbin and _load_ibin are now thin wrappers around load_vectors and aren't called anywhere, so they can be removed.
| error_message=str(e), | ||
| ) | ||
|
|
||
| vectors = dataset.training_vectors |
There was a problem hiding this comment.
Vector loading looks good now and matches the centralized approach. build() reads dataset.training_vectors and search() reads dataset.query_vectors and dataset.groundtruth_neighbors, so the framework handles loading transparently and the backend no longer parses files itself. This also picks up the shared format support (uint64 headers, .f16bin/.u8bin/.i8bin, subset_size) along
| def backend_type(self) -> str: | ||
| return "elastic" | ||
|
|
||
| def load( |
There was a problem hiding this comment.
ElasticConfigLoader.load() can be removed. It only forwards to super().load(), so it adds nothing, the base ConfigLoader.load() already runs all the shared steps and calls into the backend-specific hooks _discover_algo_groups() and _build_benchmark_configs(), which is where the elastic logic lives. OpenSearch relies on this directly: it defines no load(), inherits the base one, and implements only those two hooks. Dropping it keeps elastic consistent with OpenSearch
| dataset=dataset, dataset_path=dataset_path, **kwargs | ||
| ) | ||
|
|
||
| def _discover_algo_groups( |
There was a problem hiding this comment.
Parameter expansion looks good. _discover_algo_groups() just returns the algo groups and _build_benchmark_configs() receives the already-expanded build_combos / search_combos from the base loader. The framework handles the expansion, same as OpenSearch.
| build_params[k] = v | ||
|
|
||
| try: | ||
| client = self._get_client() |
There was a problem hiding this comment.
Is the reason for connecting lazily through _get_client() inside build()/search(), rather than in initialize(), to avoid reaching a live server on a dry_run? The orchestrator always calls initialize(), even for a dry run, while build()/search() return early on dry_run before touching the client, so the lazy path seems to keep dry runs from needing a server.
| index_name = self.config.get("index_name", "cuvs_bench_vectors") | ||
| idx = indexes[0] if indexes else None | ||
| build_params = dict(idx.build_param or {}) if idx else {} | ||
| for k, v in self.config.items(): |
There was a problem hiding this comment.
This loop is redundant in the normal flow. build_params already copies idx.build_param (line 223), and the loader spreads that same dict into backend_config via **build_param (line 818), so every _BUILD_PARAM_KEY in self.config is already in build_params. The remaining entries in self.config are connection keys (host, port, …) that the loop's _BUILD_PARAM_KEYS filter skips. So it never adds anything and can be dropped, leaving idx.build_param as the single source.
The only case where the loop does something is the unusual one where indexes is empty (idx is None, so build_params starts as {}) — but that doesn't happen in the orchestrator path, since each config carries exactly one IndexConfig.
| # ── Convenience API ─────────────────────────────────────────────────────────── | ||
|
|
||
|
|
||
| def run_build( |
There was a problem hiding this comment.
The module-level run_build / run_search / run_benchmark helpers duplicate the entry point already provided by BenchmarkOrchestrator and the CLI, they just call orchestrator.run_benchmark(...) as below:
run_build(...)
# Python API
BenchmarkOrchestrator(backend_type="elastic").run_benchmark(
build=True, search=False,
dataset=..., dataset_path=..., algorithms=..., host=..., port=..., force=force,
)
# CLI
python -m cuvs_bench.run --build \
--dataset <name> --dataset-path <path> --algorithms <group> \
--backend-config <elastic-config>
run_search(...)
# Python API
BenchmarkOrchestrator(backend_type="elastic").run_benchmark(
build=False, search=True,
dataset=..., dataset_path=..., algorithms=..., host=..., port=...,
)
# CLI
python -m cuvs_bench.run --search \
--dataset <name> --dataset-path <path> --algorithms <group> \
--backend-config <elastic-config>
run_benchmark(...)
# Python API
BenchmarkOrchestrator(backend_type="elastic").run_benchmark(
build=build, search=search,
dataset=..., dataset_path=..., algorithms=..., host=..., port=..., force=force,
)
# CLI
python -m cuvs_bench.run --build --search \
--dataset <name> --dataset-path <path> --algorithms <group> \
--backend-config <elastic-config>
Introduce cuvs-bench-elastic as an optional plugin for cuvs-bench that provides an Elasticsearch backend. The backend communicates with Elasticsearch via HTTP and supports HNSW indexing with optional GPU acceleration when using the Elasticsearch GPU image (cuVS-accelerated vector search).