diff --git a/clients/python/agentic-sandbox-client/README.md b/clients/python/agentic-sandbox-client/README.md index c55b44023..7d7931177 100644 --- a/clients/python/agentic-sandbox-client/README.md +++ b/clients/python/agentic-sandbox-client/README.md @@ -189,6 +189,40 @@ client = SandboxClient( sandbox = client.create_sandbox(template="node-sandbox-template", namespace="default"). ``` +### 5. Async Client + +For async applications (FastAPI, aiohttp, async agent orchestrators), use the `AsyncSandboxClient`. +Install the async extras first: + +```bash +pip install k8s-agent-sandbox[async] +``` + +The async client requires an explicit connection config — `LocalTunnel` mode is not supported +because it relies on a synchronous `kubectl port-forward` subprocess. Use `DirectConnection` or +`GatewayConnection` instead. + +```python +import asyncio +from k8s_agent_sandbox import AsyncSandboxClient +from k8s_agent_sandbox.models import SandboxDirectConnectionConfig + +async def main(): + config = SandboxDirectConnectionConfig( + api_url="http://sandbox-router-svc.default.svc.cluster.local:8080" + ) + + async with AsyncSandboxClient(connection_config=config) as client: + sandbox = await client.create_sandbox( + template="python-sandbox-template", + namespace="default", + ) + result = await sandbox.commands.run("echo 'Hello from async!'") + print(result.stdout) + +asyncio.run(main()) +``` + ## Testing A test script is included to verify the full lifecycle (Creation -> Execution -> File I/O -> Cleanup). 
diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/__init__.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/__init__.py
index 3b4227458..98cbc231a 100644
--- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/__init__.py
+++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/__init__.py
@@ -20,3 +20,15 @@
     SandboxPortForwardError,
     SandboxRequestError,
 )
+
+
+try:
+    from .async_sandbox_client import AsyncSandboxClient
+except ImportError:
+    class AsyncSandboxClient:  # type: ignore[no-redef]
+        """Placeholder that raises ImportError when async extras are missing."""
+        def __init__(self, *args, **kwargs):
+            raise ImportError(
+                "AsyncSandboxClient requires the 'async' extras. "
+                "Install with: pip install k8s-agent-sandbox[async]"
+            )
diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_connector.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_connector.py
new file mode 100644
index 000000000..f4eebefe4
--- /dev/null
+++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_connector.py
@@ -0,0 +1,178 @@
+# Copyright 2026 The Kubernetes Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import logging
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+from .async_k8s_helper import AsyncK8sHelper
+from .exceptions import SandboxRequestError
+from .models import (
+    SandboxConnectionConfig,
+    SandboxDirectConnectionConfig,
+    SandboxGatewayConnectionConfig,
+    SandboxLocalTunnelConnectionConfig,
+)
+
+RETRYABLE_STATUS_CODES = {500, 502, 503, 504}
+MAX_RETRIES = 5
+BACKOFF_FACTOR = 0.5
+
+
+class AsyncSandboxConnector:
+    """
+    Async connector for communicating with a Sandbox over HTTP using httpx.
+
+    Supports DirectConnection and GatewayConnection modes. LocalTunnel mode
+    is not supported because it relies on a long-running subprocess; use the
+    sync SandboxConnector for local development.
+    """
+
+    def __init__(
+        self,
+        sandbox_id: str,
+        namespace: str,
+        connection_config: SandboxConnectionConfig,
+        k8s_helper: AsyncK8sHelper,
+    ):
+        """Validate the connection mode and create the shared HTTP client.
+
+        Args:
+            sandbox_id: Sent as the ``X-Sandbox-ID`` header on every request.
+            namespace: Sent as the ``X-Sandbox-Namespace`` header.
+            connection_config: Direct or Gateway connection settings.
+            k8s_helper: Used to look up the Gateway address in Gateway mode.
+
+        Raises:
+            ValueError: If ``connection_config`` is a
+                ``SandboxLocalTunnelConnectionConfig``.
+        """
+        if isinstance(connection_config, SandboxLocalTunnelConnectionConfig):
+            raise ValueError(
+                "AsyncSandboxConnector does not support SandboxLocalTunnelConnectionConfig. "
+                "Use SandboxDirectConnectionConfig or SandboxGatewayConnectionConfig instead. "
+                "For local development, use the synchronous SandboxClient."
+            )
+
+        self.id = sandbox_id
+        self.namespace = namespace
+        self.connection_config = connection_config
+        self.k8s_helper = k8s_helper
+
+        self._base_url: str | None = None
+        transport = httpx.AsyncHTTPTransport(retries=3)
+        self.client = httpx.AsyncClient(
+            transport=transport, timeout=httpx.Timeout(60.0)
+        )
+
+    async def _resolve_base_url(self) -> str:
+        """Return the base URL for requests, resolving it once and caching it.
+
+        Raises:
+            ValueError: If the connection config is neither Direct nor Gateway.
+        """
+        if self._base_url:
+            return self._base_url
+
+        if isinstance(self.connection_config, SandboxDirectConnectionConfig):
+            self._base_url = self.connection_config.api_url
+        elif isinstance(self.connection_config, SandboxGatewayConnectionConfig):
+            ip_address = await self.k8s_helper.wait_for_gateway_ip(
+                self.connection_config.gateway_name,
+                self.connection_config.gateway_namespace,
+                self.connection_config.gateway_ready_timeout,
+            )
+            self._base_url = f"http://{ip_address}"
+        else:
+            raise ValueError(
+                f"AsyncSandboxConnector does not support {type(self.connection_config).__name__}."
+            )
+
+        return self._base_url
+
+    async def send_request(self, method: str, endpoint: str, **kwargs) -> httpx.Response:
+        """Send a request to the sandbox, retrying on retryable 5xx statuses.
+
+        Args:
+            method: HTTP method passed to ``httpx.AsyncClient.request``.
+            endpoint: Path appended to the resolved base URL.
+            **kwargs: Forwarded to ``httpx.AsyncClient.request``. ``headers``
+                may be a mapping or ``None``; it is copied, never mutated.
+
+        Returns:
+            The response, once its status passes ``raise_for_status()``.
+
+        Raises:
+            SandboxRequestError: On an error status (after retries for
+                retryable ones) or on any other ``httpx.HTTPError``.
+        """
+        base_url = await self._resolve_base_url()
+        url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"
+
+        # Copy so the caller's mapping is untouched; an explicit headers=None is accepted too.
+        headers = dict(kwargs.pop("headers", None) or {})
+        headers["X-Sandbox-ID"] = self.id
+        headers["X-Sandbox-Namespace"] = self.namespace
+        headers["X-Sandbox-Port"] = str(self.connection_config.server_port)
+
+        last_response: httpx.Response | None = None
+        for attempt in range(MAX_RETRIES + 1):
+            try:
+                response = await self.client.request(
+                    method, url, headers=headers, **kwargs
+                )
+                if response.status_code in RETRYABLE_STATUS_CODES and attempt < MAX_RETRIES:
+                    delay = BACKOFF_FACTOR * (2 ** attempt)  # 0.5s, 1s, 2s, 4s, 8s
+                    logger.warning(
+                        f"Retryable status {response.status_code} from {url}, "
+                        f"attempt {attempt + 1}/{MAX_RETRIES + 1}, retrying in {delay:.1f}s"
+                    )
+                    last_response = response
+                    await asyncio.sleep(delay)
+                    continue
+                response.raise_for_status()
+                return response
+            except httpx.HTTPStatusError as e:
+                logger.error(f"Request to sandbox failed: {e}")
+                raise SandboxRequestError(
+                    f"Failed to communicate with the sandbox at {url}.",
+                    status_code=e.response.status_code,
+                    response=e.response,
+                ) from e
+            except httpx.HTTPError as e:
+                logger.error(f"Request to sandbox failed: {e}")
+                # Forget the cached base URL so the next call resolves it again.
+                self._base_url = None
+                raise SandboxRequestError(
+                    f"Failed to communicate with the sandbox at {url}.",
+                    status_code=None,
+                    response=None,
+                ) from e
+
+        # Defensive fallback: the final attempt above always returns or raises, so
+        # this is not reached as written; it keeps the method from returning None.
+        logger.error(f"All {MAX_RETRIES + 1} attempts failed for {url}")
+        raise SandboxRequestError(
+            f"Failed to communicate with the sandbox at {url} after {MAX_RETRIES + 1} attempts.",
+            status_code=last_response.status_code if last_response else None,
+            response=last_response,
+        )
+
+    async def close(self):
+        """Close the HTTP client and forget the cached base URL."""
+        await self.client.aclose()
+        self._base_url = None
diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_k8s_helper.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_k8s_helper.py
new file mode 100644
index 000000000..6f26a4836
--- /dev/null
+++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_k8s_helper.py
@@ -0,0 +1,278 @@
+# Copyright 2026 The Kubernetes Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import logging
+import time
+
+from kubernetes_asyncio import client, config, watch
+
+logger = logging.getLogger(__name__)
+
+from .constants import (
+    CLAIM_API_GROUP,
+    CLAIM_API_VERSION,
+    CLAIM_PLURAL_NAME,
+    GATEWAY_API_GROUP,
+    GATEWAY_API_VERSION,
+    GATEWAY_PLURAL,
+    SANDBOX_API_GROUP,
+    SANDBOX_API_VERSION,
+    SANDBOX_PLURAL_NAME,
+)
+from .exceptions import SandboxMetadataError, SandboxNotFoundError
+
+
+class AsyncK8sHelper:
+    """Async helper class for Kubernetes API interactions using kubernetes_asyncio."""
+
+    def __init__(self):
+        self._initialized = False
+        self._init_lock = asyncio.Lock()
+        self._api_client: client.ApiClient | None = None
+
+    async def _ensure_initialized(self):  # builds the API clients on first use
+        if self._initialized:
+            return
+        async with self._init_lock:
+            if self._initialized:  # re-check: another caller may have finished while we waited
+                return
+            try:
+                config.load_incluster_config()  # in-cluster config first; kubeconfig is the fallback
+            except config.ConfigException:
+                await config.load_kube_config()
+            self._api_client = client.ApiClient()
+            self.custom_objects_api = client.CustomObjectsApi(self._api_client)
+            self.core_v1_api = client.CoreV1Api(self._api_client)
+            self._initialized = True
+
+    async def create_sandbox_claim(
+        self,
+        name: str,
+        template: str,
+        namespace: str,
+        annotations: dict | None = None,
+        labels: dict | None = None,
+    ):
+        """Creates a SandboxClaim custom resource that references ``template``."""
+        await self._ensure_initialized()
+
+        metadata = {
+            "name": name,
+            "annotations": annotations or {},
+        }
+        if labels:  # the "labels" key is only sent when labels were given
+            metadata["labels"] = labels
+
+        manifest = {
+            "apiVersion": f"{CLAIM_API_GROUP}/{CLAIM_API_VERSION}",
+            "kind": "SandboxClaim",
+            "metadata": metadata,
+            "spec": {
+                "sandboxTemplateRef": {
+                    "name": template,
+                }
+            },
+        }
+        logger.info(
+            f"Creating SandboxClaim '{name}' in namespace '{namespace}' using template '{template}'..."
+        )
+        await self.custom_objects_api.create_namespaced_custom_object(
+            group=CLAIM_API_GROUP,
+            version=CLAIM_API_VERSION,
+            namespace=namespace,
+            plural=CLAIM_PLURAL_NAME,
+            body=manifest,
+        )
+
+    async def resolve_sandbox_name(self, claim_name: str, namespace: str, timeout: int) -> str:
+        """Resolves the actual Sandbox name from the SandboxClaim status.
+        With warm pool adoption, the sandbox name may differ from the claim
+        name, so the claim is watched until ``status.sandbox`` carries a name.
+        Raises TimeoutError after ``timeout`` seconds, SandboxMetadataError if the claim is deleted.
+        """
+        await self._ensure_initialized()
+
+        deadline = time.monotonic() + timeout
+        logger.info(f"Resolving sandbox name from claim '{claim_name}'...")
+        while True:
+            remaining = deadline - time.monotonic()  # keep the fraction: int() here cut up to 1s off
+            if remaining <= 0:
+                raise TimeoutError(
+                    f"Could not resolve sandbox name from claim "
+                    f"'{claim_name}' within {timeout} seconds."
+                )
+            w = watch.Watch()
+            try:
+                async for event in w.stream(
+                    func=self.custom_objects_api.list_namespaced_custom_object,
+                    namespace=namespace,
+                    group=CLAIM_API_GROUP,
+                    version=CLAIM_API_VERSION,
+                    plural=CLAIM_PLURAL_NAME,
+                    field_selector=f"metadata.name={claim_name}",
+                    timeout_seconds=max(1, int(remaining)),  # whole seconds, never 0
+                ):
+                    if event is None:
+                        continue
+                    if event["type"] == "DELETED":
+                        raise SandboxMetadataError(
+                            f"SandboxClaim '{claim_name}' was deleted while resolving sandbox name"
+                        )
+                    if event["type"] in ["ADDED", "MODIFIED"]:
+                        claim_object = event["object"]
+                        sandbox_status = claim_object.get("status", {}).get("sandbox", {})
+                        # Support both 'name' (standard) and 'Name' (legacy, before CRD rename in #440)
+                        name = sandbox_status.get("name", "") or sandbox_status.get("Name", "")
+                        if name:
+                            logger.info(f"Resolved sandbox name '{name}' from claim status")
+                            return name
+            finally:
+                await w.close()
+
+    async def wait_for_sandbox_ready(self, name: str, namespace: str, timeout: int):
+        """Waits for the Sandbox 'Ready' condition; raises TimeoutError, or SandboxNotFoundError if deleted."""
+        await self._ensure_initialized()
+
+        deadline = time.monotonic() + timeout
+        logger.info(f"Watching for Sandbox {name} to become ready...")
+        while True:
+            remaining = deadline - time.monotonic()  # keep the fraction: int() here cut up to 1s off
+            if remaining <= 0:
+                raise TimeoutError(f"Sandbox {name} did not become ready within {timeout} seconds.")
+            w = watch.Watch()
+            try:
+                async for event in w.stream(
+                    func=self.custom_objects_api.list_namespaced_custom_object,
+                    namespace=namespace,
+                    group=SANDBOX_API_GROUP,
+                    version=SANDBOX_API_VERSION,
+                    plural=SANDBOX_PLURAL_NAME,
+                    field_selector=f"metadata.name={name}",
+                    timeout_seconds=max(1, int(remaining)),  # whole seconds, never 0
+                ):
+                    if event is None:
+                        continue
+                    if event["type"] in ["ADDED", "MODIFIED"]:
+                        sandbox_object = event["object"]
+                        status = sandbox_object.get("status", {})
+                        conditions = status.get("conditions", [])
+                        for cond in conditions:
+                            if cond.get("type") == "Ready" and cond.get("status") == "True":
+                                logger.info(f"Sandbox {name} is ready.")
+                                return
+                    elif event["type"] == "DELETED":
+                        logger.error(f"Sandbox {name} was deleted before becoming ready.")
+                        raise SandboxNotFoundError(
+                            f"Sandbox {name} was deleted before becoming ready."
+                        )
+            finally:
+                await w.close()
+
+    async def delete_sandbox_claim(self, name: str, namespace: str):
+        """Deletes a SandboxClaim custom resource."""
+        await self._ensure_initialized()
+
+        try:
+            await self.custom_objects_api.delete_namespaced_custom_object(
+                group=CLAIM_API_GROUP,
+                version=CLAIM_API_VERSION,
+                namespace=namespace,
+                plural=CLAIM_PLURAL_NAME,
+                name=name,
+            )
+            logger.info(f"Terminated SandboxClaim: {name}")
+        except client.ApiException as e:
+            if e.status != 404:
+                logger.error(f"Error terminating sandbox {name}: {e}")
+                raise
+
+    async def get_sandbox(self, name: str, namespace: str):
+        """Gets a Sandbox custom resource, or returns None when the API answers 404."""
+        await self._ensure_initialized()
+
+        try:
+            return await self.custom_objects_api.get_namespaced_custom_object(
+                group=SANDBOX_API_GROUP,
+                version=SANDBOX_API_VERSION,
+                namespace=namespace,
+                plural=SANDBOX_PLURAL_NAME,
+                name=name,
+            )
+        except client.ApiException as e:
+            if e.status == 404:
+                return None
+            raise
+
+    async def list_sandbox_claims(self, namespace: str) -> list[str]:
+        """Lists the names of all SandboxClaim custom resources in a namespace."""
+        await self._ensure_initialized()
+
+        try:
+            response = await self.custom_objects_api.list_namespaced_custom_object(
+                group=CLAIM_API_GROUP,
+                version=CLAIM_API_VERSION,
+                namespace=namespace,
+                plural=CLAIM_PLURAL_NAME,
+            )
+            return [
+                item.get("metadata", {}).get("name")
+                for item in response.get("items", [])
+                if item.get("metadata", {}).get("name")  # items without a name are skipped
+            ]
+        except client.ApiException as e:
+            logger.error(f"Error listing sandbox claims in namespace {namespace}: {e}")
+            raise
+
+    async def wait_for_gateway_ip(self, gateway_name: str, namespace: str, timeout: int) -> str:
+        """Waits for the Gateway to report an address and returns the first one; TimeoutError otherwise."""
+        await self._ensure_initialized()
+
+        deadline = time.monotonic() + timeout
+        logger.info(f"Waiting for Gateway '{gateway_name}' in namespace '{namespace}'...")
+        while True:
+            remaining = deadline - time.monotonic()  # keep the fraction: int() here cut up to 1s off
+            if remaining <= 0:
+                raise TimeoutError(f"Gateway '{gateway_name}' did not get an IP.")
+            w = watch.Watch()
+            try:
+                async for event in w.stream(
+                    func=self.custom_objects_api.list_namespaced_custom_object,
+                    namespace=namespace,
+                    group=GATEWAY_API_GROUP,
+                    version=GATEWAY_API_VERSION,
+                    plural=GATEWAY_PLURAL,
+                    field_selector=f"metadata.name={gateway_name}",
+                    timeout_seconds=max(1, int(remaining)),  # whole seconds, never 0
+                ):
+                    if event is None:
+                        continue
+                    if event["type"] in ["ADDED", "MODIFIED"]:
+                        gateway_object = event["object"]
+                        status = gateway_object.get("status", {})
+                        addresses = status.get("addresses", [])
+                        if addresses:
+                            ip_address = addresses[0].get("value")
+                            if ip_address:
+                                logger.info(f"Gateway ready. IP: {ip_address}")
+                                return ip_address
+            finally:
+                await w.close()
+
+    async def close(self):
+        """Closes the shared Kubernetes API client session."""
+        if self._api_client:
+            await self._api_client.close()
+            self._api_client = None
+            self._initialized = False  # the next call re-runs _ensure_initialized()
diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_sandbox.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_sandbox.py
new file mode 100644
index 000000000..c7b1a0df8
--- /dev/null
+++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_sandbox.py
@@ -0,0 +1,129 @@
+# Copyright 2026 The Kubernetes Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from .async_connector import AsyncSandboxConnector
+from .async_k8s_helper import AsyncK8sHelper
+from .commands.async_command_executor import AsyncCommandExecutor
+from .constants import POD_NAME_ANNOTATION
+from .files.async_filesystem import AsyncFilesystem
+from .models import SandboxConnectionConfig, SandboxTracerConfig
+from .trace_manager import create_tracer_manager
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncSandbox:
+    """
+    Represents an async connection to a specific running Sandbox instance.
+
+    This class provides the async interface for interacting with the Sandbox:
+    - Executing commands via the ``commands`` property.
+    - Managing files via the ``files`` property.
+    - Handling the underlying connection lifecycle.
+    - Integrating with OpenTelemetry for tracing operations.
+
+    Unlike the sync ``Sandbox``, ``connection_config`` is required because the
+    async client does not support ``SandboxLocalTunnelConnectionConfig``.
+    """
+
+    def __init__(
+        self,
+        claim_name: str,
+        sandbox_id: str,
+        namespace: str = "default",
+        connection_config: SandboxConnectionConfig | None = None,
+        tracer_config: SandboxTracerConfig | None = None,
+        k8s_helper: AsyncK8sHelper | None = None,
+    ):
+        if connection_config is None:
+            raise ValueError(
+                "connection_config is required for AsyncSandbox. "
+                "Use SandboxDirectConnectionConfig or SandboxGatewayConnectionConfig."
+            )
+
+        self.claim_name = claim_name
+        self.sandbox_id = sandbox_id
+        self.namespace = namespace
+        self.connection_config = connection_config
+
+        self.k8s_helper = k8s_helper or AsyncK8sHelper()  # a private helper is created if none is shared
+
+        self.connector = AsyncSandboxConnector(
+            sandbox_id=self.sandbox_id,
+            namespace=self.namespace,
+            connection_config=self.connection_config,
+            k8s_helper=self.k8s_helper,
+        )
+
+        self.tracer_config = tracer_config or SandboxTracerConfig()
+        self.trace_service_name = self.tracer_config.trace_service_name
+        self.tracing_manager, self.tracer = create_tracer_manager(self.tracer_config)
+
+        self._commands = AsyncCommandExecutor(self.connector, self.tracer, self.trace_service_name)
+        self._files = AsyncFilesystem(self.connector, self.tracer, self.trace_service_name)
+
+        self._is_closed = False
+        self._pod_name = None  # filled in by the first get_pod_name() call
+
+    async def get_pod_name(self) -> str:
+        """Returns the pod name from the Sandbox's annotation (cached), else falls back to the sandbox id."""
+        if self._pod_name is not None:
+            return self._pod_name
+
+        sandbox_object = await self.k8s_helper.get_sandbox(self.sandbox_id, self.namespace) or {}
+        metadata = sandbox_object.get("metadata") or {}
+        annotations = metadata.get("annotations") or {}
+        pod_name = annotations.get(POD_NAME_ANNOTATION)
+        self._pod_name = pod_name if pod_name is not None else self.sandbox_id
+        return self._pod_name
+
+    @property
+    def commands(self) -> AsyncCommandExecutor | None:
+        return self._commands  # None once the connection has been closed
+
+    @property
+    def files(self) -> AsyncFilesystem | None:
+        return self._files  # None once the connection has been closed
+
+    @property
+    def is_active(self) -> bool:
+        return not self._is_closed and self._commands is not None and self._files is not None
+
+    async def _close_connection(self):
+        """Closes the client-side connection and disables execution engines."""
+        if self._is_closed:
+            return
+
+        await self.connector.close()
+
+        self._commands = None
+        self._files = None
+
+        if self.tracing_manager:
+            try:
+                self.tracing_manager.end_lifecycle_span()
+            except Exception as e:  # a tracing failure must not block the close
+                logger.error(f"Failed to end tracing span: {e}")
+
+        self._is_closed = True
+        logger.info(f"Connection to sandbox claim '{self.claim_name}' has been closed.")
+
+    async def terminate(self):
+        """Closes the client side connection, then deletes the SandboxClaim (attempted even if closing fails)."""
+        try:
+            await self._close_connection()
+        finally:
+            await self.k8s_helper.delete_sandbox_claim(self.claim_name, self.namespace)
diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_sandbox_client.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_sandbox_client.py
new file mode 100644
index 000000000..0056b323e
--- /dev/null
+++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/async_sandbox_client.py
@@ -0,0 +1,327 @@
+# Copyright 2026 The Kubernetes Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Async version of :class:`SandboxClient` for use in async applications.
+
+Requires the ``async`` optional dependencies::
+
+    pip install k8s-agent-sandbox[async]
+"""
+
+import asyncio
+import logging
+import re
+import time
+import uuid
+from typing import Generic, TypeVar
+
+from .async_k8s_helper import AsyncK8sHelper
+from .async_sandbox import AsyncSandbox
+from .exceptions import SandboxNotFoundError
+from .models import SandboxConnectionConfig, SandboxLocalTunnelConnectionConfig, SandboxTracerConfig
+from .trace_manager import async_trace_span, create_tracer_manager, initialize_tracer, trace
+
+logger = logging.getLogger(__name__)
+
+T = TypeVar("T", bound=AsyncSandbox)
+
+
+class AsyncSandboxClient(Generic[T]):
+    """
+    Async registry-based client for managing Sandbox lifecycles.
+
+    Use as an async context manager for automatic cleanup::
+
+        async with AsyncSandboxClient(connection_config=config) as client:
+            sandbox = await client.create_sandbox("template")
+            result = await sandbox.commands.run("echo hello")
+
+    ``connection_config`` is required — the async client does not support
+    ``SandboxLocalTunnelConnectionConfig``.
+
+    Unlike the sync ``SandboxClient``, there is no ``atexit`` fallback because
+    async cleanup cannot run in an atexit handler. Use the ``async with``
+    context manager or explicitly call ``await client.delete_all()`` followed
+    by ``await client.close()`` to avoid orphaned claims.
+    """
+
+    sandbox_class: type[T] = AsyncSandbox  # type: ignore
+
+    def __init__(
+        self,
+        connection_config: SandboxConnectionConfig | None = None,
+        tracer_config: SandboxTracerConfig | None = None,
+    ):
+        if connection_config is None or isinstance(
+            connection_config, SandboxLocalTunnelConnectionConfig
+        ):  # reject tunnel configs here, before any claim is created, not later in the connector
+            raise ValueError(
+                "connection_config is required for AsyncSandboxClient. "
+                "Use SandboxDirectConnectionConfig or SandboxGatewayConnectionConfig. "
+                "For local development with kubectl port-forward, use the synchronous SandboxClient."
+            )
+
+        self.connection_config = connection_config
+
+        self.tracer_config = tracer_config or SandboxTracerConfig()
+        if self.tracer_config.enable_tracing:
+            initialize_tracer(self.tracer_config.trace_service_name)
+        self.tracing_manager, self.tracer = create_tracer_manager(self.tracer_config)
+
+        self.k8s_helper = AsyncK8sHelper()  # shared with every sandbox handle this client creates
+
+        self._active_connection_sandboxes: dict[tuple[str, str], T] = {}  # keyed by (namespace, claim_name)
+        self._lock = asyncio.Lock()  # guards the registry above
+
+    async def __aenter__(self) -> "AsyncSandboxClient[T]":
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
+        try:
+            await self.delete_all()
+        finally:  # connections are closed even if deleting claims raised
+            await self.close()
+
+    async def close(self):
+        """Shuts down all tracked sandbox connections and the K8s API client."""
+        async with self._lock:
+            for sandbox in self._active_connection_sandboxes.values():
+                try:
+                    await sandbox._close_connection()
+                except Exception as e:  # one failing handle must not stop the others from closing
+                    logger.error(f"Failed to close sandbox connection: {e}")
+            self._active_connection_sandboxes.clear()
+        await self.k8s_helper.close()
+
+    async def create_sandbox(
+        self,
+        template: str,
+        namespace: str = "default",
+        sandbox_ready_timeout: int = 180,
+        labels: dict[str, str] | None = None,
+    ) -> T:
+        """Provisions a new Sandbox claim and returns an async Sandbox handle.
+
+        Example::
+
+            async with AsyncSandboxClient(connection_config=config) as client:
+                sandbox = await client.create_sandbox("python-sandbox-template")
+                result = await sandbox.commands.run("echo 'Hello'")
+        """
+        if not template:
+            raise ValueError("Template name cannot be empty.")
+
+        if labels:
+            self._validate_labels(labels)
+
+        claim_name = f"sandbox-claim-{uuid.uuid4().hex[:8]}"
+
+        try:
+            await self._create_claim(claim_name, template, namespace, labels=labels)
+            start_time = time.monotonic()
+            sandbox_id = await self.k8s_helper.resolve_sandbox_name(
+                claim_name, namespace, sandbox_ready_timeout
+            )
+            elapsed_time = time.monotonic() - start_time
+            remaining_timeout = max(0, int(sandbox_ready_timeout - elapsed_time))  # one budget for both waits
+            if remaining_timeout <= 0:
+                raise TimeoutError("Sandbox resolution exceeded the ready timeout.")
+            await self._wait_for_sandbox_ready(sandbox_id, namespace, remaining_timeout)
+
+            sandbox = self.sandbox_class(
+                claim_name=claim_name,
+                sandbox_id=sandbox_id,
+                namespace=namespace,
+                connection_config=self.connection_config,
+                tracer_config=self.tracer_config,
+                k8s_helper=self.k8s_helper,
+            )
+        except (Exception, asyncio.CancelledError):
+            await asyncio.shield(self._delete_claim(claim_name, namespace))  # shielded so the claim cleanup can finish
+            raise
+
+        async with self._lock:
+            self._active_connection_sandboxes[(namespace, claim_name)] = sandbox
+        return sandbox
+
+    async def get_sandbox(
+        self, claim_name: str, namespace: str = "default", resolve_timeout: int = 30
+    ) -> T:
+        """Retrieves an existing sandbox handle given a sandbox claim name.
+
+        Example::
+
+            sandbox = await client.get_sandbox("sandbox-claim-1234abcd")
+            result = await sandbox.commands.run("ls -la")
+        """
+        key = (namespace, claim_name)
+
+        async with self._lock:
+            existing = self._active_connection_sandboxes.get(key)
+
+        try:
+            sandbox_id = await self.k8s_helper.resolve_sandbox_name(
+                claim_name, namespace, timeout=resolve_timeout
+            )
+            sandbox_object = await self.k8s_helper.get_sandbox(sandbox_id, namespace)
+            if not sandbox_object:
+                raise SandboxNotFoundError(f"Underlying Sandbox '{sandbox_id}' not found.")
+        except Exception as e:
+            if existing:  # the tracked handle is terminated and dropped when resolution fails
+                await existing.terminate()
+                async with self._lock:
+                    self._active_connection_sandboxes.pop(key, None)
+            raise SandboxNotFoundError(
+                f"Sandbox claim '{claim_name}' not found or resolution failed "
+                f"in namespace '{namespace}': {e}"
+            ) from e
+
+        if existing and existing.is_active:
+            return existing
+
+        if existing:  # tracked but closed: replace it with a fresh handle below
+            async with self._lock:
+                self._active_connection_sandboxes.pop(key, None)
+
+        new_handle = self.sandbox_class(
+            claim_name=claim_name,
+            sandbox_id=sandbox_id,
+            namespace=namespace,
+            connection_config=self.connection_config,
+            tracer_config=self.tracer_config,
+            k8s_helper=self.k8s_helper,
+        )
+
+        async with self._lock:
+            self._active_connection_sandboxes[key] = new_handle
+        return new_handle
+
+    async def list_active_sandboxes(self) -> list[tuple[str, str]]:
+        """Returns the ``(namespace, claim_name)`` tuples still tracked, after dropping inactive handles."""
+        async with self._lock:
+            for key, obj in list(self._active_connection_sandboxes.items()):  # copy: entries are popped below
+                if not obj.is_active:
+                    self._active_connection_sandboxes.pop(key, None)
+            return list(self._active_connection_sandboxes.keys())
+
+    async def list_all_sandboxes(self, namespace: str = "default") -> list[str]:
+        """Lists all SandboxClaim names in the Kubernetes cluster for a namespace."""
+        return await self.k8s_helper.list_sandbox_claims(namespace)
+
+    async def delete_sandbox(self, claim_name: str, namespace: str = "default"):
+        """Stops the client side connection and deletes the Kubernetes resources; failures are logged, not raised."""
+        key = (namespace, claim_name)
+        async with self._lock:
+            sandbox = self._active_connection_sandboxes.get(key)
+        try:
+            if sandbox:
+                await sandbox.terminate()
+                async with self._lock:
+                    self._active_connection_sandboxes.pop(key, None)
+            else:  # not tracked by this client: delete the claim directly
+                await self._delete_claim(claim_name, namespace)
+        except Exception as e:
+            logger.error(f"Failed to delete sandbox '{claim_name}' in namespace '{namespace}': {e}")
+
+    async def delete_all(self):
+        """Cleanup all tracked sandboxes managed by this client."""
+        async with self._lock:
+            items = list(self._active_connection_sandboxes.items())  # snapshot; delete_sandbox re-takes the lock
+
+        for (ns, claim_name), _ in items:
+            try:
+                await self.delete_sandbox(claim_name, namespace=ns)
+            except Exception as e:
+                logger.error(f"Cleanup failed for {claim_name} in namespace {ns}: {e}")
+
+    # --- Label validation (shared with sync client) ---
+
+    _LABEL_NAME_RE = re.compile(r"^[A-Za-z0-9][-A-Za-z0-9_.]*[A-Za-z0-9]$|^[A-Za-z0-9]$")
+    _LABEL_PREFIX_RE = re.compile(r"^[a-z0-9]([-a-z0-9.]*[a-z0-9])?$")
+    _LABEL_NAME_MAX_LENGTH = 63
+    _LABEL_PREFIX_MAX_LENGTH = 253
+
+    @staticmethod
+    def _validate_label_name(name: str, context: str):  # raises ValueError; ``context`` only feeds the message
+        if len(name) > AsyncSandboxClient._LABEL_NAME_MAX_LENGTH:
+            raise ValueError(
+                f"Label {context} '{name}' exceeds max length of "
+                f"{AsyncSandboxClient._LABEL_NAME_MAX_LENGTH} characters."
+            )
+        if not AsyncSandboxClient._LABEL_NAME_RE.match(name):
+            raise ValueError(
+                f"Label {context} '{name}' contains invalid characters. "
+                f"Must start and end with alphanumeric, and contain only [-A-Za-z0-9_.]."
+            )
+
+    @staticmethod
+    def _validate_labels(labels: dict[str, str]):  # raises ValueError on the first invalid key or value
+        for key, value in labels.items():
+            if not key:
+                raise ValueError("Label key cannot be empty.")
+
+            if "/" in key:  # "prefix/name" form: prefix and name are checked separately
+                prefix, name = key.split("/", 1)
+                if not prefix or len(prefix) > AsyncSandboxClient._LABEL_PREFIX_MAX_LENGTH:
+                    raise ValueError(
+                        f"Label key prefix '{prefix}' is invalid or exceeds "
+                        f"{AsyncSandboxClient._LABEL_PREFIX_MAX_LENGTH} characters."
+                    )
+                if not AsyncSandboxClient._LABEL_PREFIX_RE.match(prefix):
+                    raise ValueError(
+                        f"Label key prefix '{prefix}' must be a valid DNS subdomain."
+                    )
+                if not name:
+                    raise ValueError(f"Label key '{key}' has an empty name after prefix.")
+                AsyncSandboxClient._validate_label_name(name, f"key name in '{key}'")
+            else:
+                AsyncSandboxClient._validate_label_name(key, f"key '{key}'")
+
+            if value:  # empty values are accepted without further checks
+                AsyncSandboxClient._validate_label_name(
+                    value, f"value '{value}' for key '{key}'"
+                )
+
+    @async_trace_span("create_claim")
+    async def _create_claim(
+        self,
+        claim_name: str,
+        template_name: str,
+        namespace: str,
+        labels: dict[str, str] | None = None,
+    ):
+        span = trace.get_current_span()
+        if span.is_recording():
+            span.set_attribute("sandbox.claim.name", claim_name)
+
+        annotations = {}
+        if self.tracing_manager:
+            trace_context_str = self.tracing_manager.get_trace_context_json()
+            if trace_context_str:  # the trace context travels on the claim as an annotation
+                annotations["opentelemetry.io/trace-context"] = trace_context_str
+
+        await self.k8s_helper.create_sandbox_claim(
+            claim_name, template_name, namespace, annotations=annotations, labels=labels
+        )
+
+    @async_trace_span("wait_for_sandbox_ready")
+    async def _wait_for_sandbox_ready(self, sandbox_id: str, namespace: str, timeout: int):
+        await self.k8s_helper.wait_for_sandbox_ready(sandbox_id, namespace, timeout)
+
+    @async_trace_span("delete_claim")
+    async def _delete_claim(self, claim_name: str, namespace: str):
+        try:
+            await self.k8s_helper.delete_sandbox_claim(claim_name, namespace)
+        except Exception as e:  # best-effort cleanup: the error is logged and swallowed
+            logger.error(f"Failed to cleanup SandboxClaim '{claim_name}': {e}")
diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/commands/async_command_executor.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/commands/async_command_executor.py
new file mode 100644
index 000000000..8311b9ed4
--- /dev/null
+++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/commands/async_command_executor.py
@@ -0,0 +1,66 @@
+# Copyright 2026 The Kubernetes Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from k8s_agent_sandbox.async_connector import AsyncSandboxConnector
+from k8s_agent_sandbox.models import ExecutionResult
+from k8s_agent_sandbox.trace_manager import async_trace_span, trace
+
+
+class AsyncCommandExecutor:
+    """
+    Handles async execution of commands within the sandbox.
+    """
+
+    def __init__(self, connector: AsyncSandboxConnector, tracer, trace_service_name: str):
+        self.connector = connector
+        self.tracer = tracer
+        self.trace_service_name = trace_service_name
+
+    @async_trace_span("run")
+    async def run(self, command: str, timeout: int = 60) -> ExecutionResult:
+        """Run ``command`` in the sandbox and return the parsed ``ExecutionResult``.
+
+        Args:
+            command: Sent as the ``command`` field of the ``POST execute`` JSON body.
+            timeout: Forwarded as the request ``timeout``.
+
+        Raises:
+            RuntimeError: If the response is not valid JSON or does not match
+                ``ExecutionResult``.
+        """
+        span = trace.get_current_span()
+        if span.is_recording():
+            span.set_attribute("sandbox.command", command)
+
+        payload = {"command": command}
+        response = await self.connector.send_request(
+            "POST", "execute", json=payload, timeout=timeout
+        )
+
+        try:
+            response_data = response.json()
+        except ValueError as e:
+            raise RuntimeError(
+                f"Failed to decode JSON response from sandbox: {response.text}"
+            ) from e
+        try:
+            result = ExecutionResult(**response_data)
+        except Exception as e:
+            raise RuntimeError(
+                f"Server returned invalid execution result format: {response_data}"
+            ) from e
+
+        if span.is_recording():
+            span.set_attribute("sandbox.exit_code", result.exit_code)
+        return result
diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/exceptions.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/exceptions.py
index 9990ffb81..6c8f5b93d 100644
--- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/exceptions.py
+++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/exceptions.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import requests
+from typing import Any
 
 
 class SandboxError(RuntimeError):
@@ -40,14 +40,15 @@ class SandboxRequestError(SandboxError):
 
     Attributes:
         status_code: The HTTP status code, if available.
-        response: The raw ``requests.Response``, if available.
+        response: The raw response object (``requests.Response`` or
+            ``httpx.Response``), if available.
""" def __init__( self, message: str, status_code: int | None = None, - response: requests.Response | None = None, + response: Any = None, ): super().__init__(message) self.status_code = status_code diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/files/async_filesystem.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/files/async_filesystem.py new file mode 100644 index 000000000..12b0ca022 --- /dev/null +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/files/async_filesystem.py @@ -0,0 +1,119 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import urllib.parse + +from k8s_agent_sandbox.async_connector import AsyncSandboxConnector +from k8s_agent_sandbox.models import FileEntry +from k8s_agent_sandbox.trace_manager import async_trace_span, trace + + +class AsyncFilesystem: + """ + Handles async file operations within the sandbox. 
+ """ + + def __init__(self, connector: AsyncSandboxConnector, tracer, trace_service_name: str): + self.connector = connector + self.tracer = tracer + self.trace_service_name = trace_service_name + + @async_trace_span("write") + async def write(self, path: str, content: bytes | str, timeout: int = 60): + span = trace.get_current_span() + if span.is_recording(): + span.set_attribute("sandbox.file.path", path) + span.set_attribute("sandbox.file.size", len(content)) + + if isinstance(content, str): + content = content.encode("utf-8") + + filename = os.path.basename(path) + files_payload = {"file": (filename, content)} + await self.connector.send_request( + "POST", "upload", files=files_payload, timeout=timeout + ) + logging.info(f"File '{filename}' uploaded successfully.") + + @async_trace_span("read") + async def read(self, path: str, timeout: int = 60) -> bytes: + span = trace.get_current_span() + if span.is_recording(): + span.set_attribute("sandbox.file.path", path) + + encoded_path = urllib.parse.quote(path, safe="") + response = await self.connector.send_request( + "GET", f"download/{encoded_path}", timeout=timeout + ) + content = response.content + + if span.is_recording(): + span.set_attribute("sandbox.file.size", len(content)) + + return content + + @async_trace_span("list") + async def list(self, path: str, timeout: int = 60) -> list[FileEntry]: + span = trace.get_current_span() + if span.is_recording(): + span.set_attribute("sandbox.file.path", path) + encoded_path = urllib.parse.quote(path, safe="") + response = await self.connector.send_request( + "GET", f"list/{encoded_path}", timeout=timeout + ) + + try: + entries = response.json() + except ValueError as e: + raise RuntimeError( + f"Failed to decode JSON response from sandbox: {response.text}" + ) from e + + if not entries: + return [] + + try: + file_entries = [FileEntry(**e) for e in entries] + except Exception as e: + raise RuntimeError( + f"Server returned invalid file entry format: {entries}" + ) 
from e + + if span.is_recording(): + span.set_attribute("sandbox.file.count", len(file_entries)) + return file_entries + + @async_trace_span("exists") + async def exists(self, path: str, timeout: int = 60) -> bool: + span = trace.get_current_span() + if span.is_recording(): + span.set_attribute("sandbox.file.path", path) + encoded_path = urllib.parse.quote(path, safe="") + response = await self.connector.send_request( + "GET", f"exists/{encoded_path}", timeout=timeout + ) + + try: + response_data = response.json() + except ValueError as e: + raise RuntimeError( + f"Failed to decode JSON response from sandbox: {response.text}" + ) from e + + exists = response_data.get("exists", False) + if span.is_recording(): + span.set_attribute("sandbox.file.exists", exists) + return exists diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/k8s_helper.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/k8s_helper.py index 72ae6a86f..76e39e508 100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/k8s_helper.py +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/k8s_helper.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import time from typing import List from kubernetes import client, config, watch from .exceptions import SandboxMetadataError, SandboxNotFoundError @@ -75,67 +76,75 @@ def resolve_sandbox_name(self, claim_name: str, namespace: str, timeout: int) -> name. This method watches the SandboxClaim until the sandbox name appears in the claim's status, then returns it. 
""" - w = watch.Watch() + deadline = time.monotonic() + timeout logging.info(f"Resolving sandbox name from claim '{claim_name}'...") - for event in w.stream( - func=self.custom_objects_api.list_namespaced_custom_object, - namespace=namespace, - group=CLAIM_API_GROUP, - version=CLAIM_API_VERSION, - plural=CLAIM_PLURAL_NAME, - field_selector=f"metadata.name={claim_name}", - timeout_seconds=timeout - ): - if event is None: - continue - if event["type"] == "DELETED": - w.stop() - raise SandboxMetadataError( - f"SandboxClaim '{claim_name}' was deleted while resolving sandbox name") - if event["type"] in ["ADDED", "MODIFIED"]: - claim_object = event['object'] - sandbox_status = claim_object.get( - 'status', {}).get('sandbox', {}) - # Support both 'name' (standard) and 'Name' (legacy, before CRD rename in #440) - name = sandbox_status.get('name', '') or sandbox_status.get('Name', '') - if name: - logging.info( - f"Resolved sandbox name '{name}' from claim status") + while True: + remaining = int(deadline - time.monotonic()) + if remaining <= 0: + raise TimeoutError( + f"Could not resolve sandbox name from claim " + f"'{claim_name}' within {timeout} seconds.") + w = watch.Watch() + for event in w.stream( + func=self.custom_objects_api.list_namespaced_custom_object, + namespace=namespace, + group=CLAIM_API_GROUP, + version=CLAIM_API_VERSION, + plural=CLAIM_PLURAL_NAME, + field_selector=f"metadata.name={claim_name}", + timeout_seconds=remaining + ): + if event is None: + continue + if event["type"] == "DELETED": w.stop() - return name - raise TimeoutError( - f"Could not resolve sandbox name from claim " - f"'{claim_name}' within {timeout} seconds.") + raise SandboxMetadataError( + f"SandboxClaim '{claim_name}' was deleted while resolving sandbox name") + if event["type"] in ["ADDED", "MODIFIED"]: + claim_object = event['object'] + sandbox_status = claim_object.get( + 'status', {}).get('sandbox', {}) + # Support both 'name' (standard) and 'Name' (legacy, before CRD rename in 
#440) + name = sandbox_status.get('name', '') or sandbox_status.get('Name', '') + if name: + logging.info( + f"Resolved sandbox name '{name}' from claim status") + w.stop() + return name def wait_for_sandbox_ready(self, name: str, namespace: str, timeout: int): """Waits for the Sandbox custom resource to have a 'Ready' status.""" + deadline = time.monotonic() + timeout logging.info(f"Watching for Sandbox {name} to become ready...") - w = watch.Watch() - for event in w.stream( - func=self.custom_objects_api.list_namespaced_custom_object, - namespace=namespace, - group=SANDBOX_API_GROUP, - version=SANDBOX_API_VERSION, - plural=SANDBOX_PLURAL_NAME, - field_selector=f"metadata.name={name}", - timeout_seconds=timeout - ): - if event is None: - continue - if event["type"] in ["ADDED", "MODIFIED"]: - sandbox_object = event['object'] - status = sandbox_object.get('status', {}) - conditions = status.get('conditions', []) - for cond in conditions: - if cond.get('type') == 'Ready' and cond.get('status') == 'True': - logging.info(f"Sandbox {name} is ready.") - w.stop() - return - elif event["type"] == "DELETED": - logging.error(f"Sandbox {name} was deleted before becoming ready.") - w.stop() - raise SandboxNotFoundError(f"Sandbox {name} was deleted before becoming ready.") - raise TimeoutError(f"Sandbox {name} did not become ready within {timeout} seconds.") + while True: + remaining = int(deadline - time.monotonic()) + if remaining <= 0: + raise TimeoutError(f"Sandbox {name} did not become ready within {timeout} seconds.") + w = watch.Watch() + for event in w.stream( + func=self.custom_objects_api.list_namespaced_custom_object, + namespace=namespace, + group=SANDBOX_API_GROUP, + version=SANDBOX_API_VERSION, + plural=SANDBOX_PLURAL_NAME, + field_selector=f"metadata.name={name}", + timeout_seconds=remaining + ): + if event is None: + continue + if event["type"] in ["ADDED", "MODIFIED"]: + sandbox_object = event['object'] + status = sandbox_object.get('status', {}) + conditions 
= status.get('conditions', []) + for cond in conditions: + if cond.get('type') == 'Ready' and cond.get('status') == 'True': + logging.info(f"Sandbox {name} is ready.") + w.stop() + return + elif event["type"] == "DELETED": + logging.error(f"Sandbox {name} was deleted before becoming ready.") + w.stop() + raise SandboxNotFoundError(f"Sandbox {name} was deleted before becoming ready.") def delete_sandbox_claim(self, name: str, namespace: str): """Deletes a SandboxClaim custom resource.""" @@ -188,27 +197,31 @@ def list_sandbox_claims(self, namespace: str) -> List[str]: def wait_for_gateway_ip(self, gateway_name: str, namespace: str, timeout: int) -> str: """Waits for the Gateway to be assigned an external IP.""" + deadline = time.monotonic() + timeout logging.info(f"Waiting for Gateway '{gateway_name}' in namespace '{namespace}'...") - w = watch.Watch() - for event in w.stream( - func=self.custom_objects_api.list_namespaced_custom_object, - namespace=namespace, - group=GATEWAY_API_GROUP, - version=GATEWAY_API_VERSION, - plural=GATEWAY_PLURAL, - field_selector=f"metadata.name={gateway_name}", - timeout_seconds=timeout, - ): - if event is None: - continue - if event["type"] in ["ADDED", "MODIFIED"]: - gateway_object = event['object'] - status = gateway_object.get('status', {}) - addresses = status.get('addresses', []) - if addresses: - ip_address = addresses[0].get('value') - if ip_address: - logging.info(f"Gateway ready. 
IP: {ip_address}") - w.stop() - return ip_address - raise TimeoutError(f"Gateway '{gateway_name}' did not get an IP.") + while True: + remaining = int(deadline - time.monotonic()) + if remaining <= 0: + raise TimeoutError(f"Gateway '{gateway_name}' did not get an IP.") + w = watch.Watch() + for event in w.stream( + func=self.custom_objects_api.list_namespaced_custom_object, + namespace=namespace, + group=GATEWAY_API_GROUP, + version=GATEWAY_API_VERSION, + plural=GATEWAY_PLURAL, + field_selector=f"metadata.name={gateway_name}", + timeout_seconds=remaining, + ): + if event is None: + continue + if event["type"] in ["ADDED", "MODIFIED"]: + gateway_object = event['object'] + status = gateway_object.get('status', {}) + addresses = status.get('addresses', []) + if addresses: + ip_address = addresses[0].get('value') + if ip_address: + logging.info(f"Gateway ready. IP: {ip_address}") + w.stop() + return ip_address diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/test/unit/test_async_sandboxclient.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/test/unit/test_async_sandboxclient.py new file mode 100644 index 000000000..34b4e6be9 --- /dev/null +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/test/unit/test_async_sandboxclient.py @@ -0,0 +1,367 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import asyncio +import json +import unittest +from http import HTTPStatus +from http.server import BaseHTTPRequestHandler, HTTPServer +from threading import Thread +from unittest.mock import ANY, AsyncMock, MagicMock, patch + +import pytest + +httpx = pytest.importorskip("httpx") +pytest.importorskip("kubernetes_asyncio") + +from k8s_agent_sandbox.async_connector import AsyncSandboxConnector +from k8s_agent_sandbox.async_sandbox import AsyncSandbox +from k8s_agent_sandbox.async_sandbox_client import AsyncSandboxClient +from k8s_agent_sandbox.exceptions import SandboxRequestError +from k8s_agent_sandbox.models import ( + SandboxDirectConnectionConfig, + SandboxLocalTunnelConnectionConfig, +) + + +class TestAsyncSandboxClient(unittest.IsolatedAsyncioTestCase): + + def setUp(self): + patcher = patch("k8s_agent_sandbox.async_sandbox_client.AsyncK8sHelper") + self.MockAsyncK8sHelper = patcher.start() + self.addCleanup(patcher.stop) + + self.config = SandboxDirectConnectionConfig( + api_url="http://test-router:8080", server_port=8888 + ) + self.client = AsyncSandboxClient(connection_config=self.config) + self.mock_k8s_helper = self.client.k8s_helper + self.mock_sandbox_class = MagicMock() + self.client.sandbox_class = self.mock_sandbox_class + + async def test_create_sandbox_success(self): + self.mock_k8s_helper.resolve_sandbox_name = AsyncMock(return_value="resolved-id") + self.mock_k8s_helper.get_sandbox = AsyncMock(return_value={"metadata": {}}) + + mock_sandbox_instance = MagicMock() + mock_sandbox_instance.terminate = AsyncMock() + self.mock_sandbox_class.return_value = mock_sandbox_instance + + with patch.object(self.client, "_create_claim", new_callable=AsyncMock) as mock_create, \ + patch.object(self.client, "_wait_for_sandbox_ready", new_callable=AsyncMock): + + sandbox = await self.client.create_sandbox("test-template", "test-namespace") + + mock_create.assert_called_once_with( + ANY, "test-template", "test-namespace", labels=None + ) + 
self.assertEqual(sandbox, mock_sandbox_instance) + + active = await self.client.list_active_sandboxes() + self.assertEqual(len(active), 1) + + async def test_create_sandbox_failure_cleanup(self): + self.mock_k8s_helper.resolve_sandbox_name = AsyncMock( + side_effect=Exception("Timeout") + ) + + with patch.object(self.client, "_create_claim", new_callable=AsyncMock), \ + patch.object(self.client, "_delete_claim", new_callable=AsyncMock) as mock_delete: + + with self.assertRaises(Exception) as ctx: + await self.client.create_sandbox("test-template", "test-namespace") + + self.assertEqual(str(ctx.exception), "Timeout") + mock_delete.assert_called_once() + + async def test_create_sandbox_cancellation_cleanup(self): + """CancelledError (BaseException) should still trigger claim cleanup.""" + self.mock_k8s_helper.resolve_sandbox_name = AsyncMock( + side_effect=asyncio.CancelledError() + ) + + with patch.object(self.client, "_create_claim", new_callable=AsyncMock), \ + patch.object(self.client, "_delete_claim", new_callable=AsyncMock) as mock_delete: + + with self.assertRaises(asyncio.CancelledError): + await self.client.create_sandbox("test-template", "test-namespace") + + mock_delete.assert_called_once() + + async def test_get_sandbox_existing_active(self): + mock_sandbox = MagicMock() + mock_sandbox.is_active = True + mock_sandbox.terminate = AsyncMock() + self.client._active_connection_sandboxes[("test-namespace", "test-claim")] = mock_sandbox + + self.mock_k8s_helper.resolve_sandbox_name = AsyncMock(return_value="resolved-id") + self.mock_k8s_helper.get_sandbox = AsyncMock(return_value={"metadata": {}}) + + sandbox = await self.client.get_sandbox("test-claim", "test-namespace") + self.assertEqual(sandbox, mock_sandbox) + self.mock_sandbox_class.assert_not_called() + + async def test_get_sandbox_inactive_reattaches(self): + mock_inactive = MagicMock() + mock_inactive.is_active = False + mock_inactive.terminate = AsyncMock() + 
self.client._active_connection_sandboxes[("test-namespace", "test-claim")] = mock_inactive + + self.mock_k8s_helper.resolve_sandbox_name = AsyncMock(return_value="resolved-id") + self.mock_k8s_helper.get_sandbox = AsyncMock(return_value={"metadata": {}}) + + mock_new = MagicMock() + self.mock_sandbox_class.return_value = mock_new + + sandbox = await self.client.get_sandbox("test-claim", "test-namespace") + self.assertEqual(sandbox, mock_new) + + async def test_get_sandbox_not_found(self): + self.mock_k8s_helper.resolve_sandbox_name = AsyncMock( + side_effect=Exception("Not found") + ) + + with self.assertRaises(RuntimeError) as ctx: + await self.client.get_sandbox("test-claim", "test-namespace") + + self.assertIn("not found", str(ctx.exception)) + + async def test_list_active_sandboxes(self): + mock_active = MagicMock() + mock_active.is_active = True + self.client._active_connection_sandboxes[("ns1", "active-claim")] = mock_active + + mock_inactive = MagicMock() + mock_inactive.is_active = False + self.client._active_connection_sandboxes[("ns2", "inactive-claim")] = mock_inactive + + active = await self.client.list_active_sandboxes() + self.assertEqual(active, [("ns1", "active-claim")]) + + async def test_list_all_sandboxes(self): + self.mock_k8s_helper.list_sandbox_claims = AsyncMock( + return_value=["sb-1", "sb-2"] + ) + result = await self.client.list_all_sandboxes("test-ns") + self.assertEqual(result, ["sb-1", "sb-2"]) + + async def test_delete_sandbox_in_registry(self): + mock_sandbox = MagicMock() + mock_sandbox.terminate = AsyncMock() + self.client._active_connection_sandboxes[("test-ns", "test-claim")] = mock_sandbox + + await self.client.delete_sandbox("test-claim", "test-ns") + mock_sandbox.terminate.assert_called_once() + + async def test_delete_all(self): + mock1 = MagicMock() + mock1.terminate = AsyncMock() + mock2 = MagicMock() + mock2.terminate = AsyncMock() + self.client._active_connection_sandboxes[("ns1", "c1")] = mock1 + 
self.client._active_connection_sandboxes[("ns2", "c2")] = mock2 + + with patch.object(self.client, "delete_sandbox", new_callable=AsyncMock) as mock_del: + await self.client.delete_all() + self.assertEqual(mock_del.call_count, 2) + + async def test_close_clears_registry(self): + mock_sandbox = MagicMock() + mock_sandbox._close_connection = AsyncMock() + self.client._active_connection_sandboxes[("ns", "claim")] = mock_sandbox + self.mock_k8s_helper.close = AsyncMock() + + await self.client.close() + + self.assertEqual(len(self.client._active_connection_sandboxes), 0) + mock_sandbox._close_connection.assert_called_once() + self.mock_k8s_helper.close.assert_called_once() + + async def test_context_manager(self): + self.mock_k8s_helper.close = AsyncMock() + + async with self.client as c: + self.assertIsInstance(c, AsyncSandboxClient) + + self.mock_k8s_helper.close.assert_called_once() + + async def test_requires_connection_config(self): + with self.assertRaises(ValueError) as ctx: + AsyncSandboxClient(connection_config=None) + self.assertIn("connection_config is required", str(ctx.exception)) + + async def test_validate_labels_rejects_invalid_value(self): + with self.assertRaises(ValueError): + await self.client.create_sandbox("t", labels={"agent": "invalid value!"}) + + async def test_validate_labels_rejects_empty_key(self): + with self.assertRaises(ValueError): + await self.client.create_sandbox("t", labels={"": "v"}) + + +class TestAsyncSandbox(unittest.IsolatedAsyncioTestCase): + + async def test_requires_connection_config(self): + with self.assertRaises(ValueError) as ctx: + AsyncSandbox( + claim_name="test", + sandbox_id="test-id", + connection_config=None, + ) + self.assertIn("connection_config is required", str(ctx.exception)) + + +class TestAsyncConnector(unittest.IsolatedAsyncioTestCase): + + async def test_rejects_local_tunnel_config(self): + with self.assertRaises(ValueError) as ctx: + AsyncSandboxConnector( + sandbox_id="test", + namespace="default", + 
connection_config=SandboxLocalTunnelConnectionConfig(), + k8s_helper=MagicMock(), + ) + self.assertIn("does not support SandboxLocalTunnelConnectionConfig", str(ctx.exception)) + + +class AsyncSandboxHandler(BaseHTTPRequestHandler): + """Minimal handler for async connector HTTP tests.""" + + def do_POST(self): + if self.path == "/execute": + self._respond(HTTPStatus.OK, {"stdout": "hello", "stderr": "", "exit_code": 0}) + elif self.path == "/server-error": + self._respond(HTTPStatus.INTERNAL_SERVER_ERROR, {"detail": "boom"}) + else: + self._respond(HTTPStatus.NOT_FOUND, {"detail": "not found"}) + + def do_GET(self): + if self.path == "/health": + self._respond(HTTPStatus.OK, {"status": "healthy"}) + else: + self._respond(HTTPStatus.NOT_FOUND, {"detail": "not found"}) + + def _respond(self, status: HTTPStatus, body: dict): + self.send_response(status) + self.send_header("Content-Type", "application/json") + payload = json.dumps(body).encode() + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload) + + def log_message(self, *args): + pass + + +class TestAsyncConnectorHTTP(unittest.IsolatedAsyncioTestCase): + + @classmethod + def setUpClass(cls): + cls.server = HTTPServer(("127.0.0.1", 0), AsyncSandboxHandler) + cls.port = cls.server.server_address[1] + cls.server_thread = Thread(target=cls.server.serve_forever) + cls.server_thread.daemon = True + cls.server_thread.start() + + @classmethod + def tearDownClass(cls): + cls.server.shutdown() + cls.server.server_close() + cls.server_thread.join(timeout=5) + + def _make_connector(self) -> AsyncSandboxConnector: + config = SandboxDirectConnectionConfig( + api_url=f"http://127.0.0.1:{self.port}", + server_port=self.port, + ) + k8s_helper = MagicMock() + return AsyncSandboxConnector( + sandbox_id="test-sandbox", + namespace="default", + connection_config=config, + k8s_helper=k8s_helper, + ) + + async def test_successful_request(self): + connector = self._make_connector() + try: 
+ response = await connector.send_request("GET", "health") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["status"], "healthy") + finally: + await connector.close() + + async def test_post_execute(self): + connector = self._make_connector() + try: + response = await connector.send_request( + "POST", "execute", json={"command": "echo hello"} + ) + self.assertEqual(response.status_code, 200) + data = response.json() + self.assertEqual(data["stdout"], "hello") + self.assertEqual(data["exit_code"], 0) + finally: + await connector.close() + + async def test_404_raises_sandbox_request_error(self): + connector = self._make_connector() + try: + with self.assertRaises(SandboxRequestError) as ctx: + await connector.send_request("GET", "nonexistent") + self.assertEqual(ctx.exception.status_code, 404) + finally: + await connector.close() + + async def test_sandbox_request_error_is_runtime_error(self): + """Backward compat: SandboxRequestError is still a RuntimeError.""" + connector = self._make_connector() + try: + with self.assertRaises(RuntimeError): + await connector.send_request("GET", "nonexistent") + finally: + await connector.close() + + async def test_connection_refused_no_status_code(self): + config = SandboxDirectConnectionConfig( + api_url="http://127.0.0.1:1", server_port=1 + ) + connector = AsyncSandboxConnector( + sandbox_id="test", + namespace="default", + connection_config=config, + k8s_helper=MagicMock(), + ) + try: + with self.assertRaises(SandboxRequestError) as ctx: + await connector.send_request("POST", "run", timeout=1) + self.assertIsNone(ctx.exception.status_code) + finally: + await connector.close() + + async def test_sandbox_headers_sent(self): + """Verify X-Sandbox-* headers are included in requests.""" + connector = self._make_connector() + try: + response = await connector.send_request("GET", "health") + # We can't easily inspect request headers from the server side + # in this test setup, but the request succeeds 
which validates + # the header injection doesn't break the flow. + self.assertEqual(response.status_code, 200) + finally: + await connector.close() + + +if __name__ == "__main__": + unittest.main() diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/trace_manager.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/trace_manager.py index 011e38872..d63884d4c 100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/trace_manager.py +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/trace_manager.py @@ -192,6 +192,30 @@ def wrapper(self, *args, **kwargs): return decorator +def async_trace_span(span_suffix): + """ + Async version of trace_span. Wraps an async method in an OpenTelemetry span. + + Same requirements as trace_span: the instance must have `self.tracer` and + `self.trace_service_name`. + """ + def decorator(func): + @functools.wraps(func) + async def wrapper(self, *args, **kwargs): + tracer = getattr(self, 'tracer', None) + if not tracer: + return await func(self, *args, **kwargs) + + service_name = getattr( + self, 'trace_service_name', 'sandbox-client') + span_name = f"{service_name}.{span_suffix}" + + with tracer.start_as_current_span(span_name): + return await func(self, *args, **kwargs) + return wrapper + return decorator + + class TracerManager: """ Manages the tracing lifecycle for a single client instance. diff --git a/clients/python/agentic-sandbox-client/pyproject.toml b/clients/python/agentic-sandbox-client/pyproject.toml index e22cb7c6c..d25748af3 100644 --- a/clients/python/agentic-sandbox-client/pyproject.toml +++ b/clients/python/agentic-sandbox-client/pyproject.toml @@ -45,9 +45,16 @@ exclude = [ root = "../../.." [project.optional-dependencies] +async = [ + "httpx", + "kubernetes_asyncio", +] test = [ "pytest", "pytest-xdist", + "pytest-asyncio", + "httpx", + "kubernetes_asyncio", ] tracing = [ "opentelemetry-api~=1.39.0",