From 0c24fb326e4396e5110cd06826a1481199b054be Mon Sep 17 00:00:00 2001 From: Isaac Francisco <78627776+isahers1@users.noreply.github.com> Date: Mon, 20 Jan 2025 13:50:22 -0800 Subject: [PATCH] python[patch]: Add annotation queue async methods (#1396) Co-authored-by: MarkRx --- python/langsmith/async_client.py | 224 ++++++++++++++++++ python/langsmith/client.py | 2 +- .../integration_tests/test_async_client.py | 171 +++++++++++++ python/tests/integration_tests/test_client.py | 129 ++++++++++ 4 files changed, 525 insertions(+), 1 deletion(-) diff --git a/python/langsmith/async_client.py b/python/langsmith/async_client.py index 8245edbab..0be62924d 100644 --- a/python/langsmith/async_client.py +++ b/python/langsmith/async_client.py @@ -25,6 +25,8 @@ from langsmith import utils as ls_utils from langsmith._internal import _beta_decorator as ls_beta +ID_TYPE = Union[uuid.UUID, str] + class AsyncClient: """Async Client for interacting with the LangSmith API.""" @@ -95,6 +97,14 @@ async def _arequest_with_retries( ) -> httpx.Response: """Make an async HTTP request with retries.""" max_retries = cast(int, self._retry_config.get("max_retries", 3)) + + # Python requests library used by the normal Client filters out params with None values + # The httpx library does not. Filter them out here to keep behavior consistent + if "params" in kwargs: + params = kwargs["params"] + filtered_params = {k: v for k, v in params.items() if v is not None} + kwargs["params"] = filtered_params + for attempt in range(max_retries): try: response = await self._client.request(method, endpoint, **kwargs) @@ -826,6 +836,220 @@ async def list_feedback( if limit is not None and ix >= limit: break + async def delete_feedback(self, feedback_id: ID_TYPE) -> None: + """Delete a feedback by ID. + + Args: + feedback_id (Union[UUID, str]): + The ID of the feedback to delete. + + Returns: + None + """ + response = await self._arequest_with_retries( + "DELETE", f"/feedback/{ls_client._as_uuid(feedback_id, 'feedback_id')}" + ) + ls_utils.raise_for_status_with_text(response) + + # Annotation Queue API + + async def list_annotation_queues( + self, + *, + queue_ids: Optional[List[ID_TYPE]] = None, + name: Optional[str] = None, + name_contains: Optional[str] = None, + limit: Optional[int] = None, + ) -> AsyncIterator[ls_schemas.AnnotationQueue]: + """List the annotation queues on the LangSmith API. + + Args: + queue_ids (Optional[List[Union[UUID, str]]]): + The IDs of the queues to filter by. + name (Optional[str]): + The name of the queue to filter by. + name_contains (Optional[str]): + The substring that the queue name should contain. + limit (Optional[int]): + The maximum number of queues to return. + + Yields: + The annotation queues. + """ + params: dict = { + "ids": ( + [ + ls_client._as_uuid(id_, f"queue_ids[{i}]") + for i, id_ in enumerate(queue_ids) + ] + if queue_ids is not None + else None + ), + "name": name, + "name_contains": name_contains, + "limit": min(limit, 100) if limit is not None else 100, + } + ix = 0 + async for feedback in self._aget_paginated_list( + "/annotation-queues", params=params + ): + yield ls_schemas.AnnotationQueue(**feedback) + ix += 1 + if limit is not None and ix >= limit: + break + + async def create_annotation_queue( + self, + *, + name: str, + description: Optional[str] = None, + queue_id: Optional[ID_TYPE] = None, + ) -> ls_schemas.AnnotationQueue: + """Create an annotation queue on the LangSmith API. + + Args: + name (str): + The name of the annotation queue. + description (Optional[str]): + The description of the annotation queue. + queue_id (Optional[Union[UUID, str]]): + The ID of the annotation queue. + + Returns: + AnnotationQueue: The created annotation queue object. + """ + body = { + "name": name, + "description": description, + "id": str(queue_id) if queue_id is not None else str(uuid.uuid4()), + } + response = await self._arequest_with_retries( + "POST", + "/annotation-queues", + json={k: v for k, v in body.items() if v is not None}, + ) + ls_utils.raise_for_status_with_text(response) + return ls_schemas.AnnotationQueue( + **response.json(), + ) + + async def read_annotation_queue( + self, queue_id: ID_TYPE + ) -> ls_schemas.AnnotationQueue: + """Read an annotation queue with the specified queue ID. + + Args: + queue_id (Union[UUID, str]): The ID of the annotation queue to read. + + Returns: + AnnotationQueue: The annotation queue object. + """ + # TODO: Replace when actual endpoint is added + return await self.list_annotation_queues(queue_ids=[queue_id]).__anext__() + + async def update_annotation_queue( + self, queue_id: ID_TYPE, *, name: str, description: Optional[str] = None + ) -> None: + """Update an annotation queue with the specified queue_id. + + Args: + queue_id (Union[UUID, str]): The ID of the annotation queue to update. + name (str): The new name for the annotation queue. + description (Optional[str]): The new description for the + annotation queue. Defaults to None. + + Returns: + None + """ + response = await self._arequest_with_retries( + "PATCH", + f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}", + json={ + "name": name, + "description": description, + }, + ) + ls_utils.raise_for_status_with_text(response) + + async def delete_annotation_queue(self, queue_id: ID_TYPE) -> None: + """Delete an annotation queue with the specified queue ID. + + Args: + queue_id (Union[UUID, str]): The ID of the annotation queue to delete. + + Returns: + None + """ + response = await self._arequest_with_retries( + "DELETE", + f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}", + headers={"Accept": "application/json", **self._client.headers}, + ) + ls_utils.raise_for_status_with_text(response) + + async def add_runs_to_annotation_queue( + self, queue_id: ID_TYPE, *, run_ids: List[ID_TYPE] + ) -> None: + """Add runs to an annotation queue with the specified queue ID. + + Args: + queue_id (Union[UUID, str]): The ID of the annotation queue. + run_ids (List[Union[UUID, str]]): The IDs of the runs to be added to the annotation + queue. + + Returns: + None + """ + response = await self._arequest_with_retries( + "POST", + f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}/runs", + json=[ + str(ls_client._as_uuid(id_, f"run_ids[{i}]")) + for i, id_ in enumerate(run_ids) + ], + ) + ls_utils.raise_for_status_with_text(response) + + async def delete_run_from_annotation_queue( + self, queue_id: ID_TYPE, *, run_id: ID_TYPE + ) -> None: + """Delete a run from an annotation queue with the specified queue ID and run ID. + + Args: + queue_id (Union[UUID, str]): The ID of the annotation queue. + run_id (Union[UUID, str]): The ID of the run to be added to the annotation + queue. + + Returns: + None + """ + response = await self._arequest_with_retries( + "DELETE", + f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}/runs/{ls_client._as_uuid(run_id, 'run_id')}", + ) + ls_utils.raise_for_status_with_text(response) + + async def get_run_from_annotation_queue( + self, queue_id: ID_TYPE, *, index: int + ) -> ls_schemas.RunWithAnnotationQueueInfo: + """Get a run from an annotation queue at the specified index. + + Args: + queue_id (Union[UUID, str]): The ID of the annotation queue. + index (int): The index of the run to retrieve. + + Returns: + RunWithAnnotationQueueInfo: The run at the specified index. + + Raises: + LangSmithNotFoundError: If the run is not found at the given index. + LangSmithError: For other API-related errors. + """ + base_url = f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}/run" + response = await self._arequest_with_retries("GET", f"{base_url}/{index}") + ls_utils.raise_for_status_with_text(response) + return ls_schemas.RunWithAnnotationQueueInfo(**response.json()) + @ls_beta.warn_beta async def index_dataset( self, diff --git a/python/langsmith/client.py b/python/langsmith/client.py index c7bb8299c..4cbe2dbd9 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -5701,7 +5701,7 @@ def create_annotation_queue( body = { "name": name, "description": description, - "id": queue_id or str(uuid.uuid4()), + "id": str(queue_id) if queue_id is not None else str(uuid.uuid4()), } response = self.request_with_retries( "POST", diff --git a/python/tests/integration_tests/test_async_client.py b/python/tests/integration_tests/test_async_client.py index ecd03c87f..c311fc434 100644 --- a/python/tests/integration_tests/test_async_client.py +++ b/python/tests/integration_tests/test_async_client.py @@ -276,3 +276,174 @@ async def check_feedbacks(): return len(feedbacks) == 3 await wait_for(check_feedbacks, timeout=10) + + feedbacks = [ + feedback async for feedback in async_client.list_feedback(run_ids=[run_id]) + ] + assert len(feedbacks) == 3 + + +@pytest.mark.asyncio +async def test_delete_feedback(async_client: AsyncClient): + """Test deleting feedback.""" + project_name = "__test_delete_feedback" + uuid.uuid4().hex[:8] + run_id = uuid.uuid4() + + await async_client.create_run( + name="test_run", + inputs={"input": "hello"}, + run_type="llm", + project_name=project_name, + id=run_id, + start_time=datetime.datetime.now(datetime.timezone.utc), + ) + + # Create feedback + feedback = await async_client.create_feedback( + run_id=run_id, + key="test_feedback", + value=1, + comment="test comment", + ) + + # Delete the feedback + await async_client.delete_feedback(feedback.id) + + # Verify feedback is deleted by checking list_feedback + feedbacks = [ + feedback async for feedback in async_client.list_feedback(run_ids=[run_id]) + ] + assert len(feedbacks) == 0 + + +@pytest.mark.asyncio +async def test_annotation_queue_crud(async_client: AsyncClient): + """Test basic CRUD operations for annotation queues.""" + queue_name = f"test_queue_{uuid.uuid4().hex[:8]}" + queue_id = uuid.uuid4() + + # Test creation + queue = await async_client.create_annotation_queue( + name=queue_name, description="Test queue", queue_id=queue_id + ) + assert queue.name == queue_name + assert queue.id == queue_id + + # Test reading + read_queue = await async_client.read_annotation_queue(queue_id) + assert read_queue.id == queue_id + assert read_queue.name == queue_name + + # Test updating + new_name = f"updated_{queue_name}" + await async_client.update_annotation_queue( + queue_id=queue_id, name=new_name, description="Updated description" + ) + + updated_queue = await async_client.read_annotation_queue(queue_id) + assert updated_queue.name == new_name + + # Test deletion + await async_client.delete_annotation_queue(queue_id) + + # Verify deletion + queues = [ + queue + async for queue in async_client.list_annotation_queues(queue_ids=[queue_id]) + ] + assert len(queues) == 0 + + +@pytest.mark.asyncio +async def test_list_annotation_queues(async_client: AsyncClient): + """Test listing and filtering annotation queues.""" + queue_names = [f"test_queue_{i}_{uuid.uuid4().hex[:8]}" for i in range(3)] + queue_ids = [] + + try: + # Create test queues + for name in queue_names: + queue = await async_client.create_annotation_queue( + name=name, description="Test queue" + ) + queue_ids.append(queue.id) + + # Test listing with various filters + queues = [ + queue + async for queue in async_client.list_annotation_queues( + queue_ids=queue_ids[:2], limit=2 + ) + ] + assert len(queues) == 2 + + # Test name filter + queues = [ + queue + async for queue in async_client.list_annotation_queues(name=queue_names[0]) + ] + assert len(queues) == 1 + assert queues[0].name == queue_names[0] + + # Test name_contains filter + queues = [ + queue + async for queue in async_client.list_annotation_queues( + name_contains="test_queue" + ) + ] + assert len(queues) >= 3 # Could be more if other tests left queues + + finally: + # Clean up + for queue_id in queue_ids: + await async_client.delete_annotation_queue(queue_id) + + +@pytest.mark.asyncio +async def test_annotation_queue_runs(async_client: AsyncClient): + """Test managing runs within an annotation queue.""" + queue_name = f"test_queue_{uuid.uuid4().hex[:8]}" + project_name = f"test_project_{uuid.uuid4().hex[:8]}" + + # Create a queue + queue = await async_client.create_annotation_queue( + name=queue_name, description="Test queue" + ) + + # Create some test runs + run_ids = [uuid.uuid4() for _ in range(3)] + for i in range(3): + await async_client.create_run( + name=f"test_run_{i}", + inputs={"input": f"test_{i}"}, + run_type="llm", + project_name=project_name, + start_time=datetime.datetime.now(datetime.timezone.utc), + id=run_ids[i], + ) + + # Add runs to queue + await async_client.add_runs_to_annotation_queue(queue_id=queue.id, run_ids=run_ids) + + # Test getting run at index + run_info = await async_client.get_run_from_annotation_queue( + queue_id=queue.id, index=0 + ) + assert run_info.id in run_ids + + # Test deleting a run from queue + await async_client.delete_run_from_annotation_queue( + queue_id=queue.id, run_id=run_ids[2] + ) + + # Test that runs are deleted + with pytest.raises(ls_utils.LangSmithAPIError): + await async_client.get_run_from_annotation_queue(queue_id=queue.id, index=2) + + run_1 = await async_client.get_run_from_annotation_queue(queue_id=queue.id, index=0) + run_2 = await async_client.get_run_from_annotation_queue(queue_id=queue.id, index=1) + assert sorted([run_1.id, run_2.id]) == sorted(run_ids[:2]) + + # Clean up + await async_client.delete_annotation_queue(queue.id) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 3aa72baa1..b680cbba0 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -2124,3 +2124,132 @@ def test_update_examples_multipart(langchain_client: Client) -> None: # Clean up langchain_client.delete_dataset(dataset_id=dataset.id) + + +def test_annotation_queue_crud(langchain_client: Client): + """Test basic CRUD operations for annotation queues.""" + queue_name = f"test_queue_{uuid.uuid4().hex[:8]}" + queue_id = uuid.uuid4() + + # Test creation + queue = langchain_client.create_annotation_queue( + name=queue_name, description="Test queue", queue_id=queue_id + ) + assert queue.name == queue_name + assert queue.id == queue_id + + # Test reading + read_queue = langchain_client.read_annotation_queue(queue_id) + assert read_queue.id == queue_id + assert read_queue.name == queue_name + + # Test updating + new_name = f"updated_{queue_name}" + langchain_client.update_annotation_queue( + queue_id=queue_id, name=new_name, description="Updated description" + ) + + updated_queue = langchain_client.read_annotation_queue(queue_id) + assert updated_queue.name == new_name + + # Test deletion + langchain_client.delete_annotation_queue(queue_id) + + # Verify deletion + queues = list(langchain_client.list_annotation_queues(queue_ids=[queue_id])) + assert len(queues) == 0 + + +def test_list_annotation_queues(langchain_client: Client): + """Test listing and filtering annotation queues.""" + queue_names = [f"test_queue_{i}_{uuid.uuid4().hex[:8]}" for i in range(3)] + queue_ids = [] + + try: + # Create test queues + for name in queue_names: + queue = langchain_client.create_annotation_queue( + name=name, description="Test queue" + ) + queue_ids.append(queue.id) + + # Test listing with various filters + queues = list( + langchain_client.list_annotation_queues(queue_ids=queue_ids[:2], limit=2) + ) + assert len(queues) == 2 + + # Test name filter + queues = list(langchain_client.list_annotation_queues(name=queue_names[0])) + assert len(queues) == 1 + assert queues[0].name == queue_names[0] + + # Test name_contains filter + queues = list( + langchain_client.list_annotation_queues(name_contains="test_queue") + ) + assert len(queues) >= 3 # Could be more if other tests left queues + + finally: + # Clean up + for queue_id in queue_ids: + langchain_client.delete_annotation_queue(queue_id) + + +def test_annotation_queue_runs(langchain_client: Client): + """Test managing runs within an annotation queue.""" + queue_name = f"test_queue_{uuid.uuid4().hex[:8]}" + project_name = f"test_project_{uuid.uuid4().hex[:8]}" + # Create a queue + queue = langchain_client.create_annotation_queue( + name=queue_name, description="Test queue" + ) + + # Create some test runs + run_ids = [uuid.uuid4() for _ in range(3)] + for i in range(3): + langchain_client.create_run( + name=f"test_run_{i}", + inputs={"input": f"test_{i}"}, + run_type="llm", + project_name=project_name, + start_time=datetime.datetime.now(datetime.timezone.utc), + id=run_ids[i], + ) + + def _get_run(run_id: ID_TYPE, has_end: bool = False) -> bool: + try: + r = langchain_client.read_run(run_id) # type: ignore + if has_end: + return r.end_time is not None + return True + except LangSmithError: + return False + + wait_for(lambda: _get_run(run_ids[0])) + wait_for(lambda: _get_run(run_ids[1])) + wait_for(lambda: _get_run(run_ids[2])) + # Add runs to queue + langchain_client.add_runs_to_annotation_queue(queue_id=queue.id, run_ids=run_ids) + + # Test getting run at index + run_info = langchain_client.get_run_from_annotation_queue( + queue_id=queue.id, index=0 + ) + assert run_info.id in run_ids + + # Test deleting a run from queue + langchain_client.delete_run_from_annotation_queue( + queue_id=queue.id, run_id=run_ids[2] + ) + + # Test that runs are deleted + with pytest.raises(LangSmithNotFoundError): + langchain_client.get_run_from_annotation_queue(queue_id=queue.id, index=2) + + run_1 = langchain_client.get_run_from_annotation_queue(queue_id=queue.id, index=0) + run_2 = langchain_client.get_run_from_annotation_queue(queue_id=queue.id, index=1) + assert sorted([run_1.id, run_2.id]) == sorted(run_ids[:2]) + + # Clean up + langchain_client.delete_annotation_queue(queue.id)