@@ -87,7 +87,7 @@ def __init__(
Creates a new OpenSearchDocumentStore instance.

The ``embeddings_dim``, ``method``, ``mappings``, and ``settings`` arguments are only used if the index does not
exists and needs to be created. If the index already exists, its current configurations will be used.
exist and needs to be created. If the index already exists, its current configurations will be used.

For more information on connection parameters, see the [official OpenSearch documentation](https://opensearch.org/docs/latest/clients/python-low-level/#connecting-to-opensearch)

@@ -107,7 +107,7 @@ def __init__(
:param settings: The settings of the index to be created. Please see the [official OpenSearch docs](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/#index-settings)
for more information. Defaults to {"index.knn": True}
:param create_index: Whether to create the index if it doesn't exist. Defaults to True
:param http_auth: http_auth param passed to the underying connection class.
:param http_auth: http_auth param passed to the underlying connection class.
For basic authentication with default connection class `Urllib3HttpConnection` this can be
- a tuple of (username, password)
- a list of [username, password]
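To illustrate the options above, a minimal instantiation sketch (the host, index name, and credentials are placeholders; extra keyword arguments such as `use_ssl` are passed through to the underlying OpenSearch client):

```python
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore

# Basic authentication with the default Urllib3HttpConnection;
# the host and credentials below are placeholders.
document_store = OpenSearchDocumentStore(
    hosts="http://localhost:9200",
    index="default",
    http_auth=("admin", "admin"),
    use_ssl=False,
    verify_certs=False,
)
```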
@@ -319,7 +319,8 @@ async def count_documents_async(self) -> int:
assert self._async_client is not None
return (await self._async_client.count(index=self._index))["count"]

def _deserialize_search_hits(self, hits: List[Dict[str, Any]]) -> List[Document]:
@staticmethod
def _deserialize_search_hits(hits: List[Dict[str, Any]]) -> List[Document]:
out = []
for hit in hits:
data = hit["_source"]
@@ -344,12 +345,12 @@ def _prepare_filter_search_request(self, filters: Optional[Dict[str, Any]]) -> D
def _search_documents(self, request_body: Dict[str, Any]) -> List[Document]:
assert self._client is not None
search_results = self._client.search(index=self._index, body=request_body)
return self._deserialize_search_hits(search_results["hits"]["hits"])
return OpenSearchDocumentStore._deserialize_search_hits(search_results["hits"]["hits"])

async def _search_documents_async(self, request_body: Dict[str, Any]) -> List[Document]:
assert self._async_client is not None
search_results = await self._async_client.search(index=self._index, body=request_body)
return self._deserialize_search_hits(search_results["hits"]["hits"])
return OpenSearchDocumentStore._deserialize_search_hits(search_results["hits"]["hits"])

def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
"""
@@ -418,7 +419,8 @@ def _prepare_bulk_write_request(
"stats_only": False,
}

def _process_bulk_write_errors(self, errors: List[Dict[str, Any]], policy: DuplicatePolicy) -> None:
@staticmethod
def _process_bulk_write_errors(errors: List[Dict[str, Any]], policy: DuplicatePolicy) -> None:
if len(errors) == 0:
return

@@ -461,7 +463,7 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D

bulk_params = self._prepare_bulk_write_request(documents=documents, policy=policy, is_async=False)
documents_written, errors = bulk(**bulk_params)
self._process_bulk_write_errors(errors, policy)
OpenSearchDocumentStore._process_bulk_write_errors(errors, policy)
return documents_written
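A short usage sketch of the write path (both `Document` and `DuplicatePolicy` come from the `haystack` package; `OVERWRITE` is one of the supported policies):

```python
from haystack import Document
from haystack.document_stores.types import DuplicatePolicy

docs = [Document(content="first doc"), Document(content="second doc")]
# OVERWRITE replaces existing documents with the same id instead of failing.
written = document_store.write_documents(docs, policy=DuplicatePolicy.OVERWRITE)
assert written == 2
```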

async def write_documents_async(
@@ -478,10 +480,11 @@ async def write_documents_async(
bulk_params = self._prepare_bulk_write_request(documents=documents, policy=policy, is_async=True)
documents_written, errors = await async_bulk(**bulk_params)
# since we call async_bulk with stats_only=False, errors is guaranteed to be a list (not int)
self._process_bulk_write_errors(errors=errors, policy=policy) # type: ignore[arg-type]
OpenSearchDocumentStore._process_bulk_write_errors(errors=errors, policy=policy) # type: ignore[arg-type]
return documents_written

def _deserialize_document(self, hit: Dict[str, Any]) -> Document:
@staticmethod
def _deserialize_document(hit: Dict[str, Any]) -> Document:
"""
Creates a Document from the search hit provided.
This is mostly useful in self.filter_documents().
@@ -525,6 +528,86 @@ async def delete_documents_async(self, document_ids: List[str]) -> None:

await async_bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=True))

def _prepare_delete_all_request(self, *, is_async: bool) -> Dict[str, Any]:
return {
"index": self._index,
"body": {"query": {"match_all": {}}}, # Delete all documents
"wait_for_completion": False if is_async else True, # block until done (set False for async)
}

def delete_all_documents(self, recreate_index: bool = False) -> None: # noqa: FBT002, FBT001
"""
Deletes all documents in the document store.

:param recreate_index: If True, the index will be deleted and recreated with the original mappings and
settings. If False, all documents will be deleted using the `delete_by_query` API.
"""
self._ensure_initialized()
assert self._client is not None
Reviewer comment: It's consistent with the other methods to have this assert here. Therefore, I agree it's good to have it.

However, I believe the only case it covers is if self._client was set at some point and then for some reason is set to None again. We have the same assert in _ensure_index_exists, which gets called as part of _ensure_initialized when it is run for the first time.


try:
if recreate_index:
# get the current index mappings and settings, fetching the index info once
index_name = self._index
index_info = self._client.indices.get(index=index_name)
body = {
"mappings": index_info[index_name]["mappings"],
"settings": index_info[index_name]["settings"],
}
# drop server-generated settings that cannot be passed back at index creation
body["settings"]["index"].pop("uuid", None)
body["settings"]["index"].pop("creation_date", None)
body["settings"]["index"].pop("provided_name", None)
body["settings"]["index"].pop("version", None)
self._client.indices.delete(index=self._index)
self._client.indices.create(index=self._index, body=body)
logger.info(
"The index '{index}' recreated with the original mappings and settings.",
index=self._index,
)

else:
result = self._client.delete_by_query(**self._prepare_delete_all_request(is_async=False))
logger.info(
"Deleted all the {n_docs} documents from the index '{index}'.",
index=self._index,
n_docs=result["deleted"],
)
except Exception as e:
msg = f"Failed to delete all documents from OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e
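For reference, a usage sketch of the two deletion modes (behavior as described in the docstring above):

```python
# Drop and recreate the index, preserving its original mappings and settings.
document_store.delete_all_documents(recreate_index=True)

# Keep the index and remove documents via the delete_by_query API; the
# deletion may take a moment to be reflected in count_documents().
document_store.delete_all_documents(recreate_index=False)
```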

async def delete_all_documents_async(self, recreate_index: bool = False) -> None: # noqa: FBT002, FBT001
"""
Asynchronously deletes all documents in the document store.

:param recreate_index: If True, the index will be deleted and recreated with the original mappings and
settings. If False, all documents will be deleted using the `delete_by_query` API.
"""
self._ensure_initialized()
assert self._async_client is not None

try:
if recreate_index:
# get the current index mappings and settings
index_name = self._index
index_info = await self._async_client.indices.get(index=index_name)
body = {
"mappings": index_info[index_name]["mappings"],
"settings": index_info[index_name]["settings"],
}
# drop server-generated settings that cannot be passed back at index creation
body["settings"]["index"].pop("uuid", None)
body["settings"]["index"].pop("creation_date", None)
body["settings"]["index"].pop("provided_name", None)
body["settings"]["index"].pop("version", None)

await self._async_client.indices.delete(index=self._index)
await self._async_client.indices.create(index=self._index, body=body)
else:
await self._async_client.delete_by_query(**self._prepare_delete_all_request(is_async=True))

except Exception as e:
msg = f"Failed to delete all documents from OpenSearch: {e!s}"
raise DocumentStoreError(msg) from e
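A minimal async usage sketch (assuming no event loop is already running, so `asyncio.run` applies):

```python
import asyncio

async def clear_store() -> None:
    # Recreate the index asynchronously, then confirm it is empty.
    await document_store.delete_all_documents_async(recreate_index=True)
    assert await document_store.count_documents_async() == 0

asyncio.run(clear_store())
```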

def _prepare_bm25_search_request(
self,
*,
@@ -580,7 +663,8 @@ def _prepare_bm25_search_request(

return body

def _postprocess_bm25_search_results(self, *, results: List[Document], scale_score: bool) -> None:
@staticmethod
def _postprocess_bm25_search_results(*, results: List[Document], scale_score: bool) -> None:
if not scale_score:
return
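The remainder of this method (collapsed in this diff) rescales the raw BM25 scores. As an illustration only, not the store's exact formula: a common choice is a scaled sigmoid that maps unbounded BM25 scores into (0, 1), applied in place:

```python
import math
from typing import List

from haystack import Document

BM25_SCALING_FACTOR = 8  # hypothetical constant; the store may use a different value

def scale_bm25_scores(results: List[Document]) -> None:
    # Squash each unbounded BM25 score into (0, 1) with a scaled sigmoid.
    for doc in results:
        if doc.score is not None:
            doc.score = 1.0 / (1.0 + math.exp(-doc.score / BM25_SCALING_FACTOR))
```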

@@ -624,7 +708,7 @@ def _bm25_retrieval(
custom_query=custom_query,
)
documents = self._search_documents(search_params)
self._postprocess_bm25_search_results(results=documents, scale_score=scale_score)
OpenSearchDocumentStore._postprocess_bm25_search_results(results=documents, scale_score=scale_score)
return documents
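In applications this retrieval path is typically reached through the retriever component rather than called directly; a hedged usage sketch, assuming the `OpenSearchBM25Retriever` from this integration:

```python
from haystack_integrations.components.retrievers.opensearch import OpenSearchBM25Retriever

retriever = OpenSearchBM25Retriever(document_store=document_store, top_k=5)
result = retriever.run(query="how to delete all documents")
for doc in result["documents"]:
    print(doc.score, doc.content)
```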

async def _bm25_retrieval_async(
Expand Down Expand Up @@ -663,7 +747,7 @@ async def _bm25_retrieval_async(
custom_query=custom_query,
)
documents = await self._search_documents_async(search_params)
self._postprocess_bm25_search_results(results=documents, scale_score=scale_score)
OpenSearchDocumentStore._postprocess_bm25_search_results(results=documents, scale_score=scale_score)
return documents

def _prepare_embedding_search_request(
4 changes: 4 additions & 0 deletions integrations/opensearch/tests/test_auth.py
@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from unittest.mock import Mock, patch

import pytest
1 change: 1 addition & 0 deletions integrations/opensearch/tests/test_bm25_retriever.py
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from unittest.mock import Mock, patch

import pytest
57 changes: 56 additions & 1 deletion integrations/opensearch/tests/test_document_store.py
@@ -1,7 +1,9 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import random
import time
from typing import List
from unittest.mock import patch

@@ -453,7 +455,7 @@ def test_embedding_retrieval_but_dont_return_embeddings_for_bm25_retrieval(
assert len(results) == 2
assert results[0].embedding is None

def filter_documents_no_embedding_returned(
def test_filter_documents_no_embedding_returned(
self, document_store_embedding_dim_4_no_emb_returned: OpenSearchDocumentStore
):
docs = [
@@ -468,3 +470,56 @@ def filter_documents_no_embedding_returned(
assert results[0].embedding is None
assert results[1].embedding is None
assert results[2].embedding is None

def test_delete_all_documents_index_recreation(self, document_store: OpenSearchDocumentStore):
# populate the index with some documents
docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")]
document_store.write_documents(docs)

# capture index structure before deletion
assert document_store._client is not None
index_info_before = document_store._client.indices.get(index=document_store._index)
mappings_before = index_info_before[document_store._index]["mappings"]
settings_before = index_info_before[document_store._index]["settings"]

# delete all documents
document_store.delete_all_documents(recreate_index=True)
assert document_store.count_documents() == 0

# verify index structure is preserved
index_info_after = document_store._client.indices.get(index=document_store._index)
mappings_after = index_info_after[document_store._index]["mappings"]
settings_after = index_info_after[document_store._index]["settings"]

assert mappings_after == mappings_before, "delete_all_documents should preserve index mappings"

settings_after["index"].pop("uuid", None)
settings_after["index"].pop("creation_date", None)
settings_before["index"].pop("uuid", None)
settings_before["index"].pop("creation_date", None)
assert settings_after == settings_before, "delete_all_documents should preserve index settings"

new_doc = Document(id="4", content="New document after delete all")
document_store.write_documents([new_doc])
assert document_store.count_documents() == 1

results = document_store.filter_documents()
assert len(results) == 1
assert results[0].content == "New document after delete all"

def test_delete_all_documents_no_index_recreation(self, document_store: OpenSearchDocumentStore):
docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")]
document_store.write_documents(docs)
assert document_store.count_documents() == 2

document_store.delete_all_documents(recreate_index=False)
time.sleep(2) # need to wait for the deletion to be reflected in count_documents
assert document_store.count_documents() == 0

new_doc = Document(id="3", content="New document after delete all")
document_store.write_documents([new_doc])
assert document_store.count_documents() == 1

results = document_store.filter_documents()
assert len(results) == 1
assert results[0].content == "New document after delete all"
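Note: instead of sleeping, a test can make the `delete_by_query` result visible deterministically by forcing an index refresh; a sketch using the low-level client the store already holds:

```python
# Force OpenSearch to refresh the index so the deletion is immediately
# visible to count_documents(), avoiding the fixed sleep.
document_store._client.indices.refresh(index=document_store._index)
assert document_store.count_documents() == 0
```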
61 changes: 61 additions & 0 deletions integrations/opensearch/tests/test_document_store_async.py
@@ -1,3 +1,8 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
from typing import List

import pytest
@@ -241,3 +246,59 @@ async def test_delete_documents(self, document_store: OpenSearchDocumentStore):

await document_store.delete_documents_async([doc.id])
assert await document_store.count_documents_async() == 0

@pytest.mark.asyncio
async def test_delete_all_documents_index_recreation(self, document_store: OpenSearchDocumentStore):
# populate the index with some documents
docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")]
await document_store.write_documents_async(docs)

# capture index structure before deletion
assert document_store._client is not None
index_info_before = document_store._client.indices.get(index=document_store._index)
mappings_before = index_info_before[document_store._index]["mappings"]
settings_before = index_info_before[document_store._index]["settings"]

# delete all documents
await document_store.delete_all_documents_async(recreate_index=True)
assert await document_store.count_documents_async() == 0

# verify index structure is preserved
index_info_after = document_store._client.indices.get(index=document_store._index)
mappings_after = index_info_after[document_store._index]["mappings"]
settings_after = index_info_after[document_store._index]["settings"]

assert mappings_after == mappings_before, "delete_all_documents should preserve index mappings"

settings_after["index"].pop("uuid", None)
settings_after["index"].pop("creation_date", None)
settings_before["index"].pop("uuid", None)
settings_before["index"].pop("creation_date", None)
assert settings_after == settings_before, "delete_all_documents should preserve index settings"

new_doc = Document(id="4", content="New document after delete all")
await document_store.write_documents_async([new_doc])
assert await document_store.count_documents_async() == 1

results = await document_store.filter_documents_async()
assert len(results) == 1
assert results[0].content == "New document after delete all"

@pytest.mark.asyncio
async def test_delete_all_documents_no_index_recreation(self, document_store: OpenSearchDocumentStore):
docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")]
await document_store.write_documents_async(docs)
assert await document_store.count_documents_async() == 2

await document_store.delete_all_documents_async(recreate_index=False)
# need to wait for the deletion to be reflected in count_documents;
# use asyncio.sleep so the event loop is not blocked
await asyncio.sleep(2)
assert await document_store.count_documents_async() == 0

new_doc = Document(id="3", content="New document after delete all")
await document_store.write_documents_async([new_doc])
assert await document_store.count_documents_async() == 1

results = await document_store.filter_documents_async()
assert len(results) == 1
assert results[0].content == "New document after delete all"
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from unittest.mock import Mock, patch

import pytest
1 change: 1 addition & 0 deletions integrations/opensearch/tests/test_filters.py
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from typing import List

import pytest
@@ -1,3 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from copy import deepcopy
from typing import Any, Dict
from unittest.mock import Mock