diff --git a/pyproject.toml b/pyproject.toml index 7b332c2..dbf911d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "upstash-vector" -version = "0.5.0" +version = "0.6.0" description = "Serverless Vector SDK from Upstash" license = "MIT" authors = ["Upstash "] diff --git a/tests/core/test_resumable_query.py b/tests/core/test_resumable_query.py index eee274e..bd791a1 100644 --- a/tests/core/test_resumable_query.py +++ b/tests/core/test_resumable_query.py @@ -1,8 +1,11 @@ +import asyncio +import time + import pytest -from upstash_vector import Index, AsyncIndex -from upstash_vector.errors import ClientError + from tests import assert_eventually_async, assert_eventually, NAMESPACES -import time +from upstash_vector import Index, AsyncIndex +from upstash_vector.errors import UpstashError @pytest.mark.parametrize("ns", NAMESPACES) @@ -16,31 +19,31 @@ def test_resumable_query(index: Index, ns: str): namespace=ns, ) - time.sleep(1) + def assertion(): + result, handle = index.resumable_query( + vector=[0.1, 0.2], + top_k=2, + include_metadata=True, + include_vectors=True, + namespace=ns, + ) - query = index.resumable_query( - vector=[0.1, 0.2], - top_k=2, - include_metadata=True, - include_vectors=True, - namespace=ns, - ) + assert isinstance(result, list) - initial_results = query.start() - assert isinstance(initial_results, list) + assert len(result) > 0 + assert result[0].metadata is not None + assert result[0].vector is not None - assert len(initial_results) > 0 - assert hasattr(initial_results[0], "id") - assert hasattr(initial_results[0], "metadata") + handle.stop() - stop_result = query.stop() - assert stop_result == "Success" + with pytest.raises(UpstashError): + handle.fetch_next(1) - with pytest.raises(ClientError): - query.fetch_next(1) + with pytest.raises(UpstashError): + handle.stop() - with pytest.raises(ClientError): - query.stop() + time.sleep(1) + assert_eventually(assertion) @pytest.mark.parametrize("ns", NAMESPACES) @@ -53,21 +56,21 @@ def test_resumable_query_with_data(embedding_index: Index, ns: str): namespace=ns, ) - time.sleep(1) + def assertion(): + result, handle = embedding_index.resumable_query( + data="Hello", + top_k=1, + include_metadata=True, + namespace=ns, + ) - query = embedding_index.resumable_query( - data="Hello", - top_k=1, - include_metadata=True, - namespace=ns, - ) + assert len(result) == 1 + assert result[0].id == "id1" - results = query.start() - assert len(results) == 1 - assert results[0].id == "id1" + handle.stop() - stop_result = query.stop() - assert stop_result == "Success" + time.sleep(1) + assert_eventually(assertion) @pytest.mark.asyncio @@ -83,7 +86,7 @@ async def test_resumable_query_async(async_index: AsyncIndex, ns: str): ) async def assertion(): - query = await async_index.resumable_query( + result, handle = await async_index.resumable_query( vector=[0.1, 0.2], top_k=2, include_metadata=True, @@ -91,25 +94,24 @@ async def assertion(): namespace=ns, ) - initial_results = await query.async_start() - assert isinstance(initial_results, list) - assert len(initial_results) > 0 - assert "id" in initial_results[0] - assert "metadata" in initial_results[0] + assert isinstance(result, list) + assert len(result) > 0 + assert result[0].metadata is not None + assert result[0].vector is not None - next_results = await query.async_fetch_next(1) + next_results = await handle.fetch_next(1) assert isinstance(next_results, list) assert len(next_results) == 1 - stop_result = await query.async_stop() - assert stop_result == "Success" + await handle.stop() - with pytest.raises(ClientError): - await query.async_fetch_next(1) + with pytest.raises(UpstashError): + await handle.fetch_next(1) - with pytest.raises(ClientError): - await query.async_stop() + with pytest.raises(UpstashError): + await handle.stop() + await asyncio.sleep(1) await assert_eventually_async(assertion) @@ -127,20 +129,19 @@ async def test_resumable_query_with_data_async( ) async def assertion(): - query = await async_embedding_index.resumable_query( + result, handle = await async_embedding_index.resumable_query( data="Hello", top_k=1, include_metadata=True, namespace=ns, ) - results = await query.async_start() - assert len(results) == 1 - assert results[0]["id"] == "id1" + assert len(result) == 1 + assert result[0].id == "id1" - stop_result = await query.async_stop() - assert stop_result == "Success" + await handle.stop() + await asyncio.sleep(1) await assert_eventually_async(assertion) @@ -158,35 +159,35 @@ def test_resumable_query_fetch_next(index: Index, ns: str): ) def assertion(): - query = index.resumable_query( + result, handle = index.resumable_query( vector=[0.1, 0.2], top_k=2, include_metadata=True, namespace=ns, ) - initial_results = query.start() - assert len(initial_results) == 2 - assert initial_results[0].id == "id1" - assert initial_results[1].id == "id2" + assert len(result) == 2 + assert result[0].id == "id1" + assert result[1].id == "id2" # Fetch next 2 results - next_results_1 = query.fetch_next(2) + next_results_1 = handle.fetch_next(2) assert len(next_results_1) == 2 assert next_results_1[0].id == "id3" assert next_results_1[1].id == "id4" # Fetch next 1 result - next_results_2 = query.fetch_next(1) + next_results_2 = handle.fetch_next(1) assert len(next_results_2) == 1 assert next_results_2[0].id == "id5" # Try to fetch more, should return empty list - next_results_3 = query.fetch_next(1) + next_results_3 = handle.fetch_next(1) assert len(next_results_3) == 0 - query.stop() + handle.stop() + time.sleep(1) assert_eventually(assertion) @@ -205,25 +206,24 @@ async def test_resumable_query_multiple_fetch_async(async_index: AsyncIndex, ns: ) async def assertion(): - query = await async_index.resumable_query( + result, handle = await async_index.resumable_query( vector=[0.1, 0.2], top_k=2, include_metadata=True, namespace=ns, ) - initial_results = await query.async_start() - assert len(initial_results) == 2 + assert len(result) == 2 - next_results_1 = await query.async_fetch_next(2) + next_results_1 = await handle.fetch_next(2) assert len(next_results_1) == 2 - next_results_2 = await query.async_fetch_next(1) + next_results_2 = await handle.fetch_next(1) assert len(next_results_2) == 1 - stop_result = await query.async_stop() - assert stop_result == "Success" + await handle.stop() + await asyncio.sleep(1) await assert_eventually_async(assertion) @@ -239,31 +239,33 @@ def test_resumable_query_context_manager(index: Index, ns: str): ) def assertion(): - with index.resumable_query( + result, handle = index.resumable_query( vector=[0.1, 0.2], top_k=2, include_metadata=True, include_vectors=True, namespace=ns, - ) as query: - initial_results = query.start() - assert isinstance(initial_results, list) - assert len(initial_results) > 0 - assert hasattr(initial_results[0], "id") - assert hasattr(initial_results[0], "metadata") - - next_results = query.fetch_next(1) + ) + + with handle: + assert isinstance(result, list) + assert len(result) > 0 + assert result[0].metadata is not None + assert result[0].vector is not None + + next_results = handle.fetch_next(1) assert isinstance(next_results, list) assert len(next_results) == 1 # The query should be stopped automatically after exiting the context - with pytest.raises(ClientError): - query.fetch_next(1) + with pytest.raises(UpstashError): + handle.fetch_next(1) - with pytest.raises(ClientError): - query.stop() + with pytest.raises(UpstashError): + handle.stop() + time.sleep(1) assert_eventually(assertion) @@ -280,102 +282,31 @@ async def test_resumable_query_async_context_manager(async_index: AsyncIndex, ns ) async def assertion(): - async with await async_index.resumable_query( + result, handle = await async_index.resumable_query( vector=[0.1, 0.2], top_k=2, include_metadata=True, include_vectors=True, namespace=ns, - ) as query: - initial_results = await query.async_start() - assert isinstance(initial_results, list) - assert len(initial_results) > 0 - assert "id" in initial_results[0] - assert "metadata" in initial_results[0] - - next_results = await query.async_fetch_next(1) + ) + + async with handle: + assert isinstance(result, list) + assert len(result) > 0 + assert result[0].metadata is not None + assert result[0].vector is not None + + next_results = await handle.fetch_next(1) assert isinstance(next_results, list) assert len(next_results) == 1 # The query should be stopped automatically after exiting the context - with pytest.raises(ClientError): - await query.async_fetch_next(1) + with pytest.raises(UpstashError): + await handle.fetch_next(1) - with pytest.raises(ClientError): - await query.async_stop() - - await assert_eventually_async(assertion) - - -@pytest.mark.parametrize("ns", NAMESPACES) -def test_resumable_query_context_manager_exception_handling(index: Index, ns: str): - index.upsert( - vectors=[ - ("id1", [0.1, 0.2], {"field": "value1"}), - ("id2", [0.3, 0.4], {"field": "value2"}), - ], - namespace=ns, - ) - - def assertion(): - try: - with index.resumable_query( - vector=[0.1, 0.2], - top_k=2, - include_metadata=True, - namespace=ns, - ) as query: - initial_results = query.start() - assert len(initial_results) == 2 - raise ValueError("Test exception") - except ValueError: - pass - - # The query should be stopped even if an exception occurred - - with pytest.raises(ClientError): - query.fetch_next(1) - - with pytest.raises(ClientError): - query.stop() - - assert_eventually(assertion) - - -@pytest.mark.asyncio -@pytest.mark.parametrize("ns", NAMESPACES) -async def test_resumable_query_async_context_manager_exception_handling( - async_index: AsyncIndex, ns: str -): - await async_index.upsert( - vectors=[ - ("id1", [0.1, 0.2], {"field": "value1"}), - ("id2", [0.3, 0.4], {"field": "value2"}), - ], - namespace=ns, - ) - - async def assertion(): - try: - async with await async_index.resumable_query( - vector=[0.1, 0.2], - top_k=2, - include_metadata=True, - namespace=ns, - ) as query: - initial_results = await query.async_start() - assert len(initial_results) == 2 - raise ValueError("Test exception") - except ValueError: - pass - - # The query should be stopped even if an exception occurred - - with pytest.raises(ClientError): - await query.async_fetch_next(1) - - with pytest.raises(ClientError): - await query.async_stop() + with pytest.raises(UpstashError): + await handle.stop() + await asyncio.sleep(1) await assert_eventually_async(assertion) diff --git a/upstash_vector/__init__.py b/upstash_vector/__init__.py index a3d0ca2..48cd11f 100644 --- a/upstash_vector/__init__.py +++ b/upstash_vector/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.5.0" +__version__ = "0.6.0" from upstash_vector.client import Index, AsyncIndex from upstash_vector.types import Vector diff --git a/upstash_vector/core/index_operations.py b/upstash_vector/core/index_operations.py index c783154..ceeac15 100644 --- a/upstash_vector/core/index_operations.py +++ b/upstash_vector/core/index_operations.py @@ -1,7 +1,14 @@ -# Define vector operations here: -# Upsert and query functions and signatures - -from typing import Sequence, Union, List, Dict, Optional, Any +from typing import ( + Sequence, + Union, + List, + Dict, + Optional, + Any, + Tuple, + Callable, + Awaitable, +) from upstash_vector.errors import ClientError from upstash_vector.types import ( @@ -22,7 +29,6 @@ convert_to_vectors, convert_to_payload, ) -from upstash_vector.core.resumable_query import ResumableQuery DEFAULT_NAMESPACE = "" @@ -39,6 +45,10 @@ LIST_NAMESPACES_PATH = "/list-namespaces" DELETE_NAMESPACE_PATH = "/delete-namespace" UPDATE_PATH = "/update" +RESUMABLE_QUERY_PATH = "/resumable-query" +RESUMABLE_QUERY_DATA_PATH = "/resumable-query-data" +RESUMABLE_QUERY_NEXT_PATH = "/resumable-query-next" +RESUMABLE_QUERY_END_PATH = "/resumable-query-end" def _path_for(namespace: str, path: str) -> str: @@ -272,9 +282,11 @@ def resumable_query( namespace: str = DEFAULT_NAMESPACE, include_data: bool = False, max_idle: int = 3600, - ) -> ResumableQuery: + ) -> Tuple[List[QueryResult], "ResumableQueryHandle"]: """ - Creates a resumable query. After fetching the necessary results, it's strongly recommended to stop the query with the `.stop()` method. + Creates a resumable query. + After fetching the necessary results, it's recommended to stop the query + to release the acquired resources. :param vector: The vector value to query. :param top_k: How many vectors will be returned as the query result. @@ -286,25 +298,30 @@ def resumable_query( :param include_data: Whether the resulting vectors will have their unstructured data or not. :param max_idle: Maximum idle time for the resumable query in seconds. - :return: A ResumableQuery object. + :return: First batch of the results, along with a handle to fetch more or stop the query. Example usage: ```python - query = index.resumable_query( + result, handle = index.resumable_query( vector=[0.6, 0.9], top_k=100, include_vectors=False, include_metadata=True, - max_idle=7200 ) - result = query.start() - # Fetch results in batches - batch1 = query.fetch_next(10) - batch2 = query.fetch_next(20) - query.stop() + # Fetch the next batch of the results + result1 = handle.fetch_next(10) + result2 = handle.fetch_next(20) + handle.stop() ``` """ + if data is None and vector is None: + raise ClientError("either `data` or `vector` values must be given") + if data is not None and vector is not None: + raise ClientError( + "`data` and `vector` values cannot be given at the same time" + ) + payload = { "topK": top_k, "includeVectors": include_vectors, @@ -314,19 +331,19 @@ def resumable_query( "maxIdle": max_idle, } - if data is None and vector is None: - raise ClientError("either `data` or `vector` values must be given") - if data is not None and vector is not None: - raise ClientError( - "`data` and `vector` values cannot be given at the same time" - ) - if data is not None: payload["data"] = data + path = RESUMABLE_QUERY_DATA_PATH else: payload["vector"] = convert_to_list(vector) + path = RESUMABLE_QUERY_PATH + + result = self._execute_request(payload=payload, path=_path_for(namespace, path)) + + uid = result["uuid"] + scores = [QueryResult._from_json(obj) for obj in result["scores"]] - return ResumableQuery(payload, self, namespace) + return scores, ResumableQueryHandle(self._execute_request, uid) def delete( self, @@ -762,9 +779,11 @@ async def resumable_query( namespace: str = DEFAULT_NAMESPACE, include_data: bool = False, max_idle: int = 3600, - ) -> ResumableQuery: + ) -> Tuple[List[QueryResult], "AsyncResumableQueryHandle"]: """ - Creates a resumable query. After fetching the necessary results, it's strongly recommended to stop the query with the `.stop()` method. + Creates a resumable query. + After fetching the necessary results, it's recommended to stop the query + to release the acquired resources. :param vector: The vector value to query. :param top_k: How many vectors will be returned as the query result. @@ -776,25 +795,30 @@ async def resumable_query( :param include_data: Whether the resulting vectors will have their unstructured data or not. :param max_idle: Maximum idle time for the resumable query in seconds. - :return: A ResumableQuery object. + :return: First batch of the results, along with a handle to fetch more or stop the query. Example usage: ```python - query = await index.resumable_query( + result, handle = await index.resumable_query( vector=[0.6, 0.9], top_k=100, include_vectors=False, include_metadata=True, - max_idle=7200 ) - result = await query.start() - # Fetch results in batches - batch1 = await query.fetch_next(10) - batch2 = await query.fetch_next(20) - await query.stop() + # Fetch the next batch of results + result1 = await handle.fetch_next(10) + result2 = await handle.fetch_next(20) + await handle.stop() ``` """ + if data is None and vector is None: + raise ClientError("either `data` or `vector` values must be given") + if data is not None and vector is not None: + raise ClientError( + "`data` and `vector` values cannot be given at the same time" + ) + payload = { "topK": top_k, "includeVectors": include_vectors, @@ -804,19 +828,21 @@ async def resumable_query( "maxIdle": max_idle, } - if data is None and vector is None: - raise ClientError("either `data` or `vector` values must be given") - if data is not None and vector is not None: - raise ClientError( - "`data` and `vector` values cannot be given at the same time" - ) - if data is not None: payload["data"] = data + path = RESUMABLE_QUERY_DATA_PATH else: payload["vector"] = convert_to_list(vector) + path = RESUMABLE_QUERY_PATH - return ResumableQuery(payload, self, namespace) + result = await self._execute_request_async( + payload=payload, path=_path_for(namespace, path) + ) + + uid = result["uuid"] + scores = [QueryResult._from_json(obj) for obj in result["scores"]] + + return scores, AsyncResumableQueryHandle(self._execute_request_async, uid) async def delete( self, @@ -1032,3 +1058,85 @@ async def delete_namespace(self, namespace: str) -> None: await self._execute_request_async( payload=None, path=_path_for(namespace, DELETE_NAMESPACE_PATH) ) + + +class ResumableQueryHandle: + """ + A class representing a resumable query for vector search operations. + + This class allows for fetching next results, and stopping a resumable query. + """ + + def __init__(self, exec_fn: Callable[[Any, str], Any], uid: str): + self._exec_fn = exec_fn + self._uid = uid + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.stop() + + def fetch_next(self, additional_k: int) -> List[QueryResult]: + """ + Fetches the next batch of results. + + Args: + additional_k (int): The number of additional results to fetch. + + Returns: + List[QueryResult]: The next batch of query results. + """ + payload = {"uuid": self._uid, "additionalK": additional_k} + result = self._exec_fn(payload, RESUMABLE_QUERY_NEXT_PATH) + return [QueryResult._from_json(obj) for obj in result] + + def stop(self) -> None: + """ + Stops the resumable query. + """ + payload = {"uuid": self._uid} + self._exec_fn(payload, RESUMABLE_QUERY_END_PATH) + + +class AsyncResumableQueryHandle: + """ + A class representing a resumable query for vector search operations. + + This class allows for fetching next results, and stopping a resumable query. + """ + + def __init__( + self, + exec_fn: Callable[[Any, str], Awaitable[Any]], + uid: str, + ): + self._exec_fn = exec_fn + self._uid = uid + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self.stop() + + async def fetch_next(self, additional_k: int) -> List[QueryResult]: + """ + Fetches the next batch of results. + + Args: + additional_k (int): The number of additional results to fetch. + + Returns: + List[QueryResult]: The next batch of query results. + """ + payload = {"uuid": self._uid, "additionalK": additional_k} + result = await self._exec_fn(payload, RESUMABLE_QUERY_NEXT_PATH) + return [QueryResult._from_json(obj) for obj in result] + + async def stop(self) -> None: + """ + Stops the resumable query. + """ + payload = {"uuid": self._uid} + await self._exec_fn(payload, RESUMABLE_QUERY_END_PATH) diff --git a/upstash_vector/core/resumable_query.py b/upstash_vector/core/resumable_query.py deleted file mode 100644 index 7906ef3..0000000 --- a/upstash_vector/core/resumable_query.py +++ /dev/null @@ -1,223 +0,0 @@ -from typing import List, Optional, Dict, Any -from upstash_vector.types import QueryResult -from upstash_vector.errors import ClientError - -RESUMABLE_QUERY_VECTOR_PATH = "/resumable-query" -RESUMABLE_QUERY_DATA_PATH = "/resumable-query-data" -RESUMABLE_QUERY_NEXT_PATH = "/resumable-query-next" -RESUMABLE_QUERY_END_PATH = "/resumable-query-end" - - -class ResumableQuery: - """ - A class representing a resumable query for vector search operations. - - This class allows for starting, fetching next results, and stopping a resumable query. - It supports both synchronous and asynchronous operations. - - Attributes: - payload (Dict[str, Any]): The query payload. - client: The client object for executing requests. - namespace (Optional[str]): The namespace for the query. - uuid (Optional[str]): The unique identifier for the resumable query session. - """ - - def __init__( - self, payload: Dict[str, Any], client, namespace: Optional[str] = None - ): - """ - Initialize a ResumableQuery instance. - - Args: - payload (Dict[str, Any]): The query payload. - client: The client object for executing requests. - namespace (Optional[str]): The namespace for the query. Defaults to None. - """ - self.payload = payload - self.client = client - self.namespace = namespace - self.uuid = None - - def __enter__(self): - """ - Start the resumable query asynchronously. - Enter the runtime context related to this object. - The with statement will bind this method's return value to the target(s) - specified in the as clause of the statement, if any. - Returns: - self: The ResumableQuery instance. - """ - return self - - def __exit__(self, exc_type, exc_value, traceback): - """ - Exit the runtime context related to this object. - The parameters describe the exception that caused the context to be exited. - If the context was exited without an exception, all three arguments will be None. - Args: - exc_type: The exception type if an exception was raised in the context, else None. - exc_value: The exception instance if an exception was raised in the context, else None. - traceback: The traceback if an exception was raised in the context, else None. - """ - self.stop() - - async def __aenter__(self): - """ - Enter the runtime context related to this object asynchronously. - Returns: - self: The ResumableQuery instance. - """ - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - """ - Exit the runtime context related to this object asynchronously. - Args: - exc_type: The exception type if an exception was raised in the context, else None. - exc_value: The exception instance if an exception was raised in the context, else None. - traceback: The traceback if an exception was raised in the context, else None. - """ - await self.async_stop() - - async def async_start(self) -> List[QueryResult]: - """ - Start the resumable query asynchronously. - - Returns: - List[QueryResult]: The initial query results. - - Raises: - ClientError: If the payload doesn't contain 'vector' or 'data' key. - """ - if "vector" in self.payload: - path = RESUMABLE_QUERY_VECTOR_PATH - elif "data" in self.payload: - path = RESUMABLE_QUERY_DATA_PATH - else: - raise ClientError("Payload must contain either 'vector' or 'data' key.") - - if self.namespace: - path = f"{path}/{self.namespace}" - - result = await self.client._execute_request_async( - payload=self.payload, path=path - ) - self.uuid = result["uuid"] - return result["scores"] - - def start(self) -> List[QueryResult]: - """ - Start the resumable query synchronously. - - Returns: - List[QueryResult]: The initial query results. - - Raises: - ClientError: If the payload doesn't contain 'vector' or 'data' key, - or if the resumable query couldn't be started. - """ - if "vector" in self.payload: - path = RESUMABLE_QUERY_VECTOR_PATH - elif "data" in self.payload: - path = RESUMABLE_QUERY_DATA_PATH - else: - raise ClientError("Payload must contain either 'vector' or 'data' key.") - - if self.namespace: - path = f"{path}/{self.namespace}" - - result = self.client._execute_request(payload=self.payload, path=path) - - self.uuid = result["uuid"] - if not self.uuid: - raise ClientError("Resumable query could not be started.") - - return [QueryResult._from_json(obj) for obj in result.get("scores", [])] - - def fetch_next(self, additional_k: int) -> List[QueryResult]: - """ - Fetch the next batch of results synchronously. - - Args: - additional_k (int): The number of additional results to fetch. - - Returns: - List[QueryResult]: The next batch of query results. - - Raises: - ClientError: If the resumable query hasn't been started. - """ - if self.uuid is None: - raise ClientError( - "Resumable query has not been started. Call start() first." - ) - payload = {"uuid": self.uuid, "additionalK": additional_k} - result = self.client._execute_request( - payload=payload, path=RESUMABLE_QUERY_NEXT_PATH - ) - return [QueryResult._from_json(obj) for obj in result] - - async def async_fetch_next(self, additional_k: int) -> List[QueryResult]: - """ - Fetch the next batch of results asynchronously. - - Args: - additional_k (int): The number of additional results to fetch. - - Returns: - List[QueryResult]: The next batch of query results. - - Raises: - ClientError: If the resumable query hasn't been started. - """ - if not self.uuid: - raise ClientError( - "Resumable query has not been started. Call start() first." - ) - payload = {"uuid": self.uuid, "additionalK": additional_k} - result = await self.client._execute_request_async( - payload=payload, path=RESUMABLE_QUERY_NEXT_PATH - ) - return [QueryResult._from_json(obj) for obj in result] - - def stop(self) -> str: - """ - Stop the resumable query synchronously. - - Returns: - str: The result of stopping the query. - - Raises: - ClientError: If the resumable query hasn't been started. - """ - if not self.uuid: - raise ClientError( - "Resumable query has not been started. Call start() first." - ) - payload = {"uuid": self.uuid} - result = self.client._execute_request( - payload=payload, path=RESUMABLE_QUERY_END_PATH - ) - self.uuid = None - return result - - async def async_stop(self) -> str: - """ - Stop the resumable query asynchronously. - - Returns: - str: The result of stopping the query. - - Raises: - ClientError: If the resumable query hasn't been started. - """ - if not self.uuid: - raise ClientError( - "Resumable query has not been started. Call start() first." - ) - payload = {"uuid": self.uuid} - result = await self.client._execute_request_async( - payload=payload, path=RESUMABLE_QUERY_END_PATH - ) - self.uuid = None - return result