From 0800e7c342f775b01d618b0b6feacd38571d1241 Mon Sep 17 00:00:00 2001 From: Metin Dumandag <29387993+mdumandag@users.noreply.github.com> Date: Fri, 27 Sep 2024 15:00:54 +0300 Subject: [PATCH] Change the resumable query API (#32) There were couple of problems with it: - We don't use the async prefix in async method names in the public API - We don't return the result for APIs that returns Success string only So, I have changed the implementation to fix these problems. Also, I think it is much better to return the first batch of the results directly to the user, along with an handle to fetch more or stop. This is more similar to API we have in JS SDK, makes the possible misuses impossible (you can't call stop before the start anymore etc.), and makes more sense for the async version. For the async version, we previously marked the function async but did not call any async functions, which was weird. ALso, I have changed the tests a bit to make them more resillient to failures due to potentially delayed indexing, which might happen from time to time. --- pyproject.toml | 2 +- tests/core/test_resumable_query.py | 267 +++++++++--------------- upstash_vector/__init__.py | 2 +- upstash_vector/core/index_operations.py | 190 +++++++++++++---- upstash_vector/core/resumable_query.py | 223 -------------------- 5 files changed, 250 insertions(+), 434 deletions(-) delete mode 100644 upstash_vector/core/resumable_query.py diff --git a/pyproject.toml b/pyproject.toml index 7b332c2..dbf911d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "upstash-vector" -version = "0.5.0" +version = "0.6.0" description = "Serverless Vector SDK from Upstash" license = "MIT" authors = ["Upstash "] diff --git a/tests/core/test_resumable_query.py b/tests/core/test_resumable_query.py index eee274e..bd791a1 100644 --- a/tests/core/test_resumable_query.py +++ b/tests/core/test_resumable_query.py @@ -1,8 +1,11 @@ +import asyncio +import time + import pytest -from upstash_vector import Index, AsyncIndex -from upstash_vector.errors import ClientError + from tests import assert_eventually_async, assert_eventually, NAMESPACES -import time +from upstash_vector import Index, AsyncIndex +from upstash_vector.errors import UpstashError @pytest.mark.parametrize("ns", NAMESPACES) @@ -16,31 +19,31 @@ def test_resumable_query(index: Index, ns: str): namespace=ns, ) - time.sleep(1) + def assertion(): + result, handle = index.resumable_query( + vector=[0.1, 0.2], + top_k=2, + include_metadata=True, + include_vectors=True, + namespace=ns, + ) - query = index.resumable_query( - vector=[0.1, 0.2], - top_k=2, - include_metadata=True, - include_vectors=True, - namespace=ns, - ) + assert isinstance(result, list) - initial_results = query.start() - assert isinstance(initial_results, list) + assert len(result) > 0 + assert result[0].metadata is not None + assert result[0].vector is not None - assert len(initial_results) > 0 - assert hasattr(initial_results[0], "id") - assert hasattr(initial_results[0], "metadata") + handle.stop() - stop_result = query.stop() - assert stop_result == "Success" + with pytest.raises(UpstashError): + handle.fetch_next(1) - with pytest.raises(ClientError): - query.fetch_next(1) + with pytest.raises(UpstashError): + handle.stop() - with pytest.raises(ClientError): - query.stop() + time.sleep(1) + assert_eventually(assertion) @pytest.mark.parametrize("ns", NAMESPACES) @@ -53,21 +56,21 @@ def test_resumable_query_with_data(embedding_index: Index, ns: str): namespace=ns, ) - time.sleep(1) + def assertion(): + result, handle = embedding_index.resumable_query( + data="Hello", + top_k=1, + include_metadata=True, + namespace=ns, + ) - query = embedding_index.resumable_query( - data="Hello", - top_k=1, - include_metadata=True, - namespace=ns, - ) + assert len(result) == 1 + assert result[0].id == "id1" - results = query.start() - assert len(results) == 1 - assert results[0].id == "id1" + handle.stop() - stop_result = query.stop() - assert stop_result == "Success" + time.sleep(1) + assert_eventually(assertion) @pytest.mark.asyncio @@ -83,7 +86,7 @@ async def test_resumable_query_async(async_index: AsyncIndex, ns: str): ) async def assertion(): - query = await async_index.resumable_query( + result, handle = await async_index.resumable_query( vector=[0.1, 0.2], top_k=2, include_metadata=True, @@ -91,25 +94,24 @@ async def assertion(): namespace=ns, ) - initial_results = await query.async_start() - assert isinstance(initial_results, list) - assert len(initial_results) > 0 - assert "id" in initial_results[0] - assert "metadata" in initial_results[0] + assert isinstance(result, list) + assert len(result) > 0 + assert result[0].metadata is not None + assert result[0].vector is not None - next_results = await query.async_fetch_next(1) + next_results = await handle.fetch_next(1) assert isinstance(next_results, list) assert len(next_results) == 1 - stop_result = await query.async_stop() - assert stop_result == "Success" + await handle.stop() - with pytest.raises(ClientError): - await query.async_fetch_next(1) + with pytest.raises(UpstashError): + await handle.fetch_next(1) - with pytest.raises(ClientError): - await query.async_stop() + with pytest.raises(UpstashError): + await handle.stop() + await asyncio.sleep(1) await assert_eventually_async(assertion) @@ -127,20 +129,19 @@ async def test_resumable_query_with_data_async( ) async def assertion(): - query = await async_embedding_index.resumable_query( + result, handle = await async_embedding_index.resumable_query( data="Hello", top_k=1, include_metadata=True, namespace=ns, ) - results = await query.async_start() - assert len(results) == 1 - assert results[0]["id"] == "id1" + assert len(result) == 1 + assert result[0].id == "id1" - stop_result = await query.async_stop() - assert stop_result == "Success" + await handle.stop() + await asyncio.sleep(1) await assert_eventually_async(assertion) @@ -158,35 +159,35 @@ def test_resumable_query_fetch_next(index: Index, ns: str): ) def assertion(): - query = index.resumable_query( + result, handle = index.resumable_query( vector=[0.1, 0.2], top_k=2, include_metadata=True, namespace=ns, ) - initial_results = query.start() - assert len(initial_results) == 2 - assert initial_results[0].id == "id1" - assert initial_results[1].id == "id2" + assert len(result) == 2 + assert result[0].id == "id1" + assert result[1].id == "id2" # Fetch next 2 results - next_results_1 = query.fetch_next(2) + next_results_1 = handle.fetch_next(2) assert len(next_results_1) == 2 assert next_results_1[0].id == "id3" assert next_results_1[1].id == "id4" # Fetch next 1 result - next_results_2 = query.fetch_next(1) + next_results_2 = handle.fetch_next(1) assert len(next_results_2) == 1 assert next_results_2[0].id == "id5" # Try to fetch more, should return empty list - next_results_3 = query.fetch_next(1) + next_results_3 = handle.fetch_next(1) assert len(next_results_3) == 0 - query.stop() + handle.stop() + time.sleep(1) assert_eventually(assertion) @@ -205,25 +206,24 @@ async def test_resumable_query_multiple_fetch_async(async_index: AsyncIndex, ns: ) async def assertion(): - query = await async_index.resumable_query( + result, handle = await async_index.resumable_query( vector=[0.1, 0.2], top_k=2, include_metadata=True, namespace=ns, ) - initial_results = await query.async_start() - assert len(initial_results) == 2 + assert len(result) == 2 - next_results_1 = await query.async_fetch_next(2) + next_results_1 = await handle.fetch_next(2) assert len(next_results_1) == 2 - next_results_2 = await query.async_fetch_next(1) + next_results_2 = await handle.fetch_next(1) assert len(next_results_2) == 1 - stop_result = await query.async_stop() - assert stop_result == "Success" + await handle.stop() + await asyncio.sleep(1) await assert_eventually_async(assertion) @@ -239,31 +239,33 @@ def test_resumable_query_context_manager(index: Index, ns: str): ) def assertion(): - with index.resumable_query( + result, handle = index.resumable_query( vector=[0.1, 0.2], top_k=2, include_metadata=True, include_vectors=True, namespace=ns, - ) as query: - initial_results = query.start() - assert isinstance(initial_results, list) - assert len(initial_results) > 0 - assert hasattr(initial_results[0], "id") - assert hasattr(initial_results[0], "metadata") - - next_results = query.fetch_next(1) + ) + + with handle: + assert isinstance(result, list) + assert len(result) > 0 + assert result[0].metadata is not None + assert result[0].vector is not None + + next_results = handle.fetch_next(1) assert isinstance(next_results, list) assert len(next_results) == 1 # The query should be stopped automatically after exiting the context - with pytest.raises(ClientError): - query.fetch_next(1) + with pytest.raises(UpstashError): + handle.fetch_next(1) - with pytest.raises(ClientError): - query.stop() + with pytest.raises(UpstashError): + handle.stop() + time.sleep(1) assert_eventually(assertion) @@ -280,102 +282,31 @@ async def test_resumable_query_async_context_manager(async_index: AsyncIndex, ns ) async def assertion(): - async with await async_index.resumable_query( + result, handle = await async_index.resumable_query( vector=[0.1, 0.2], top_k=2, include_metadata=True, include_vectors=True, namespace=ns, - ) as query: - initial_results = await query.async_start() - assert isinstance(initial_results, list) - assert len(initial_results) > 0 - assert "id" in initial_results[0] - assert "metadata" in initial_results[0] - - next_results = await query.async_fetch_next(1) + ) + + async with handle: + assert isinstance(result, list) + assert len(result) > 0 + assert result[0].metadata is not None + assert result[0].vector is not None + + next_results = await handle.fetch_next(1) assert isinstance(next_results, list) assert len(next_results) == 1 # The query should be stopped automatically after exiting the context - with pytest.raises(ClientError): - await query.async_fetch_next(1) + with pytest.raises(UpstashError): + await handle.fetch_next(1) - with pytest.raises(ClientError): - await query.async_stop() - - await assert_eventually_async(assertion) - - -@pytest.mark.parametrize("ns", NAMESPACES) -def test_resumable_query_context_manager_exception_handling(index: Index, ns: str): - index.upsert( - vectors=[ - ("id1", [0.1, 0.2], {"field": "value1"}), - ("id2", [0.3, 0.4], {"field": "value2"}), - ], - namespace=ns, - ) - - def assertion(): - try: - with index.resumable_query( - vector=[0.1, 0.2], - top_k=2, - include_metadata=True, - namespace=ns, - ) as query: - initial_results = query.start() - assert len(initial_results) == 2 - raise ValueError("Test exception") - except ValueError: - pass - - # The query should be stopped even if an exception occurred - - with pytest.raises(ClientError): - query.fetch_next(1) - - with pytest.raises(ClientError): - query.stop() - - assert_eventually(assertion) - - -@pytest.mark.asyncio -@pytest.mark.parametrize("ns", NAMESPACES) -async def test_resumable_query_async_context_manager_exception_handling( - async_index: AsyncIndex, ns: str -): - await async_index.upsert( - vectors=[ - ("id1", [0.1, 0.2], {"field": "value1"}), - ("id2", [0.3, 0.4], {"field": "value2"}), - ], - namespace=ns, - ) - - async def assertion(): - try: - async with await async_index.resumable_query( - vector=[0.1, 0.2], - top_k=2, - include_metadata=True, - namespace=ns, - ) as query: - initial_results = await query.async_start() - assert len(initial_results) == 2 - raise ValueError("Test exception") - except ValueError: - pass - - # The query should be stopped even if an exception occurred - - with pytest.raises(ClientError): - await query.async_fetch_next(1) - - with pytest.raises(ClientError): - await query.async_stop() + with pytest.raises(UpstashError): + await handle.stop() + await asyncio.sleep(1) await assert_eventually_async(assertion) diff --git a/upstash_vector/__init__.py b/upstash_vector/__init__.py index a3d0ca2..48cd11f 100644 --- a/upstash_vector/__init__.py +++ b/upstash_vector/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.5.0" +__version__ = "0.6.0" from upstash_vector.client import Index, AsyncIndex from upstash_vector.types import Vector diff --git a/upstash_vector/core/index_operations.py b/upstash_vector/core/index_operations.py index c783154..ceeac15 100644 --- a/upstash_vector/core/index_operations.py +++ b/upstash_vector/core/index_operations.py @@ -1,7 +1,14 @@ -# Define vector operations here: -# Upsert and query functions and signatures - -from typing import Sequence, Union, List, Dict, Optional, Any +from typing import ( + Sequence, + Union, + List, + Dict, + Optional, + Any, + Tuple, + Callable, + Awaitable, +) from upstash_vector.errors import ClientError from upstash_vector.types import ( @@ -22,7 +29,6 @@ convert_to_vectors, convert_to_payload, ) -from upstash_vector.core.resumable_query import ResumableQuery DEFAULT_NAMESPACE = "" @@ -39,6 +45,10 @@ LIST_NAMESPACES_PATH = "/list-namespaces" DELETE_NAMESPACE_PATH = "/delete-namespace" UPDATE_PATH = "/update" +RESUMABLE_QUERY_PATH = "/resumable-query" +RESUMABLE_QUERY_DATA_PATH = "/resumable-query-data" +RESUMABLE_QUERY_NEXT_PATH = "/resumable-query-next" +RESUMABLE_QUERY_END_PATH = "/resumable-query-end" def _path_for(namespace: str, path: str) -> str: @@ -272,9 +282,11 @@ def resumable_query( namespace: str = DEFAULT_NAMESPACE, include_data: bool = False, max_idle: int = 3600, - ) -> ResumableQuery: + ) -> Tuple[List[QueryResult], "ResumableQueryHandle"]: """ - Creates a resumable query. After fetching the necessary results, it's strongly recommended to stop the query with the `.stop()` method. + Creates a resumable query. + After fetching the necessary results, it's recommended to stop the query + to release the acquired resources. :param vector: The vector value to query. :param top_k: How many vectors will be returned as the query result. @@ -286,25 +298,30 @@ def resumable_query( :param include_data: Whether the resulting vectors will have their unstructured data or not. :param max_idle: Maximum idle time for the resumable query in seconds. - :return: A ResumableQuery object. + :return: First batch of the results, along with a handle to fetch more or stop the query. Example usage: ```python - query = index.resumable_query( + result, handle = index.resumable_query( vector=[0.6, 0.9], top_k=100, include_vectors=False, include_metadata=True, - max_idle=7200 ) - result = query.start() - # Fetch results in batches - batch1 = query.fetch_next(10) - batch2 = query.fetch_next(20) - query.stop() + # Fetch the next batch of the results + result1 = handle.fetch_next(10) + result2 = handle.fetch_next(20) + handle.stop() ``` """ + if data is None and vector is None: + raise ClientError("either `data` or `vector` values must be given") + if data is not None and vector is not None: + raise ClientError( + "`data` and `vector` values cannot be given at the same time" + ) + payload = { "topK": top_k, "includeVectors": include_vectors, @@ -314,19 +331,19 @@ def resumable_query( "maxIdle": max_idle, } - if data is None and vector is None: - raise ClientError("either `data` or `vector` values must be given") - if data is not None and vector is not None: - raise ClientError( - "`data` and `vector` values cannot be given at the same time" - ) - if data is not None: payload["data"] = data + path = RESUMABLE_QUERY_DATA_PATH else: payload["vector"] = convert_to_list(vector) + path = RESUMABLE_QUERY_PATH + + result = self._execute_request(payload=payload, path=_path_for(namespace, path)) + + uid = result["uuid"] + scores = [QueryResult._from_json(obj) for obj in result["scores"]] - return ResumableQuery(payload, self, namespace) + return scores, ResumableQueryHandle(self._execute_request, uid) def delete( self, @@ -762,9 +779,11 @@ async def resumable_query( namespace: str = DEFAULT_NAMESPACE, include_data: bool = False, max_idle: int = 3600, - ) -> ResumableQuery: + ) -> Tuple[List[QueryResult], "AsyncResumableQueryHandle"]: """ - Creates a resumable query. After fetching the necessary results, it's strongly recommended to stop the query with the `.stop()` method. + Creates a resumable query. + After fetching the necessary results, it's recommended to stop the query + to release the acquired resources. :param vector: The vector value to query. :param top_k: How many vectors will be returned as the query result. @@ -776,25 +795,30 @@ async def resumable_query( :param include_data: Whether the resulting vectors will have their unstructured data or not. :param max_idle: Maximum idle time for the resumable query in seconds. - :return: A ResumableQuery object. + :return: First batch of the results, along with a handle to fetch more or stop the query. Example usage: ```python - query = await index.resumable_query( + result, handle = await index.resumable_query( vector=[0.6, 0.9], top_k=100, include_vectors=False, include_metadata=True, - max_idle=7200 ) - result = await query.start() - # Fetch results in batches - batch1 = await query.fetch_next(10) - batch2 = await query.fetch_next(20) - await query.stop() + # Fetch the next batch of results + result1 = await handle.fetch_next(10) + result2 = await handle.fetch_next(20) + await handle.stop() ``` """ + if data is None and vector is None: + raise ClientError("either `data` or `vector` values must be given") + if data is not None and vector is not None: + raise ClientError( + "`data` and `vector` values cannot be given at the same time" + ) + payload = { "topK": top_k, "includeVectors": include_vectors, @@ -804,19 +828,21 @@ async def resumable_query( "maxIdle": max_idle, } - if data is None and vector is None: - raise ClientError("either `data` or `vector` values must be given") - if data is not None and vector is not None: - raise ClientError( - "`data` and `vector` values cannot be given at the same time" - ) - if data is not None: payload["data"] = data + path = RESUMABLE_QUERY_DATA_PATH else: payload["vector"] = convert_to_list(vector) + path = RESUMABLE_QUERY_PATH - return ResumableQuery(payload, self, namespace) + result = await self._execute_request_async( + payload=payload, path=_path_for(namespace, path) + ) + + uid = result["uuid"] + scores = [QueryResult._from_json(obj) for obj in result["scores"]] + + return scores, AsyncResumableQueryHandle(self._execute_request_async, uid) async def delete( self, @@ -1032,3 +1058,85 @@ async def delete_namespace(self, namespace: str) -> None: await self._execute_request_async( payload=None, path=_path_for(namespace, DELETE_NAMESPACE_PATH) ) + + +class ResumableQueryHandle: + """ + A class representing a resumable query for vector search operations. + + This class allows for fetching next results, and stopping a resumable query. + """ + + def __init__(self, exec_fn: Callable[[Any, str], Any], uid: str): + self._exec_fn = exec_fn + self._uid = uid + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.stop() + + def fetch_next(self, additional_k: int) -> List[QueryResult]: + """ + Fetches the next batch of results. + + Args: + additional_k (int): The number of additional results to fetch. + + Returns: + List[QueryResult]: The next batch of query results. + """ + payload = {"uuid": self._uid, "additionalK": additional_k} + result = self._exec_fn(payload, RESUMABLE_QUERY_NEXT_PATH) + return [QueryResult._from_json(obj) for obj in result] + + def stop(self) -> None: + """ + Stops the resumable query. + """ + payload = {"uuid": self._uid} + self._exec_fn(payload, RESUMABLE_QUERY_END_PATH) + + +class AsyncResumableQueryHandle: + """ + A class representing a resumable query for vector search operations. + + This class allows for fetching next results, and stopping a resumable query. + """ + + def __init__( + self, + exec_fn: Callable[[Any, str], Awaitable[Any]], + uid: str, + ): + self._exec_fn = exec_fn + self._uid = uid + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self.stop() + + async def fetch_next(self, additional_k: int) -> List[QueryResult]: + """ + Fetches the next batch of results. + + Args: + additional_k (int): The number of additional results to fetch. + + Returns: + List[QueryResult]: The next batch of query results. + """ + payload = {"uuid": self._uid, "additionalK": additional_k} + result = await self._exec_fn(payload, RESUMABLE_QUERY_NEXT_PATH) + return [QueryResult._from_json(obj) for obj in result] + + async def stop(self) -> None: + """ + Stops the resumable query. + """ + payload = {"uuid": self._uid} + await self._exec_fn(payload, RESUMABLE_QUERY_END_PATH) diff --git a/upstash_vector/core/resumable_query.py b/upstash_vector/core/resumable_query.py deleted file mode 100644 index 7906ef3..0000000 --- a/upstash_vector/core/resumable_query.py +++ /dev/null @@ -1,223 +0,0 @@ -from typing import List, Optional, Dict, Any -from upstash_vector.types import QueryResult -from upstash_vector.errors import ClientError - -RESUMABLE_QUERY_VECTOR_PATH = "/resumable-query" -RESUMABLE_QUERY_DATA_PATH = "/resumable-query-data" -RESUMABLE_QUERY_NEXT_PATH = "/resumable-query-next" -RESUMABLE_QUERY_END_PATH = "/resumable-query-end" - - -class ResumableQuery: - """ - A class representing a resumable query for vector search operations. - - This class allows for starting, fetching next results, and stopping a resumable query. - It supports both synchronous and asynchronous operations. - - Attributes: - payload (Dict[str, Any]): The query payload. - client: The client object for executing requests. - namespace (Optional[str]): The namespace for the query. - uuid (Optional[str]): The unique identifier for the resumable query session. - """ - - def __init__( - self, payload: Dict[str, Any], client, namespace: Optional[str] = None - ): - """ - Initialize a ResumableQuery instance. - - Args: - payload (Dict[str, Any]): The query payload. - client: The client object for executing requests. - namespace (Optional[str]): The namespace for the query. Defaults to None. - """ - self.payload = payload - self.client = client - self.namespace = namespace - self.uuid = None - - def __enter__(self): - """ - Start the resumable query asynchronously. - Enter the runtime context related to this object. - The with statement will bind this method's return value to the target(s) - specified in the as clause of the statement, if any. - Returns: - self: The ResumableQuery instance. - """ - return self - - def __exit__(self, exc_type, exc_value, traceback): - """ - Exit the runtime context related to this object. - The parameters describe the exception that caused the context to be exited. - If the context was exited without an exception, all three arguments will be None. - Args: - exc_type: The exception type if an exception was raised in the context, else None. - exc_value: The exception instance if an exception was raised in the context, else None. - traceback: The traceback if an exception was raised in the context, else None. - """ - self.stop() - - async def __aenter__(self): - """ - Enter the runtime context related to this object asynchronously. - Returns: - self: The ResumableQuery instance. - """ - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - """ - Exit the runtime context related to this object asynchronously. - Args: - exc_type: The exception type if an exception was raised in the context, else None. - exc_value: The exception instance if an exception was raised in the context, else None. - traceback: The traceback if an exception was raised in the context, else None. - """ - await self.async_stop() - - async def async_start(self) -> List[QueryResult]: - """ - Start the resumable query asynchronously. - - Returns: - List[QueryResult]: The initial query results. - - Raises: - ClientError: If the payload doesn't contain 'vector' or 'data' key. - """ - if "vector" in self.payload: - path = RESUMABLE_QUERY_VECTOR_PATH - elif "data" in self.payload: - path = RESUMABLE_QUERY_DATA_PATH - else: - raise ClientError("Payload must contain either 'vector' or 'data' key.") - - if self.namespace: - path = f"{path}/{self.namespace}" - - result = await self.client._execute_request_async( - payload=self.payload, path=path - ) - self.uuid = result["uuid"] - return result["scores"] - - def start(self) -> List[QueryResult]: - """ - Start the resumable query synchronously. - - Returns: - List[QueryResult]: The initial query results. - - Raises: - ClientError: If the payload doesn't contain 'vector' or 'data' key, - or if the resumable query couldn't be started. - """ - if "vector" in self.payload: - path = RESUMABLE_QUERY_VECTOR_PATH - elif "data" in self.payload: - path = RESUMABLE_QUERY_DATA_PATH - else: - raise ClientError("Payload must contain either 'vector' or 'data' key.") - - if self.namespace: - path = f"{path}/{self.namespace}" - - result = self.client._execute_request(payload=self.payload, path=path) - - self.uuid = result["uuid"] - if not self.uuid: - raise ClientError("Resumable query could not be started.") - - return [QueryResult._from_json(obj) for obj in result.get("scores", [])] - - def fetch_next(self, additional_k: int) -> List[QueryResult]: - """ - Fetch the next batch of results synchronously. - - Args: - additional_k (int): The number of additional results to fetch. - - Returns: - List[QueryResult]: The next batch of query results. - - Raises: - ClientError: If the resumable query hasn't been started. - """ - if self.uuid is None: - raise ClientError( - "Resumable query has not been started. Call start() first." - ) - payload = {"uuid": self.uuid, "additionalK": additional_k} - result = self.client._execute_request( - payload=payload, path=RESUMABLE_QUERY_NEXT_PATH - ) - return [QueryResult._from_json(obj) for obj in result] - - async def async_fetch_next(self, additional_k: int) -> List[QueryResult]: - """ - Fetch the next batch of results asynchronously. - - Args: - additional_k (int): The number of additional results to fetch. - - Returns: - List[QueryResult]: The next batch of query results. - - Raises: - ClientError: If the resumable query hasn't been started. - """ - if not self.uuid: - raise ClientError( - "Resumable query has not been started. Call start() first." - ) - payload = {"uuid": self.uuid, "additionalK": additional_k} - result = await self.client._execute_request_async( - payload=payload, path=RESUMABLE_QUERY_NEXT_PATH - ) - return [QueryResult._from_json(obj) for obj in result] - - def stop(self) -> str: - """ - Stop the resumable query synchronously. - - Returns: - str: The result of stopping the query. - - Raises: - ClientError: If the resumable query hasn't been started. - """ - if not self.uuid: - raise ClientError( - "Resumable query has not been started. Call start() first." - ) - payload = {"uuid": self.uuid} - result = self.client._execute_request( - payload=payload, path=RESUMABLE_QUERY_END_PATH - ) - self.uuid = None - return result - - async def async_stop(self) -> str: - """ - Stop the resumable query asynchronously. - - Returns: - str: The result of stopping the query. - - Raises: - ClientError: If the resumable query hasn't been started. - """ - if not self.uuid: - raise ClientError( - "Resumable query has not been started. Call start() first." - ) - payload = {"uuid": self.uuid} - result = await self.client._execute_request_async( - payload=payload, path=RESUMABLE_QUERY_END_PATH - ) - self.uuid = None - return result