From 084a95bb7d9374ae398bdcd356fe77b74f7677ed Mon Sep 17 00:00:00 2001 From: DosticJelena Date: Wed, 20 Sep 2023 09:07:35 +0200 Subject: [PATCH 1/3] add async_add implementation and async upsert to Pinecone vector store --- llama_index/indices/vector_store/base.py | 2 +- llama_index/vector_stores/pinecone.py | 36 ++++++++++++++++++------ tests/indices/vector_store/utils.py | 24 ++++++++-------- 3 files changed, 40 insertions(+), 22 deletions(-) diff --git a/llama_index/indices/vector_store/base.py b/llama_index/indices/vector_store/base.py index f64c1c5cad..57c3e355c1 100644 --- a/llama_index/indices/vector_store/base.py +++ b/llama_index/indices/vector_store/base.py @@ -145,7 +145,7 @@ async def _async_add_nodes_to_index( return nodes = await self._aget_node_with_embedding(nodes, show_progress) - new_ids = self._vector_store.add(nodes) + new_ids = await self._vector_store.async_add(nodes) # if the vector store doesn't store text, we need to add the nodes to the # index struct and document store diff --git a/llama_index/vector_stores/pinecone.py b/llama_index/vector_stores/pinecone.py index c624b0c6b8..25266ae5bd 100644 --- a/llama_index/vector_stores/pinecone.py +++ b/llama_index/vector_stores/pinecone.py @@ -8,7 +8,8 @@ from collections import Counter from functools import partial from typing import Any, Callable, Dict, List, Optional, cast - +import asyncio +from llama_index.utils import iter_batch from llama_index.bridge.pydantic import PrivateAttr from llama_index.schema import BaseNode, MetadataMode, TextNode from llama_index.vector_stores.types import ( @@ -30,7 +31,7 @@ SPARSE_VECTOR_KEY = "sparse_values" METADATA_KEY = "metadata" -DEFAULT_BATCH_SIZE = 100 +DEFAULT_BATCH_SIZE = 200 _logger = logging.getLogger(__name__) @@ -170,7 +171,7 @@ def __init__( if tokenizer is None and add_sparse_vector: tokenizer = get_default_tokenizer() - self._tokenizer = tokenizer + self._tokenizer = tokenizer # type: ignore super().__init__( index_name=index_name, @@ -256,14 +257,31 @@ def add( ids.append(node_id) entries.append(entry) - self._pinecone_index.upsert( - entries, - namespace=self.namespace, - batch_size=self.batch_size, - **self.insert_kwargs, - ) + + [ + self._pinecone_index.upsert( + vectors=batch, + async_req=True, + ) + for batch in iter_batch(entries, self.batch_size) + ] + return ids + async def async_add( + self, + nodes: List[BaseNode], + ) -> List[str]: + """Asynchronously add a list of embedding results to the collection. + + Args: + nodes (List[BaseNode]): Embedding results to add. + + Returns: + List[str]: List of IDs of the added documents. + """ + return await asyncio.to_thread(self.add, nodes) # type: ignore + def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None: """ Delete nodes using with ref_doc_id. diff --git a/tests/indices/vector_store/utils.py b/tests/indices/vector_store/utils.py index a8c5044d02..d15de7779d 100644 --- a/tests/indices/vector_store/utils.py +++ b/tests/indices/vector_store/utils.py @@ -12,19 +12,19 @@ class MockPineconeIndex: def __init__(self) -> None: """Mock pinecone index.""" - self._tuples: List[Dict[str, Any]] = [] + self._vectors: List[Dict[str, Any]] = [] - def upsert(self, tuples: List[Dict[str, Any]], **kwargs: Any) -> None: + def upsert(self, vectors: List[Dict[str, Any]], **kwargs: Any) -> None: """Mock upsert.""" - self._tuples.extend(tuples) + self._vectors.extend(vectors) def delete(self, ids: List[str]) -> None: """Mock delete.""" - new_tuples = [] - for tup in self._tuples: - if tup["id"] not in ids: - new_tuples.append(tup) - self._tuples = new_tuples + new_vectors = [] + for vec in self._vectors: + if vec["id"] not in ids: + new_vectors.append(vec) + self._vectors = new_vectors def query( self, @@ -38,7 +38,7 @@ def query( ) -> Any: """Mock query.""" # index_mat is n x k - index_mat = np.array([tup["values"] for tup in self._tuples]) + index_mat = np.array([tup["values"] for tup in self._vectors]) query_vec = np.array(vector)[np.newaxis, :] # compute distances @@ -49,10 +49,10 @@ def query( matches = [] for index in indices: - tup = self._tuples[index] + vec = self._vectors[index] match = MagicMock() - match.metadata = tup["metadata"] - match.id = tup["id"] + match.metadata = vec["metadata"] + match.id = vec["id"] matches.append(match) response = MagicMock() From fab147ba55d0eec90e61cf91db83bb634192d5a0 Mon Sep 17 00:00:00 2001 From: DosticJelena Date: Mon, 16 Oct 2023 18:03:59 +0200 Subject: [PATCH 2/3] update async_add method to follow pinecone-client example --- llama_index/vector_stores/pinecone.py | 56 ++++++++++++++++++--------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/llama_index/vector_stores/pinecone.py b/llama_index/vector_stores/pinecone.py index 25266ae5bd..98c580206c 100644 --- a/llama_index/vector_stores/pinecone.py +++ b/llama_index/vector_stores/pinecone.py @@ -33,6 +33,8 @@ DEFAULT_BATCH_SIZE = 200 +SEM_MAX_CONCURRENT = 10 + _logger = logging.getLogger(__name__) @@ -98,6 +100,14 @@ def _to_pinecone_filter(standard_filters: MetadataFilters) -> dict: return filters +async def async_upload(index, vectors, batch_size, semaphore): + async def send_batch(batch): + async with semaphore: + return await asyncio.to_thread(index.upsert, batch, async_req=True) + + await asyncio.gather(*[send_batch(chunk) for chunk in iter_batch(vectors, size=batch_size)]) + + import_err_msg = ( "`pinecone` package not found, please run `pip install pinecone-client`" ) @@ -224,39 +234,42 @@ def from_params( def class_name(cls) -> str: return "PinconeVectorStore" - def add( - self, - nodes: List[BaseNode], - ) -> List[str]: - """Add nodes to index. - - Args: - nodes: List[BaseNode]: list of nodes with embeddings - - """ - ids = [] + def _prepare_entries_for_upsert(self, nodes: List[BaseNode]) -> List[Dict]: entries = [] for node in nodes: - node_id = node.node_id - metadata = node_to_metadata_dict( node, remove_text=False, flat_metadata=self.flat_metadata ) entry = { - ID_KEY: node_id, + ID_KEY: node.node_id, VECTOR_KEY: node.get_embedding(), METADATA_KEY: metadata, } - if self.add_sparse_vector and self._tokenizer is not None: + + if self.add_sparse_vector: sparse_vector = generate_sparse_vectors( [node.get_content(metadata_mode=MetadataMode.EMBED)], self._tokenizer, )[0] entry[SPARSE_VECTOR_KEY] = sparse_vector - ids.append(node_id) entries.append(entry) + + return entries + + def add( + self, + nodes: List[BaseNode], + ) -> List[str]: + """Add nodes to index. + + Args: + nodes: List[BaseNode]: list of nodes with embeddings + + """ + + entries = self._prepare_entries_for_upsert(nodes) [ self._pinecone_index.upsert( @@ -266,7 +279,7 @@ def add( for batch in iter_batch(entries, self.batch_size) ] - return ids + return [entry[ID_KEY] for entry in entries] async def async_add( self, @@ -280,7 +293,14 @@ async def async_add( Returns: List[str]: List of IDs of the added documents. """ - return await asyncio.to_thread(self.add, nodes) # type: ignore + + entries = self._prepare_entries_for_upsert(nodes) + + semaphore = asyncio.Semaphore(SEM_MAX_CONCURRENT) + await async_upload(self._pinecone_index, entries, DEFAULT_BATCH_SIZE, semaphore) + + return [entry[ID_KEY] for entry in entries] + def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None: """ From 7aab5f31a7623297bcff2d8d8a33f0579851165c Mon Sep 17 00:00:00 2001 From: DosticJelena Date: Tue, 17 Oct 2023 10:31:35 +0200 Subject: [PATCH 3/3] fix linter issues --- llama_index/vector_stores/pinecone.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/llama_index/vector_stores/pinecone.py b/llama_index/vector_stores/pinecone.py index 98c580206c..7918bc3abd 100644 --- a/llama_index/vector_stores/pinecone.py +++ b/llama_index/vector_stores/pinecone.py @@ -4,14 +4,15 @@ """ +import asyncio import logging from collections import Counter from functools import partial from typing import Any, Callable, Dict, List, Optional, cast -import asyncio -from llama_index.utils import iter_batch + from llama_index.bridge.pydantic import PrivateAttr from llama_index.schema import BaseNode, MetadataMode, TextNode +from llama_index.utils import iter_batch from llama_index.vector_stores.types import ( BasePydanticVectorStore, MetadataFilters, @@ -100,12 +101,16 @@ def _to_pinecone_filter(standard_filters: MetadataFilters) -> dict: return filters -async def async_upload(index, vectors, batch_size, semaphore): - async def send_batch(batch): +async def async_upload( + index: Any, vectors: List[Dict], batch_size: int, semaphore: asyncio.Semaphore +) -> None: + async def send_batch(batch: List[Dict]): # type: ignore async with semaphore: return await asyncio.to_thread(index.upsert, batch, async_req=True) - - await asyncio.gather(*[send_batch(chunk) for chunk in iter_batch(vectors, size=batch_size)]) + + await asyncio.gather( + *[send_batch(chunk) for chunk in iter_batch(vectors, size=batch_size)] + ) import_err_msg = ( @@ -250,12 +255,12 @@ def _prepare_entries_for_upsert(self, nodes: List[BaseNode]) -> List[Dict]: if self.add_sparse_vector: sparse_vector = generate_sparse_vectors( [node.get_content(metadata_mode=MetadataMode.EMBED)], - self._tokenizer, + self._tokenizer, # type: ignore )[0] entry[SPARSE_VECTOR_KEY] = sparse_vector entries.append(entry) - + return entries def add( @@ -268,7 +273,6 @@ def add( nodes: List[BaseNode]: list of nodes with embeddings """ - entries = self._prepare_entries_for_upsert(nodes) [ @@ -293,7 +297,6 @@ async def async_add( Returns: List[str]: List of IDs of the added documents. """ - entries = self._prepare_entries_for_upsert(nodes) semaphore = asyncio.Semaphore(SEM_MAX_CONCURRENT) @@ -301,7 +304,6 @@ async def async_add( return [entry[ID_KEY] for entry in entries] - def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None: """ Delete nodes using with ref_doc_id.