feat(python-sdk): Add async API using httpx#242
feat(python-sdk): Add async API using httpx#242cmq2525 wants to merge 2 commits intovolcano-sh:mainfrom
Conversation
|
Welcome @cmq2525! It looks like this is your first PR to volcano-sh/agentcube 🎉 |
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces fully asynchronous APIs to the Python SDK, enabling developers to interact with AgentRuntime and Code Interpreter services using Python's async/await pattern. It leverages httpx for efficient asynchronous HTTP requests and maintains API parity with existing synchronous clients. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a significant and well-implemented feature: fully asynchronous clients for the AgentRuntime and CodeInterpreter services, using httpx. The API design maintains parity with the existing synchronous clients, which is great for developer experience. The use of async context managers and factory methods for client initialization is a correct and modern approach for handling async I/O in constructors. The code is generally of high quality, with good error handling and resource management.
I've identified a high-severity race condition in temporary file generation that could cause issues under concurrent use, and I've also suggested a couple of medium-severity improvements for robustness and documentation consistency. Once these points are addressed, this will be an excellent addition to the SDK.
sdk-python/agentcube/clients/async_code_interpreter_data_plane.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
This PR adds first-class asynchronous APIs to the AgentCube Python SDK by introducing async client implementations backed by httpx.AsyncClient, along with tests and usage examples to support async/await workflows.
Changes:
- Added async SDK clients for Code Interpreter and Agent Runtime, plus async Control/Data Plane HTTP clients using
httpx. - Added async unit tests (using
unittest.IsolatedAsyncioTestCase) and new async usage examples. - Added
httpxas a dependency in bothrequirements.txtandpyproject.toml.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk-python/agentcube/utils/async_http.py | Adds a shared helper to create pooled httpx.AsyncClient instances. |
| sdk-python/agentcube/clients/async_control_plane.py | Implements async Control Plane client for session create/delete. |
| sdk-python/agentcube/clients/async_code_interpreter_data_plane.py | Implements async Code Interpreter Data Plane operations (commands/files). |
| sdk-python/agentcube/clients/async_agent_runtime_data_plane.py | Implements async Agent Runtime Data Plane bootstrap/invoke calls. |
| sdk-python/agentcube/async_code_interpreter.py | Introduces AsyncCodeInterpreterClient lifecycle + async APIs. |
| sdk-python/agentcube/async_agent_runtime.py | Introduces AsyncAgentRuntimeClient lifecycle + async invoke API. |
| sdk-python/agentcube/clients/init.py | Exports newly added async client implementations. |
| sdk-python/agentcube/init.py | Exposes async clients from the package top-level. |
| sdk-python/tests/test_async_code_interpreter.py | Adds async unit tests for Code Interpreter client session management. |
| sdk-python/tests/test_async_agent_runtime.py | Adds async unit tests for Agent Runtime client bootstrap/invoke behavior. |
| sdk-python/examples/async_basic_usage.py | Adds an async Code Interpreter usage example (incl. session reuse). |
| sdk-python/examples/async_agent_runtime_usage.py | Adds an async Agent Runtime usage example (incl. session reuse). |
| sdk-python/requirements.txt | Adds httpx dependency for async HTTP transport. |
| sdk-python/pyproject.toml | Adds httpx dependency for packaging/install parity. |
| timeout_str = ( | ||
| f"{timeout_value}s" if isinstance(timeout_value, (int, float)) else str(timeout_value) | ||
| ) | ||
|
|
||
| cmd_list = shlex.split(command, posix=True) if isinstance(command, str) else command | ||
|
|
||
| payload = {"command": cmd_list, "timeout": timeout_str} | ||
| body = json.dumps(payload).encode("utf-8") | ||
|
|
||
| # Add a buffer so httpx doesn't time out before PicoD returns the JSON response | ||
| read_timeout = ( | ||
| timeout_value + _TIMEOUT_BUFFER_SECONDS | ||
| if isinstance(timeout_value, (int, float)) | ||
| else timeout_value | ||
| ) |
There was a problem hiding this comment.
execute_command() builds timeout_str for the payload, but if a caller passes a non-numeric timeout (e.g., a string like "30s"), read_timeout will also become non-numeric and will later be passed into httpx.Timeout(...), which will raise a TypeError. Either validate timeout is numeric (and keep the type strictly Optional[float]) or add parsing so string timeouts work end-to-end.
| timeout_str = ( | |
| f"{timeout_value}s" if isinstance(timeout_value, (int, float)) else str(timeout_value) | |
| ) | |
| cmd_list = shlex.split(command, posix=True) if isinstance(command, str) else command | |
| payload = {"command": cmd_list, "timeout": timeout_str} | |
| body = json.dumps(payload).encode("utf-8") | |
| # Add a buffer so httpx doesn't time out before PicoD returns the JSON response | |
| read_timeout = ( | |
| timeout_value + _TIMEOUT_BUFFER_SECONDS | |
| if isinstance(timeout_value, (int, float)) | |
| else timeout_value | |
| ) | |
| # Ensure timeout is either numeric seconds or None so httpx.Timeout does not fail | |
| if timeout_value is not None and not isinstance(timeout_value, (int, float)): | |
| raise TypeError( | |
| f"timeout must be a number of seconds (int or float), got {type(timeout_value).__name__}" | |
| ) | |
| if timeout_value is not None: | |
| timeout_str = f"{timeout_value}s" | |
| else: | |
| # Preserve existing behavior: str(None) -> "None" | |
| timeout_str = str(timeout_value) | |
| cmd_list = shlex.split(command, posix=True) if isinstance(command, str) else command | |
| payload = {"command": cmd_list, "timeout": timeout_str} | |
| body = json.dumps(payload).encode("utf-8") | |
| # Add a buffer so httpx doesn't time out before PicoD returns the JSON response | |
| if timeout_value is not None: | |
| read_timeout = timeout_value + _TIMEOUT_BUFFER_SECONDS | |
| else: | |
| read_timeout = timeout_value |
| await client1.write_file("42", "value.txt") | ||
| session_id = client1.session_id | ||
| print(f"Session ID saved: {session_id}") | ||
| # Don't call stop() - let session persist |
There was a problem hiding this comment.
In the session reuse example, client1 = await AsyncCodeInterpreterClient.create(...) is intentionally not stopped so the remote session persists, but this also leaves the local httpx.AsyncClient instances open (no aclose()), which can produce ResourceWarnings and leak sockets. Consider updating the example to explicitly close local HTTP resources while keeping the session alive (e.g., provide a public close() that only closes transports, or demonstrate the supported way to do this without reaching into internal attributes).
| # Don't call stop() - let session persist | |
| # Close local HTTP resources but keep the remote session alive | |
| await client1.close() |
There was a problem hiding this comment.
CodeInterpreterClient has the same problem. It's not in scope for this pr though—let's open a separate issue to track it.
| as an async context manager:: | ||
|
|
||
| # Async context manager (recommended) | ||
| async with AsyncCodeInterpreterClient.create(...) as client: |
There was a problem hiding this comment.
The class docstring example uses async with AsyncCodeInterpreterClient.create(...) as client:, but create() is an async factory that must be awaited and it returns an AsyncCodeInterpreterClient (not an async context manager). As written, this example will fail and it also encourages an init pattern that can lead to double-initialization; update the example to use async with AsyncCodeInterpreterClient(...) as client: (or client = await ...create() with an explicit try/finally).
| async with AsyncCodeInterpreterClient.create(...) as client: | |
| async with AsyncCodeInterpreterClient(...) as client: |
| """Store configuration; does *not* create a session. | ||
|
|
||
| Call ``await AsyncCodeInterpreterClient.create(...)`` instead of | ||
| constructing this class directly. | ||
| """ |
There was a problem hiding this comment.
__init__ states callers should not construct this class directly, but the intended/working usage for the async context manager is async with AsyncCodeInterpreterClient(...) as client: (as shown in the PR description and examples). Consider adjusting this docstring guidance so it doesn't contradict the supported context-manager construction pattern.
| async def __aenter__(self) -> "AsyncCodeInterpreterClient": | ||
| await self._async_init() | ||
| return self |
There was a problem hiding this comment.
__aenter__ always calls _async_init(). If a user creates a client via await AsyncCodeInterpreterClient.create(...) and then uses it in an async with block (which the current class docstring implies), _async_init() will run twice and create a second dp_client without closing the first one, leaking the first httpx.AsyncClient. Make initialization idempotent (e.g., no-op if dp_client is already set / already initialized) and/or ensure the docs do not suggest create() + context manager together.
| instance = cls( | ||
| name=name, | ||
| namespace=namespace, | ||
| ttl=ttl, | ||
| workload_manager_url=workload_manager_url, | ||
| router_url=router_url, | ||
| auth_token=auth_token, | ||
| verbose=verbose, | ||
| session_id=session_id, | ||
| ) | ||
| await instance._async_init() | ||
| return instance |
There was a problem hiding this comment.
If _async_init() raises (e.g., create_session fails, or _init_data_plane raises), the underlying AsyncControlPlaneClient's httpx.AsyncClient is never closed. This can leak sockets/file descriptors in error paths. Wrap the _async_init() call in create() (and/or in _async_init() itself) with a try/except/finally that closes cp_client when initialization fails.
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #242 +/- ##
==========================================
+ Coverage 35.60% 43.35% +7.74%
==========================================
Files 29 30 +1
Lines 2533 2611 +78
==========================================
+ Hits 902 1132 +230
+ Misses 1505 1358 -147
+ Partials 126 121 -5
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
Signed-off-by: cmq2525 <[email protected]>
Signed-off-by: cmq2525 <[email protected]>
a65f1ab to
b132b38
Compare
| try: | ||
| self._init_data_plane() | ||
| except Exception: | ||
| self.logger.warning( | ||
| f"Failed to initialize data plane client, " | ||
| f"deleting session {self.session_id} to prevent resource leak" | ||
| ) | ||
| await self.cp_client.delete_session(self.session_id) | ||
| self.session_id = None | ||
| raise |
| connector_limit_per_host: int = 10, | ||
| ) -> httpx.AsyncClient: | ||
| """Create an httpx AsyncClient with connection pooling. | ||
|
|
||
| Args: | ||
| connector_limit: Total number of simultaneous connections (default: 10). | ||
| connector_limit_per_host: Max keepalive connections per host (default: 10). | ||
|
|
||
| Returns: | ||
| A configured httpx.AsyncClient with connection limits. | ||
| """ | ||
| limits = httpx.Limits( | ||
| max_connections=connector_limit, | ||
| max_keepalive_connections=connector_limit_per_host, |
hzxuzhonghu
left a comment
There was a problem hiding this comment.
Posting a couple of correctness issues I found while reviewing the new async SDK path.
| self.dp_client.logger.setLevel(logging.DEBUG) | ||
|
|
||
| async def __aenter__(self) -> "AsyncCodeInterpreterClient": | ||
| await self._async_init() |
There was a problem hiding this comment.
create() already calls _async_init(), so async with (await AsyncCodeInterpreterClient.create(...)) as client: will enter here with an initialized instance and run _async_init() a second time. That replaces dp_client without closing the first httpx.AsyncClient, which leaks the original connection pool. It would be safer for __aenter__ to skip re-initialization when session_id/dp_client are already set, or to make create() and context-manager usage mutually exclusive.
| as an async context manager:: | ||
|
|
||
| # Async context manager (recommended) | ||
| async with AsyncCodeInterpreterClient.create(...) as client: |
There was a problem hiding this comment.
This example does not work as written: AsyncCodeInterpreterClient.create(...) returns a coroutine, so async with AsyncCodeInterpreterClient.create(...) as client: raises before entering the context manager. The example needs either client = await AsyncCodeInterpreterClient.create(...); ... or async with AsyncCodeInterpreterClient(...) as client:.
Summary
This PR introduces fully asynchronous APIs to the Python SDK, allowing developers to interact with the AgentRuntime and Code Interpreter using the async/await pattern.
Key Changes
API Variability & Demo
The only notable difference is in object initialization:
Because Python's init cannot be asynchronous, performing blocking network I/O in the constructor would block the event loop.
Therefore, the async clients utilize async context managers or async factory methods for initialization.
Comparison Demo: