From 6800ea8ca1b9cc90bb82cf1bbe7b0ac8ee450d63 Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Thu, 25 Sep 2025 14:17:37 +0200 Subject: [PATCH 1/8] converting several methods to static --- .../opensearch/document_store.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index 284d981bb0..f358506f3b 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -87,7 +87,7 @@ def __init__( Creates a new OpenSearchDocumentStore instance. The ``embeddings_dim``, ``method``, ``mappings``, and ``settings`` arguments are only used if the index does not - exists and needs to be created. If the index already exists, its current configurations will be used. + exist and needs to be created. If the index already exists, its current configurations will be used. For more information on connection parameters, see the [official OpenSearch documentation](https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-opensearch) @@ -107,7 +107,7 @@ def __init__( :param settings: The settings of the index to be created. Please see the [official OpenSearch docs](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/#index-settings) for more information. Defaults to {"index.knn": True} :param create_index: Whether to create the index if it doesn't exist. Defaults to True - :param http_auth: http_auth param passed to the underying connection class. + :param http_auth: http_auth param passed to the underlying connection class. For basic authentication with default connection class `Urllib3HttpConnection` this can be - a tuple of (username, password) - a list of [username, password] @@ -319,7 +319,8 @@ async def count_documents_async(self) -> int: assert self._async_client is not None return (await self._async_client.count(index=self._index))["count"] - def _deserialize_search_hits(self, hits: List[Dict[str, Any]]) -> List[Document]: + @staticmethod + def _deserialize_search_hits(hits: List[Dict[str, Any]]) -> List[Document]: out = [] for hit in hits: data = hit["_source"] @@ -344,12 +345,12 @@ def _prepare_filter_search_request(self, filters: Optional[Dict[str, Any]]) -> D def _search_documents(self, request_body: Dict[str, Any]) -> List[Document]: assert self._client is not None search_results = self._client.search(index=self._index, body=request_body) - return self._deserialize_search_hits(search_results["hits"]["hits"]) + return OpenSearchDocumentStore._deserialize_search_hits(search_results["hits"]["hits"]) async def _search_documents_async(self, request_body: Dict[str, Any]) -> List[Document]: assert self._async_client is not None search_results = await self._async_client.search(index=self._index, body=request_body) - return self._deserialize_search_hits(search_results["hits"]["hits"]) + return OpenSearchDocumentStore._deserialize_search_hits(search_results["hits"]["hits"]) def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]: """ @@ -418,7 +419,8 @@ def _prepare_bulk_write_request( "stats_only": False, } - def _process_bulk_write_errors(self, errors: List[Dict[str, Any]], policy: DuplicatePolicy) -> None: + @staticmethod + def _process_bulk_write_errors(errors: List[Dict[str, Any]], policy: DuplicatePolicy) -> None: if len(errors) == 0: return @@ -461,7 +463,7 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D bulk_params = self._prepare_bulk_write_request(documents=documents, policy=policy, is_async=False) documents_written, errors = bulk(**bulk_params) - self._process_bulk_write_errors(errors, policy) + OpenSearchDocumentStore._process_bulk_write_errors(errors, policy) return documents_written async def write_documents_async( @@ -478,10 +480,11 @@ async def write_documents_async( bulk_params = self._prepare_bulk_write_request(documents=documents, policy=policy, is_async=True) documents_written, errors = await async_bulk(**bulk_params) # since we call async_bulk with stats_only=False, errors is guaranteed to be a list (not int) - self._process_bulk_write_errors(errors=errors, policy=policy) # type: ignore[arg-type] + OpenSearchDocumentStore._process_bulk_write_errors(errors=errors, policy=policy) # type: ignore[arg-type] return documents_written - def _deserialize_document(self, hit: Dict[str, Any]) -> Document: + @staticmethod + def _deserialize_document(hit: Dict[str, Any]) -> Document: """ Creates a Document from the search hit provided. This is mostly useful in self.filter_documents(). From 42b7a23596b78331ca873d949cc1cebf5a739fff Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Thu, 25 Sep 2025 16:05:53 +0200 Subject: [PATCH 2/8] adding delete_all_documents() for sync and async + tests --- .../opensearch/document_store.py | 36 +++++++++++++++++++ integrations/opensearch/tests/test_auth.py | 4 +++ .../opensearch/tests/test_bm25_retriever.py | 1 + .../opensearch/tests/test_document_store.py | 25 +++++++++++++ .../tests/test_document_store_async.py | 29 +++++++++++++++ .../tests/test_embedding_retriever.py | 1 + integrations/opensearch/tests/test_filters.py | 1 + .../test_open_search_hybrid_retriever.py | 4 +++ 8 files changed, 101 insertions(+) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index f358506f3b..15342a8290 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -528,6 +528,42 @@ async def delete_documents_async(self, document_ids: List[str]) -> None: await async_bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=True)) + def delete_all_documents(self) -> None: + """ + Deletes all documents in the document store by deleting and recreating the index. + + A fast way to clear all documents from the document store while preserving any index settings and mappings. + """ + self._ensure_initialized() + assert self._client is not None + + # Delete index + self._client.indices.delete(index=self._index) + + # Recreate with mappings and settings + body = {"mappings": self._mappings, "settings": self._settings} + self._client.indices.create(index=self._index, body=body) + + async def delete_all_documents_async(self) -> None: + """ + Asynchronously deletes all documents in the document store by deleting and recreating the index. + + A fast way to clear all documents from the document store while preserving any index settings and mappings. + """ + self._ensure_initialized() + assert self._async_client is not None + + try: + # Delete index + await self._async_client.indices.delete(index=self._index) + + # Recreate with mappings and settings + body = {"mappings": self._mappings, "settings": self._settings} + await self._async_client.indices.create(index=self._index, body=body) + except Exception as e: + msg = f"Failed to delete all documents from OpenSearch: {e!s}" + raise DocumentStoreError(msg) from e + def _prepare_bm25_search_request( self, *, diff --git a/integrations/opensearch/tests/test_auth.py b/integrations/opensearch/tests/test_auth.py index 12581c4e8f..d0354303c2 100644 --- a/integrations/opensearch/tests/test_auth.py +++ b/integrations/opensearch/tests/test_auth.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + from unittest.mock import Mock, patch import pytest diff --git a/integrations/opensearch/tests/test_bm25_retriever.py b/integrations/opensearch/tests/test_bm25_retriever.py index 82a1ce6575..06b69e5a22 100644 --- a/integrations/opensearch/tests/test_bm25_retriever.py +++ b/integrations/opensearch/tests/test_bm25_retriever.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2023-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 + from unittest.mock import Mock, patch import pytest diff --git a/integrations/opensearch/tests/test_document_store.py b/integrations/opensearch/tests/test_document_store.py index 617d5d6478..55e25f6159 100644 --- a/integrations/opensearch/tests/test_document_store.py +++ b/integrations/opensearch/tests/test_document_store.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2023-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 + import random from typing import List from unittest.mock import patch @@ -468,3 +469,27 @@ def filter_documents_no_embedding_returned( assert results[0].embedding is None assert results[1].embedding is None assert results[2].embedding is None + + def test_delete_all_documents(self, document_store: OpenSearchDocumentStore): + """Test delete_all_documents removes all documents and preserves index structure""" + docs = [ + Document(id="1", content="First document", meta={"category": "test"}), + Document(id="2", content="Second document", meta={"category": "test"}), + Document(id="3", content="Third document", meta={"category": "other"}), + ] + document_store.write_documents(docs) + assert document_store.count_documents() == 3 + + # delete all documents + document_store.delete_all_documents() + assert document_store.count_documents() == 0 + + # verify index still exists and can accept new documents and retrieve + new_doc = Document(id="4", content="New document after delete all") + document_store.write_documents([new_doc]) + assert document_store.count_documents() == 1 + + results = document_store.filter_documents() + assert len(results) == 1 + assert results[0].id == "4" + assert results[0].content == "New document after delete all" diff --git a/integrations/opensearch/tests/test_document_store_async.py b/integrations/opensearch/tests/test_document_store_async.py index 7565dadeba..e60121aa4c 100644 --- a/integrations/opensearch/tests/test_document_store_async.py +++ b/integrations/opensearch/tests/test_document_store_async.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + from typing import List import pytest @@ -241,3 +245,28 @@ async def test_delete_documents(self, document_store: OpenSearchDocumentStore): await document_store.delete_documents_async([doc.id]) assert await document_store.count_documents_async() == 0 + + @pytest.mark.asyncio + async def test_delete_all_documents_async(self, document_store): + """Test delete_all_documents_async removes all documents and preserves index structure""" + docs = [ + Document(id="1", content="First document", meta={"category": "test"}), + Document(id="2", content="Second document", meta={"category": "test"}), + Document(id="3", content="Third document", meta={"category": "other"}), + ] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 3 + + # delete all documents + await document_store.delete_all_documents_async() + assert await document_store.count_documents_async() == 0 + + # verify index still exists and can accept new documents and retrieve + new_doc = Document(id="4", content="New document after delete all") + await document_store.write_documents_async([new_doc]) + assert await document_store.count_documents_async() == 1 + + results = await document_store.filter_documents_async() + assert len(results) == 1 + assert results[0].id == "4" + assert results[0].content == "New document after delete all" diff --git a/integrations/opensearch/tests/test_embedding_retriever.py b/integrations/opensearch/tests/test_embedding_retriever.py index 43516310e2..24009cf897 100644 --- a/integrations/opensearch/tests/test_embedding_retriever.py +++ b/integrations/opensearch/tests/test_embedding_retriever.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2023-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 + from unittest.mock import Mock, patch import pytest diff --git a/integrations/opensearch/tests/test_filters.py b/integrations/opensearch/tests/test_filters.py index d61c2297a8..e35a0dabcc 100644 --- a/integrations/opensearch/tests/test_filters.py +++ b/integrations/opensearch/tests/test_filters.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2023-present deepset GmbH # # SPDX-License-Identifier: Apache-2.0 + from typing import List import pytest diff --git a/integrations/opensearch/tests/test_open_search_hybrid_retriever.py b/integrations/opensearch/tests/test_open_search_hybrid_retriever.py index 3e72e9276d..554df783fe 100644 --- a/integrations/opensearch/tests/test_open_search_hybrid_retriever.py +++ b/integrations/opensearch/tests/test_open_search_hybrid_retriever.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + from copy import deepcopy from typing import Any, Dict from unittest.mock import Mock From 3cf92a15f8bf16795d7e7170e2fecf73b3d46c37 Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Mon, 29 Sep 2025 16:41:29 +0200 Subject: [PATCH 3/8] updating tests --- .../opensearch/document_store.py | 65 ++++++++++++++----- .../opensearch/tests/test_document_store.py | 50 ++++++++++---- 2 files changed, 88 insertions(+), 27 deletions(-) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index 15342a8290..e9eafa2fd9 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -528,38 +528,71 @@ async def delete_documents_async(self, document_ids: List[str]) -> None: await async_bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=True)) - def delete_all_documents(self) -> None: + def _prepare_delete_all_request(self, *, is_async: bool) -> Dict[str, Any]: + return { + "index": self._index, + "body": {"query": {"match_all": {}}}, # Delete all documents + "wait_for_completion": False if is_async else True, # block until done (set False for async) + } + + def delete_all_documents(self, recreate_index: bool = False) -> None: # noqa: FBT002, FBT001 """ - Deletes all documents in the document store by deleting and recreating the index. + Deletes all documents in the document store. - A fast way to clear all documents from the document store while preserving any index settings and mappings. + :param recreate_index: If True, the index will be deleted and recreated with the original mappings and + settings. If False, all documents will be deleted using the `delete_by_query` API. """ self._ensure_initialized() assert self._client is not None - # Delete index - self._client.indices.delete(index=self._index) + if recreate_index: + # get the current index mappings and settings + index_name = self._index + body = { + "mappings": self._client.indices.get(self._index)[index_name]["mappings"], + "settings": self._client.indices.get(self._index)[index_name]["settings"], + } + body["settings"]["index"].pop("uuid", None) + body["settings"]["index"].pop("creation_date", None) + body["settings"]["index"].pop("provided_name", None) + body["settings"]["index"].pop("version", None) + self._client.indices.delete(index=self._index) + self._client.indices.create(index=self._index, body=body) + logger.debug( + "The index '{index}' recreated with the original mappings and settings.", + index=self._index, + ) - # Recreate with mappings and settings - body = {"mappings": self._mappings, "settings": self._settings} - self._client.indices.create(index=self._index, body=body) + else: + result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False)) + logger.debug( + "Deleted all the {n_docs} documents from the index '{index}'.", + index=self._index, + n_docs=result["deleted"], + ) - async def delete_all_documents_async(self) -> None: + async def delete_all_documents_async(self, recreate_index: bool = False) -> None: # noqa: FBT002, FBT001 """ - Asynchronously deletes all documents in the document store by deleting and recreating the index. + Asynchronously deletes all documents in the document store. - A fast way to clear all documents from the document store while preserving any index settings and mappings. + :param recreate_index: If True, the index will be deleted and recreated with the original mappings and + settings. If False, all documents will be deleted using the `delete_by_query` API. """ self._ensure_initialized() assert self._async_client is not None try: - # Delete index - await self._async_client.indices.delete(index=self._index) + if recreate_index: + # delete index + await self._async_client.indices.delete(index=self._index) + # recreate with mappings and settings + body = {"mappings": self._mappings, "settings": self._settings} + await self._async_client.indices.create(index=self._index, body=body) + else: + await self._async_client.indices.close(index=self._index) # close the index + await self._async_client.delete_by_query(**self._prepare_delete_all_request(is_async=True)) + await self._async_client.indices.open(index=self._index) # reopen - # Recreate with mappings and settings - body = {"mappings": self._mappings, "settings": self._settings} - await self._async_client.indices.create(index=self._index, body=body) except Exception as e: msg = f"Failed to delete all documents from OpenSearch: {e!s}" raise DocumentStoreError(msg) from e diff --git a/integrations/opensearch/tests/test_document_store.py b/integrations/opensearch/tests/test_document_store.py index 55e25f6159..00a24af4ab 100644 --- a/integrations/opensearch/tests/test_document_store.py +++ b/integrations/opensearch/tests/test_document_store.py @@ -470,26 +470,54 @@ def filter_documents_no_embedding_returned( assert results[1].embedding is None assert results[2].embedding is None - def test_delete_all_documents(self, document_store: OpenSearchDocumentStore): - """Test delete_all_documents removes all documents and preserves index structure""" - docs = [ - Document(id="1", content="First document", meta={"category": "test"}), - Document(id="2", content="Second document", meta={"category": "test"}), - Document(id="3", content="Third document", meta={"category": "other"}), - ] + def test_delete_all_documents_index_recreation(self, document_store: OpenSearchDocumentStore): + # populate the index with some documents + docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")] document_store.write_documents(docs) - assert document_store.count_documents() == 3 + + # capture index structure before deletion + assert document_store._client is not None + index_info_before = document_store._client.indices.get(index=document_store._index) + mappings_before = index_info_before[document_store._index]["mappings"] + settings_before = index_info_before[document_store._index]["settings"] # delete all documents - document_store.delete_all_documents() + document_store.delete_all_documents(recreate_index=True) assert document_store.count_documents() == 0 - # verify index still exists and can accept new documents and retrieve + # verify index structure is preserved + index_info_after = document_store._client.indices.get(index=document_store._index) + mappings_after = index_info_after[document_store._index]["mappings"] + settings_after = index_info_after[document_store._index]["settings"] + + assert mappings_after == mappings_before, "delete_all_documents should preserve index mappings" + + settings_after["index"].pop("uuid", None) + settings_after["index"].pop("creation_date", None) + settings_before["index"].pop("uuid", None) + settings_before["index"].pop("creation_date", None) + assert settings_after == settings_before, "delete_all_documents should preserve index settings" + new_doc = Document(id="4", content="New document after delete all") document_store.write_documents([new_doc]) assert document_store.count_documents() == 1 results = document_store.filter_documents() assert len(results) == 1 - assert results[0].id == "4" + assert results[0].content == "New document after delete all" + + def test_delete_all_documents(self, document_store: OpenSearchDocumentStore): + docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")] + document_store.write_documents(docs) + assert document_store.count_documents() == 2 + + document_store.delete_all_documents(recreate_index=False) + assert document_store.count_documents() == 0 + + new_doc = Document(id="3", content="New document after delete all") + document_store.write_documents([new_doc]) + assert document_store.count_documents() == 1 + + results = document_store.filter_documents() + assert len(results) == 1 assert results[0].content == "New document after delete all" From b8880454d1614c361a1e3b8ef3177a56790266b8 Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Mon, 29 Sep 2025 17:27:33 +0200 Subject: [PATCH 4/8] updating tests --- .../opensearch/document_store.py | 26 ++++++--- .../opensearch/tests/test_document_store.py | 6 ++- .../tests/test_document_store_async.py | 54 +++++++++++++++---- 3 files changed, 65 insertions(+), 21 deletions(-) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index e9eafa2fd9..acfa6e2d9a 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -558,14 +558,14 @@ def delete_all_documents(self, recreate_index: bool = False) -> None: # noqa: F body["settings"]["index"].pop("version", None) self._client.indices.delete(index=self._index) self._client.indices.create(index=self._index, body=body) - logger.debug( + logger.info( "The index '{index}' recreated with the original mappings and settings.", index=self._index, ) else: result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False)) - logger.debug( + logger.info( "Deleted all the {n_docs} documents from the index '{index}'.", index=self._index, n_docs=result["deleted"], @@ -583,15 +583,24 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None try: if recreate_index: + + # get the current index mappings and settings + index_name = self._index + body = { + "mappings": self._client.indices.get(self._index)[index_name]["mappings"], + "settings": self._client.indices.get(self._index)[index_name]["settings"], + } + body["settings"]["index"].pop("uuid", None) + body["settings"]["index"].pop("creation_date", None) + body["settings"]["index"].pop("provided_name", None) + body["settings"]["index"].pop("version", None) + # delete index await self._async_client.indices.delete(index=self._index) # recreate with mappings and settings - body = {"mappings": self._mappings, "settings": self._settings} await self._async_client.indices.create(index=self._index, body=body) else: - await self._async_client.indices.close(index=self._index) # close the index await self._async_client.delete_by_query(**self._prepare_delete_all_request(is_async=True)) - await self._async_client.indices.open(index=self._index) # reopen except Exception as e: msg = f"Failed to delete all documents from OpenSearch: {e!s}" @@ -652,7 +661,8 @@ def _prepare_bm25_search_request( return body - def _postprocess_bm25_search_results(self, *, results: List[Document], scale_score: bool) -> None: + @staticmethod + def _postprocess_bm25_search_results(*, results: List[Document], scale_score: bool) -> None: if not scale_score: return @@ -696,7 +706,7 @@ def _bm25_retrieval( custom_query=custom_query, ) documents = self._search_documents(search_params) - self._postprocess_bm25_search_results(results=documents, scale_score=scale_score) + OpenSearchDocumentStore._postprocess_bm25_search_results(results=documents, scale_score=scale_score) return documents async def _bm25_retrieval_async( @@ -735,7 +745,7 @@ async def _bm25_retrieval_async( custom_query=custom_query, ) documents = await self._search_documents_async(search_params) - self._postprocess_bm25_search_results(results=documents, scale_score=scale_score) + OpenSearchDocumentStore._postprocess_bm25_search_results(results=documents, scale_score=scale_score) return documents def _prepare_embedding_search_request( diff --git a/integrations/opensearch/tests/test_document_store.py b/integrations/opensearch/tests/test_document_store.py index 00a24af4ab..72c587922b 100644 --- a/integrations/opensearch/tests/test_document_store.py +++ b/integrations/opensearch/tests/test_document_store.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 import random +import time from typing import List from unittest.mock import patch @@ -454,7 +455,7 @@ def test_embedding_retrieval_but_dont_return_embeddings_for_bm25_retrieval( assert len(results) == 2 assert results[0].embedding is None - def filter_documents_no_embedding_returned( + def test_filter_documents_no_embedding_returned( self, document_store_embedding_dim_4_no_emb_returned: OpenSearchDocumentStore ): docs = [ @@ -506,12 +507,13 @@ def test_delete_all_documents_index_recreation(self, document_store: OpenSearchD assert len(results) == 1 assert results[0].content == "New document after delete all" - def test_delete_all_documents(self, document_store: OpenSearchDocumentStore): + def test_delete_all_documents_no_index_recreation(self, document_store: OpenSearchDocumentStore): docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")] document_store.write_documents(docs) assert document_store.count_documents() == 2 document_store.delete_all_documents(recreate_index=False) + time.sleep(2) # need to wait for the deletion to be reflected in count_documents assert document_store.count_documents() == 0 new_doc = Document(id="3", content="New document after delete all") diff --git a/integrations/opensearch/tests/test_document_store_async.py b/integrations/opensearch/tests/test_document_store_async.py index e60121aa4c..65b1555f86 100644 --- a/integrations/opensearch/tests/test_document_store_async.py +++ b/integrations/opensearch/tests/test_document_store_async.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 +import time from typing import List import pytest @@ -247,26 +248,57 @@ async def test_delete_documents(self, document_store: OpenSearchDocumentStore): assert await document_store.count_documents_async() == 0 @pytest.mark.asyncio - async def test_delete_all_documents_async(self, document_store): - """Test delete_all_documents_async removes all documents and preserves index structure""" - docs = [ - Document(id="1", content="First document", meta={"category": "test"}), - Document(id="2", content="Second document", meta={"category": "test"}), - Document(id="3", content="Third document", meta={"category": "other"}), - ] + async def test_delete_all_documents_index_recreation(self, document_store: OpenSearchDocumentStore): + # populate the index with some documents + docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")] await document_store.write_documents_async(docs) - assert await document_store.count_documents_async() == 3 + + # capture index structure before deletion + assert document_store._client is not None + index_info_before = document_store._client.indices.get(index=document_store._index) + mappings_before = index_info_before[document_store._index]["mappings"] + settings_before = index_info_before[document_store._index]["settings"] # delete all documents - await document_store.delete_all_documents_async() + await document_store.delete_all_documents_async(recreate_index=True) assert await document_store.count_documents_async() == 0 - # verify index still exists and can accept new documents and retrieve + # verify index structure is preserved + index_info_after = document_store._client.indices.get(index=document_store._index) + mappings_after = index_info_after[document_store._index]["mappings"] + settings_after = index_info_after[document_store._index]["settings"] + + assert mappings_after == mappings_before, "delete_all_documents should preserve index mappings" + + settings_after["index"].pop("uuid", None) + settings_after["index"].pop("creation_date", None) + settings_before["index"].pop("uuid", None) + settings_before["index"].pop("creation_date", None) + assert settings_after == settings_before, "delete_all_documents should preserve index settings" + new_doc = Document(id="4", content="New document after delete all") await document_store.write_documents_async([new_doc]) assert await document_store.count_documents_async() == 1 results = await document_store.filter_documents_async() assert len(results) == 1 - assert results[0].id == "4" + assert results[0].content == "New document after delete all" + + @pytest.mark.asyncio + async def test_delete_all_documents_no_index_recreation(self, document_store: OpenSearchDocumentStore): + docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")] + await document_store.write_documents_async(docs) + assert await document_store.count_documents_async() == 2 + + await document_store.delete_all_documents_async(recreate_index=False) + # need to wait for the deletion to be reflected in count_documents + time.sleep(2) + assert await document_store.count_documents_async() == 0 + + new_doc = Document(id="3", content="New document after delete all") + await document_store.write_documents_async([new_doc]) + assert await document_store.count_documents_async() == 1 + + results = await document_store.filter_documents_async() + assert len(results) == 1 assert results[0].content == "New document after delete all" From ea1baa1e378c6d50a49fe5d55304f4fea7db17ed Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Mon, 29 Sep 2025 17:28:21 +0200 Subject: [PATCH 5/8] formatting --- .../document_stores/opensearch/document_store.py | 1 - 1 file changed, 1 deletion(-) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index acfa6e2d9a..62f9b9180c 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -583,7 +583,6 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None try: if recreate_index: - # get the current index mappings and settings index_name = self._index body = { From 3fd42239fe0ddd41576c0573e9e1fa6237377b0b Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Mon, 29 Sep 2025 17:29:57 +0200 Subject: [PATCH 6/8] adding try/catch to both sync and async --- .../opensearch/document_store.py | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index 62f9b9180c..b8c522e452 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -545,31 +545,35 @@ def delete_all_documents(self, recreate_index: bool = False) -> None: # noqa: F self._ensure_initialized() assert self._client is not None - if recreate_index: - # get the current index mappings and settings - index_name = self._index - body = { - "mappings": self._client.indices.get(self._index)[index_name]["mappings"], - "settings": self._client.indices.get(self._index)[index_name]["settings"], - } - body["settings"]["index"].pop("uuid", None) - body["settings"]["index"].pop("creation_date", None) - body["settings"]["index"].pop("provided_name", None) - body["settings"]["index"].pop("version", None) - self._client.indices.delete(index=self._index) - self._client.indices.create(index=self._index, body=body) - logger.info( - "The index '{index}' recreated with the original mappings and settings.", - index=self._index, - ) + try: + if recreate_index: + # get the current index mappings and settings + index_name = self._index + body = { + "mappings": self._client.indices.get(self._index)[index_name]["mappings"], + "settings": self._client.indices.get(self._index)[index_name]["settings"], + } + body["settings"]["index"].pop("uuid", None) + body["settings"]["index"].pop("creation_date", None) + body["settings"]["index"].pop("provided_name", None) + body["settings"]["index"].pop("version", None) + self._client.indices.delete(index=self._index) + self._client.indices.create(index=self._index, body=body) + logger.info( + "The index '{index}' recreated with the original mappings and settings.", + index=self._index, + ) - else: - result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False)) - logger.info( - "Deleted all the {n_docs} documents from the index '{index}'.", - index=self._index, - n_docs=result["deleted"], - ) + else: + result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False)) + logger.info( + "Deleted all the {n_docs} documents from the index '{index}'.", + index=self._index, + n_docs=result["deleted"], + ) + except Exception as e: + msg = f"Failed to delete all documents from OpenSearch: {e!s}" + raise DocumentStoreError(msg) from e async def delete_all_documents_async(self, recreate_index: bool = False) -> None: # noqa: FBT002, FBT001 """ @@ -594,9 +598,7 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None body["settings"]["index"].pop("provided_name", None) body["settings"]["index"].pop("version", None) - # delete index await self._async_client.indices.delete(index=self._index) - # recreate with mappings and settings await self._async_client.indices.create(index=self._index, body=body) else: await self._async_client.delete_by_query(**self._prepare_delete_all_request(is_async=True)) From 4dae896efbcdb04b9f3f90ed083257d15b1cb899 Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Mon, 29 Sep 2025 17:32:59 +0200 Subject: [PATCH 7/8] using correct self._async_client in sync version --- .../document_stores/opensearch/document_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index b8c522e452..44bd9609da 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -590,8 +590,8 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None # get the current index mappings and settings index_name = self._index body = { - "mappings": self._client.indices.get(self._index)[index_name]["mappings"], - "settings": self._client.indices.get(self._index)[index_name]["settings"], + "mappings": self._async_client.indices.get(self._index)[index_name]["mappings"], + "settings": self._async_client.indices.get(self._index)[index_name]["settings"], } body["settings"]["index"].pop("uuid", None) body["settings"]["index"].pop("creation_date", None) From 159c076f8fc4ed9c723857754c30767044056cb5 Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Mon, 29 Sep 2025 17:46:20 +0200 Subject: [PATCH 8/8] fixing async version failure --- .../document_stores/opensearch/document_store.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index 44bd9609da..c488641348 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -589,9 +589,10 @@ async def delete_all_documents_async(self, recreate_index: bool = False) -> None if recreate_index: # get the current index mappings and settings index_name = self._index + index_info = await self._async_client.indices.get(self._index) body = { - "mappings": self._async_client.indices.get(self._index)[index_name]["mappings"], - "settings": self._async_client.indices.get(self._index)[index_name]["settings"], + "mappings": index_info[index_name]["mappings"], + "settings": index_info[index_name]["settings"], } body["settings"]["index"].pop("uuid", None) body["settings"]["index"].pop("creation_date", None)