Skip to content

Commit

Permalink
python[patch]: Add annotation queue async methods (#1396)
Browse files Browse the repository at this point in the history
Co-authored-by: MarkRx <[email protected]>
  • Loading branch information
isahers1 and MarkRx authored Jan 20, 2025
1 parent 98c9009 commit 0c24fb3
Show file tree
Hide file tree
Showing 4 changed files with 525 additions and 1 deletion.
224 changes: 224 additions & 0 deletions python/langsmith/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from langsmith import utils as ls_utils
from langsmith._internal import _beta_decorator as ls_beta

ID_TYPE = Union[uuid.UUID, str]


class AsyncClient:
"""Async Client for interacting with the LangSmith API."""
Expand Down Expand Up @@ -95,6 +97,14 @@ async def _arequest_with_retries(
) -> httpx.Response:
"""Make an async HTTP request with retries."""
max_retries = cast(int, self._retry_config.get("max_retries", 3))

# Python requests library used by the normal Client filters out params with None values
# The httpx library does not. Filter them out here to keep behavior consistent
if "params" in kwargs:
params = kwargs["params"]
filtered_params = {k: v for k, v in params.items() if v is not None}
kwargs["params"] = filtered_params

for attempt in range(max_retries):
try:
response = await self._client.request(method, endpoint, **kwargs)
Expand Down Expand Up @@ -826,6 +836,220 @@ async def list_feedback(
if limit is not None and ix >= limit:
break

async def delete_feedback(self, feedback_id: ID_TYPE) -> None:
"""Delete a feedback by ID.
Args:
feedback_id (Union[UUID, str]):
The ID of the feedback to delete.
Returns:
None
"""
response = await self._arequest_with_retries(
"DELETE", f"/feedback/{ls_client._as_uuid(feedback_id, 'feedback_id')}"
)
ls_utils.raise_for_status_with_text(response)

# Annotation Queue API

async def list_annotation_queues(
self,
*,
queue_ids: Optional[List[ID_TYPE]] = None,
name: Optional[str] = None,
name_contains: Optional[str] = None,
limit: Optional[int] = None,
) -> AsyncIterator[ls_schemas.AnnotationQueue]:
"""List the annotation queues on the LangSmith API.
Args:
queue_ids (Optional[List[Union[UUID, str]]]):
The IDs of the queues to filter by.
name (Optional[str]):
The name of the queue to filter by.
name_contains (Optional[str]):
The substring that the queue name should contain.
limit (Optional[int]):
The maximum number of queues to return.
Yields:
The annotation queues.
"""
params: dict = {
"ids": (
[
ls_client._as_uuid(id_, f"queue_ids[{i}]")
for i, id_ in enumerate(queue_ids)
]
if queue_ids is not None
else None
),
"name": name,
"name_contains": name_contains,
"limit": min(limit, 100) if limit is not None else 100,
}
ix = 0
async for feedback in self._aget_paginated_list(
"/annotation-queues", params=params
):
yield ls_schemas.AnnotationQueue(**feedback)
ix += 1
if limit is not None and ix >= limit:
break

async def create_annotation_queue(
self,
*,
name: str,
description: Optional[str] = None,
queue_id: Optional[ID_TYPE] = None,
) -> ls_schemas.AnnotationQueue:
"""Create an annotation queue on the LangSmith API.
Args:
name (str):
The name of the annotation queue.
description (Optional[str]):
The description of the annotation queue.
queue_id (Optional[Union[UUID, str]]):
The ID of the annotation queue.
Returns:
AnnotationQueue: The created annotation queue object.
"""
body = {
"name": name,
"description": description,
"id": str(queue_id) if queue_id is not None else str(uuid.uuid4()),
}
response = await self._arequest_with_retries(
"POST",
"/annotation-queues",
json={k: v for k, v in body.items() if v is not None},
)
ls_utils.raise_for_status_with_text(response)
return ls_schemas.AnnotationQueue(
**response.json(),
)

async def read_annotation_queue(
self, queue_id: ID_TYPE
) -> ls_schemas.AnnotationQueue:
"""Read an annotation queue with the specified queue ID.
Args:
queue_id (Union[UUID, str]): The ID of the annotation queue to read.
Returns:
AnnotationQueue: The annotation queue object.
"""
# TODO: Replace when actual endpoint is added
return await self.list_annotation_queues(queue_ids=[queue_id]).__anext__()

async def update_annotation_queue(
self, queue_id: ID_TYPE, *, name: str, description: Optional[str] = None
) -> None:
"""Update an annotation queue with the specified queue_id.
Args:
queue_id (Union[UUID, str]): The ID of the annotation queue to update.
name (str): The new name for the annotation queue.
description (Optional[str]): The new description for the
annotation queue. Defaults to None.
Returns:
None
"""
response = await self._arequest_with_retries(
"PATCH",
f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}",
json={
"name": name,
"description": description,
},
)
ls_utils.raise_for_status_with_text(response)

async def delete_annotation_queue(self, queue_id: ID_TYPE) -> None:
"""Delete an annotation queue with the specified queue ID.
Args:
queue_id (Union[UUID, str]): The ID of the annotation queue to delete.
Returns:
None
"""
response = await self._arequest_with_retries(
"DELETE",
f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}",
headers={"Accept": "application/json", **self._client.headers},
)
ls_utils.raise_for_status_with_text(response)

async def add_runs_to_annotation_queue(
self, queue_id: ID_TYPE, *, run_ids: List[ID_TYPE]
) -> None:
"""Add runs to an annotation queue with the specified queue ID.
Args:
queue_id (Union[UUID, str]): The ID of the annotation queue.
run_ids (List[Union[UUID, str]]): The IDs of the runs to be added to the annotation
queue.
Returns:
None
"""
response = await self._arequest_with_retries(
"POST",
f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}/runs",
json=[
str(ls_client._as_uuid(id_, f"run_ids[{i}]"))
for i, id_ in enumerate(run_ids)
],
)
ls_utils.raise_for_status_with_text(response)

async def delete_run_from_annotation_queue(
self, queue_id: ID_TYPE, *, run_id: ID_TYPE
) -> None:
"""Delete a run from an annotation queue with the specified queue ID and run ID.
Args:
queue_id (Union[UUID, str]): The ID of the annotation queue.
run_id (Union[UUID, str]): The ID of the run to be added to the annotation
queue.
Returns:
None
"""
response = await self._arequest_with_retries(
"DELETE",
f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}/runs/{ls_client._as_uuid(run_id, 'run_id')}",
)
ls_utils.raise_for_status_with_text(response)

async def get_run_from_annotation_queue(
self, queue_id: ID_TYPE, *, index: int
) -> ls_schemas.RunWithAnnotationQueueInfo:
"""Get a run from an annotation queue at the specified index.
Args:
queue_id (Union[UUID, str]): The ID of the annotation queue.
index (int): The index of the run to retrieve.
Returns:
RunWithAnnotationQueueInfo: The run at the specified index.
Raises:
LangSmithNotFoundError: If the run is not found at the given index.
LangSmithError: For other API-related errors.
"""
base_url = f"/annotation-queues/{ls_client._as_uuid(queue_id, 'queue_id')}/run"
response = await self._arequest_with_retries("GET", f"{base_url}/{index}")
ls_utils.raise_for_status_with_text(response)
return ls_schemas.RunWithAnnotationQueueInfo(**response.json())

@ls_beta.warn_beta
async def index_dataset(
self,
Expand Down
2 changes: 1 addition & 1 deletion python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5701,7 +5701,7 @@ def create_annotation_queue(
body = {
"name": name,
"description": description,
"id": queue_id or str(uuid.uuid4()),
"id": str(queue_id) if queue_id is not None else str(uuid.uuid4()),
}
response = self.request_with_retries(
"POST",
Expand Down
Loading

0 comments on commit 0c24fb3

Please sign in to comment.