Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import subprocess
import logging
from dataclasses import dataclass
from enum import Enum

import requests
from requests.adapters import HTTPAdapter
Expand Down Expand Up @@ -55,6 +56,13 @@
format='%(asctime)s - %(levelname)s - %(message)s',
stream=sys.stdout)

class SandboxStatus(Enum):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If i'm not mistaken, this PR will be affected by this one #121 , right ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice, we are exposing the Sandbox status directly. Yes let me hold on to my current PR.

"""Enumeration for Sandbox status states."""
PROVISIONING = "Provisioning"
RUNNING = "Running"
SUCCEEDED = "Succeeded"
FAILED = "Failed"
UNKNOWN = "Unknown"

@dataclass
class ExecutionResult:
Expand Down Expand Up @@ -119,6 +127,7 @@ def __init__(
config.load_kube_config()

self.custom_objects_api = client.CustomObjectsApi()
self.core_v1_api = client.CoreV1Api()

# HTTP session with retries
self.session = requests.Session()
Expand Down Expand Up @@ -477,3 +486,38 @@ def read(self, path: str, timeout: int = 60) -> bytes:
span.set_attribute("sandbox.file.size", len(content))

return content

@trace_span("status")
def status(self) -> SandboxStatus:
"""
Returns the lifecycle status of the sandbox.
"""

span = trace.get_current_span()
if span.is_recording():
span.set_attribute("sandbox.name", self.sandbox_name)

try:
pod = self.core_v1_api.read_namespaced_pod(
name=self.pod_name, namespace=self.namespace)

status_map = {
"Pending": SandboxStatus.PROVISIONING,
"Running": SandboxStatus.RUNNING,
"Succeeded": SandboxStatus.SUCCEEDED,
"Failed": SandboxStatus.FAILED,
"Unknown": SandboxStatus.UNKNOWN
}
status = status_map.get(pod.status.phase, SandboxStatus.UNKNOWN)
if span.is_recording():
span.set_attribute("sandbox.status", status.value)
return status

except client.ApiException as e:
if e.status == 404:
return SandboxStatus.FAILED
logging.error(f"Pod not found: {e}")
except Exception as e:
logging.error(f"Unexpected error fetching sandbox status: {e}")
return SandboxStatus.UNKNOWN

9 changes: 8 additions & 1 deletion clients/python/agentic-sandbox-client/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import argparse
import asyncio
import time
from agentic_sandbox import SandboxClient

POD_NAME_ANNOTATION = "agents.x-k8s.io/pod-name"
Expand Down Expand Up @@ -60,12 +61,18 @@ async def main(template_name: str, gateway_name: str | None, api_url: str | None
assert sandbox.pod_name == sandbox.sandbox_name, f"Expected pod_name to be '{sandbox.sandbox_name}', but got '{sandbox.pod_name}'"
print("--- Pod Name Discovery Test Passed (Fallback) ---")

print("\n--- Testing Sandbox Status ---")

status = sandbox.status()
print(f"Sandbox status: {status.value}")
assert status.value == "Running", f"Expected sandbox status to be 'Running', but got '{status}'"
print("--- Sandbox Status Test Passed ---")

print("\n--- Testing Command Execution ---")
command_to_run = "echo 'Hello from the sandbox!'"
print(f"Executing command: '{command_to_run}'")

result = sandbox.run(command_to_run)

print(f"Stdout: {result.stdout.strip()}")
print(f"Stderr: {result.stderr.strip()}")
print(f"Exit Code: {result.exit_code}")
Expand Down
137 changes: 137 additions & 0 deletions clients/python/agentic-sandbox-client/test_sandbox_client_unit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# Copyright 2025 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
from unittest.mock import MagicMock, patch, ANY
import sys
import os

from agentic_sandbox.sandbox_client import SandboxClient, SandboxStatus

class TestSandboxClient(unittest.TestCase):

def setUp(self):
# Patch Kubernetes config loading to avoid errors
self.patcher_load_kube_config = patch('kubernetes.config.load_kube_config')
self.patcher_load_incluster_config = patch('kubernetes.config.load_incluster_config')
self.mock_load_kube_config = self.patcher_load_kube_config.start()
self.mock_load_incluster_config = self.patcher_load_incluster_config.start()

# Patch Kubernetes Client classes
self.patcher_custom_objects = patch('kubernetes.client.CustomObjectsApi')
self.mock_custom_objects_cls = self.patcher_custom_objects.start()
self.mock_custom_objects_api = self.mock_custom_objects_cls.return_value

self.patcher_core_v1 = patch('kubernetes.client.CoreV1Api')
self.mock_core_v1_cls = self.patcher_core_v1.start()
self.mock_core_v1_api = self.mock_core_v1_cls.return_value

def tearDown(self):
self.patcher_load_kube_config.stop()
self.patcher_load_incluster_config.stop()
self.patcher_custom_objects.stop()
self.patcher_core_v1.stop()

def test_initialization(self):
"""Test that the client initializes with correct defaults."""
client = SandboxClient(template_name="test-template")
self.assertEqual(client.template_name, "test-template")
self.assertEqual(client.namespace, "default")
self.assertIsNone(client.base_url)

def test_create_claim(self):
"""Test that _create_claim calls the Kubernetes API correctly."""
client = SandboxClient(template_name="test-template")
client._create_claim()

self.assertIsNotNone(client.claim_name)
self.assertTrue(client.claim_name.startswith("sandbox-claim-"))

self.mock_custom_objects_api.create_namespaced_custom_object.assert_called_once()
call_args = self.mock_custom_objects_api.create_namespaced_custom_object.call_args
self.assertEqual(call_args.kwargs['group'], "extensions.agents.x-k8s.io")
self.assertEqual(call_args.kwargs['version'], "v1alpha1")
self.assertEqual(call_args.kwargs['plural'], "sandboxclaims")
self.assertEqual(call_args.kwargs['namespace'], "default")
self.assertEqual(call_args.kwargs['body']['spec']['sandboxTemplateRef']['name'], "test-template")

@patch('kubernetes.watch.Watch')
def test_wait_for_sandbox_ready(self, mock_watch_cls):
"""Test waiting for the sandbox to become ready."""
mock_watch = mock_watch_cls.return_value

# Simulate a watch event where the sandbox is Ready
mock_event = {
'type': 'MODIFIED',
'object': {
'metadata': {
'name': 'test-sandbox',
'annotations': {'agents.x-k8s.io/pod-name': 'test-pod-123'}
},
'status': {
'conditions': [
{'type': 'Ready', 'status': 'True'}
]
}
}
}
mock_watch.stream.return_value = [mock_event]

client = SandboxClient(template_name="test-template")
client.claim_name = "test-claim"

client._wait_for_sandbox_ready()

self.assertEqual(client.sandbox_name, "test-sandbox")
self.assertEqual(client.pod_name, "test-pod-123")

def test_status_running(self):
"""Test fetching status when pod is running."""
client = SandboxClient(template_name="test-template")
client.claim_name = "test-claim"
client.pod_name = "test-claim"

# Mock the pod status response
mock_pod = MagicMock()
mock_pod.status.phase = "Running"
self.mock_core_v1_api.read_namespaced_pod.return_value = mock_pod

status = client.status()

self.assertEqual(status, SandboxStatus.RUNNING)
self.mock_core_v1_api.read_namespaced_pod.assert_called_with(
name="test-claim", namespace="default"
)

@patch('requests.Session')
def test_run_command_success(self, mock_session_cls):
"""Test running a command successfully."""
mock_session = mock_session_cls.return_value
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
'stdout': 'Hello World',
'stderr': '',
'exit_code': 0
}
mock_session.request.return_value = mock_response

client = SandboxClient(template_name="test-template", api_url="http://localhost:8080")
result = client.run("echo Hello World")

self.assertEqual(result.stdout, "Hello World")
self.assertEqual(result.exit_code, 0)

if __name__ == '__main__':
unittest.main()