diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/constants.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/constants.py index d6e13dab1..9068a2227 100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/constants.py +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/constants.py @@ -29,4 +29,7 @@ PODSNAPSHOT_API_GROUP = "podsnapshot.gke.io" PODSNAPSHOT_API_VERSION = "v1alpha1" +PODSNAPSHOT_PLURAL = "podsnapshots" +PODSNAPSHOTMANUALTRIGGER_PLURAL = "podsnapshotmanualtriggers" +PODSNAPSHOTMANUALTRIGGER_API_KIND = "PodSnapshotManualTrigger" PODSNAPSHOT_API_KIND = "PodSnapshot" diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot.md b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot.md index 909085497..189852990 100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot.md +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot.md @@ -4,18 +4,24 @@ This directory contains the Python client extension for interacting with the Age ## `podsnapshot_client.py` -This file defines the `PodSnapshotSandboxClient` class, which extend the base `SandboxClient` to provide snapshot capabilities. +This file defines the `PodSnapshotSandboxClient` class, which extends the base `SandboxClient` to provide snapshot capabilities. ### `PodSnapshotSandboxClient` -A specialized Sandbox client for interacting with the gke pod snapshot controller. +A specialized Sandbox client for interacting with the GKE Pod Snapshot Controller. ### Key Features: -* **`PodSnapshotSandboxClient(template_name: str, ...)`**: - * Initializes the client with optional server port. - +* **`PodSnapshotSandboxClient(template_name: str, podsnapshot_timeout: int = 180, ...)`**: + * Initializes the client with optional podsnapshot timeout. +* **`snapshot(self, trigger_name: str) -> SnapshotResponse`**: + * Triggers a manual snapshot of the current sandbox pod by creating a `PodSnapshotManualTrigger` resource. + * The `trigger_name` is suffixed with a timestamp and unique hash. + * Waits for the snapshot to be processed. + * The Pod Snapshot Controller creates a `PodSnapshot` resource automatically. + * Returns the SnapshotResponse object(success, error_code, error_reason, trigger_name, snapshot_uid). * **`__exit__(self)`**: + * Cleans up the `PodSnapshotManualTrigger` resources. * Cleans up the `SandboxClaim` resources. ## `test_podsnapshot_extension.py` @@ -24,8 +30,10 @@ This file, located in the parent directory (`clients/python/agentic-sandbox-clie ### Test Phases: -1. **Phase 1: Starting Counter Sandbox**: +1. **Phase 1: Starting Counter Sandbox & Snapshotting**: * Starts a sandbox with a counter application. + * Takes a snapshot (`test-snapshot-10`) after ~10 seconds. + * Takes a snapshot (`test-snapshot-20`) after ~20 seconds. ### Prerequisites @@ -59,4 +67,4 @@ python3 clients/python/agentic-sandbox-client/test_podsnapshot_extension.py \ --namespace sandbox-test ``` -Adjust the `--namespace`, `--template-name` as needed for your environment. \ No newline at end of file +Adjust the `--namespace`, `--template-name` as needed for your environment. diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot_client.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot_client.py index 59725b69c..49981c833 100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot_client.py +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot_client.py @@ -13,34 +13,66 @@ # limitations under the License. import logging -from kubernetes import client +import uuid +from datetime import datetime, timezone +from typing import Any +from dataclasses import dataclass +from kubernetes import client, watch from kubernetes.client import ApiException from ..sandbox_client import SandboxClient from ..constants import ( PODSNAPSHOT_API_GROUP, PODSNAPSHOT_API_VERSION, PODSNAPSHOT_API_KIND, + PODSNAPSHOTMANUALTRIGGER_API_KIND, + PODSNAPSHOTMANUALTRIGGER_PLURAL, ) +SNAPSHOT_SUCCESS_CODE = 0 +SNAPSHOT_ERROR_CODE = 1 + logger = logging.getLogger(__name__) +@dataclass +class SnapshotResult: + """Result of a snapshot processing operation.""" + + snapshot_uid: str + snapshot_timestamp: str + + +@dataclass +class SnapshotResponse: + """Structured response for snapshot operations.""" + + success: bool + trigger_name: str + snapshot_uid: str + snapshot_timestamp: str + error_reason: str + error_code: int + + class PodSnapshotSandboxClient(SandboxClient): """ A specialized Sandbox client for interacting with the GKE Pod Snapshot Controller. - - TODO: This class enables users to take a snapshot of their sandbox and restore from the taken snapshot. + This class enables users to take a manual trigger snapshot of their sandbox and restore from the taken snapshot. """ def __init__( self, template_name: str, + podsnapshot_timeout: int = 180, **kwargs, ): super().__init__(template_name, **kwargs) self.snapshot_crd_installed = False self.core_v1_api = client.CoreV1Api() + self.podsnapshot_timeout = podsnapshot_timeout + + self.created_manual_triggers = [] def __enter__(self) -> "PodSnapshotSandboxClient": try: @@ -61,7 +93,9 @@ def __enter__(self) -> "PodSnapshotSandboxClient": def _check_snapshot_crd_installed(self) -> bool: """ - Checks if the PodSnapshot CRD is installed in the cluster. + Checks if the PodSnapshot CRD is installed and available in the cluster. + Returns: + bool: True if the CRD is installed, False otherwise. """ if self.snapshot_crd_installed: @@ -87,8 +121,221 @@ def _check_snapshot_crd_installed(self) -> bool: return False raise + def _parse_created_snapshot_info(self, obj: dict[str, Any]) -> SnapshotResult: + """Parses the object to extract snapshot details.""" + status = obj.get("status", {}) + conditions = status.get("conditions") or [] + for condition in conditions: + if ( + condition.get("type") == "Triggered" + and condition.get("status") == "True" + and condition.get("reason") == "Complete" + ): + snapshot_created = status.get("snapshotCreated") or {} + snapshot_uid = snapshot_created.get("name") + snapshot_timestamp = condition.get("lastTransitionTime") + return SnapshotResult( + snapshot_uid=snapshot_uid, + snapshot_timestamp=snapshot_timestamp, + ) + elif condition.get("status") == "False" and condition.get("reason") in [ + "Failed", + "Error", + ]: + raise RuntimeError( + f"Snapshot failed. Condition: {condition.get('message', 'Unknown error')}" + ) + raise ValueError("Snapshot is not yet complete.") + + def _wait_for_snapshot_to_be_completed( + self, trigger_name: str, resource_version: str | None = None + ) -> SnapshotResult: + """ + Waits for the PodSnapshotManualTrigger to be processed and returns SnapshotResult. + """ + w = watch.Watch() + logger.info( + f"Waiting for snapshot manual trigger '{trigger_name}' to be processed..." + ) + + kwargs = {} + if resource_version: + kwargs["resource_version"] = resource_version + + try: + for event in w.stream( + func=self.custom_objects_api.list_namespaced_custom_object, + namespace=self.namespace, + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + field_selector=f"metadata.name={trigger_name}", + timeout_seconds=self.podsnapshot_timeout, + **kwargs, + ): + if event["type"] in ["ADDED", "MODIFIED"]: + obj = event["object"] + try: + result = self._parse_created_snapshot_info(obj) + logger.info( + f"Snapshot manual trigger '{trigger_name}' processed successfully. Created Snapshot UID: {result.snapshot_uid}" + ) + return result + except ValueError: + # Continue watching if snapshot is not yet complete + continue + elif event["type"] == "ERROR": + logger.error( + f"Snapshot watch received error event: {event['object']}" + ) + raise RuntimeError(f"Snapshot watch error: {event['object']}") + elif event["type"] == "DELETED": + logger.error( + f"Snapshot manual trigger '{trigger_name}' was deleted before completion." + ) + raise RuntimeError( + f"Snapshot manual trigger '{trigger_name}' was deleted." + ) + except Exception as e: + logger.error(f"Error watching snapshot: {e}") + raise + finally: + w.stop() + + raise TimeoutError( + f"Snapshot manual trigger '{trigger_name}' was not processed within {self.podsnapshot_timeout} seconds." + ) + + def snapshot(self, trigger_name: str) -> SnapshotResponse: + """ + Triggers a snapshot of the specified pod by creating a PodSnapshotManualTrigger resource. + The trigger_name will be suffixed with a timestamp and random hex string. + Returns: + SnapshotResponse: The result of the operation. + """ + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") + suffix = uuid.uuid4().hex[:8] + trigger_name = f"{trigger_name}-{timestamp}-{suffix}" + + if not self.snapshot_crd_installed: + return SnapshotResponse( + success=False, + trigger_name=trigger_name, + snapshot_uid=None, + snapshot_timestamp=None, + error_reason="Snapshot CRD is not installed. Ensure it is installed and running.", + error_code=SNAPSHOT_ERROR_CODE, + ) + if not self.pod_name: + return SnapshotResponse( + success=False, + trigger_name=trigger_name, + snapshot_uid=None, + snapshot_timestamp=None, + error_reason="Sandbox pod name not found. Ensure sandbox is created.", + error_code=SNAPSHOT_ERROR_CODE, + ) + + manifest = { + "apiVersion": f"{PODSNAPSHOT_API_GROUP}/{PODSNAPSHOT_API_VERSION}", + "kind": f"{PODSNAPSHOTMANUALTRIGGER_API_KIND}", + "metadata": {"name": trigger_name, "namespace": self.namespace}, + "spec": {"targetPod": self.pod_name}, + } + + try: + created_obj = self.custom_objects_api.create_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.namespace, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + body=manifest, + ) + self.created_manual_triggers.append(trigger_name) + + # Start watching from the version we just created to avoid missing updates + resource_version = created_obj.get("metadata", {}).get("resourceVersion") + snapshot_result = self._wait_for_snapshot_to_be_completed( + trigger_name, resource_version + ) + + return SnapshotResponse( + success=True, + trigger_name=trigger_name, + snapshot_uid=snapshot_result.snapshot_uid, + snapshot_timestamp=snapshot_result.snapshot_timestamp, + error_reason="", + error_code=SNAPSHOT_SUCCESS_CODE, + ) + except ApiException as e: + logger.exception( + f"Failed to create PodSnapshotManualTrigger '{trigger_name}': {e}" + ) + return SnapshotResponse( + success=False, + trigger_name=trigger_name, + snapshot_uid=None, + snapshot_timestamp=None, + error_reason=f"Failed to create PodSnapshotManualTrigger: {e}", + error_code=SNAPSHOT_ERROR_CODE, + ) + except TimeoutError as e: + logger.exception( + f"Snapshot creation timed out for trigger '{trigger_name}': {e}" + ) + return SnapshotResponse( + success=False, + trigger_name=trigger_name, + snapshot_uid=None, + snapshot_timestamp=None, + error_reason=f"Snapshot creation timed out: {e}", + error_code=SNAPSHOT_ERROR_CODE, + ) + except RuntimeError as e: + logger.exception( + f"Snapshot creation failed for trigger '{trigger_name}': {e}" + ) + return SnapshotResponse( + success=False, + trigger_name=trigger_name, + snapshot_uid=None, + snapshot_timestamp=None, + error_reason=f"Snapshot creation failed: {e}", + error_code=SNAPSHOT_ERROR_CODE, + ) + except Exception as e: + logger.exception( + f"Unexpected error during snapshot creation for trigger '{trigger_name}': {e}" + ) + return SnapshotResponse( + success=False, + trigger_name=trigger_name, + snapshot_uid=None, + snapshot_timestamp=None, + error_reason=f"Unexpected error: {e}", + error_code=SNAPSHOT_ERROR_CODE, + ) + def __exit__(self, exc_type, exc_val, exc_tb): """ + Cleans up the PodSnapshotManualTrigger Resources. Automatically cleans up the Sandbox. """ + for trigger_name in self.created_manual_triggers: + try: + self.custom_objects_api.delete_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.namespace, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + name=trigger_name, + ) + logger.info(f"Deleted PodSnapshotManualTrigger '{trigger_name}'") + except ApiException as e: + if e.status == 404: + # Ignore if the resource is already deleted + continue + logger.error( + f"Failed to delete PodSnapshotManualTrigger '{trigger_name}': {e}" + ) super().__exit__(exc_type, exc_val, exc_tb) diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/test_podsnapshot_client.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/test_podsnapshot_client.py index 1ae581867..c778621df 100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/test_podsnapshot_client.py +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/test_podsnapshot_client.py @@ -13,9 +13,8 @@ # limitations under the License. import unittest -import os import logging -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, patch, call from k8s_agent_sandbox.gke_extensions.podsnapshot_client import ( PodSnapshotSandboxClient, ) @@ -23,6 +22,8 @@ PODSNAPSHOT_API_KIND, PODSNAPSHOT_API_GROUP, PODSNAPSHOT_API_VERSION, + PODSNAPSHOTMANUALTRIGGER_PLURAL, + PODSNAPSHOTMANUALTRIGGER_API_KIND, ) from kubernetes.client import ApiException @@ -193,6 +194,365 @@ def test_check_snapshot_crd_installed_already_ready(self): self.assertTrue(result) self.client.custom_objects_api.get_api_resources.assert_not_called() + @patch("k8s_agent_sandbox.gke_extensions.podsnapshot_client.watch.Watch") + def test_snapshot_success(self, mock_watch_cls): + """Test successful snapshot creation.""" + logging.info("Starting test_snapshot_success...") + + # Mock the watch + mock_watch = MagicMock() + mock_watch_cls.return_value = mock_watch + + self.client.pod_name = "test-pod" + self.client.snapshot_crd_installed = True + self.client.namespace = "test-ns" + + # Mock the watch stream + mock_event = { + "type": "MODIFIED", + "object": { + "status": { + "conditions": [ + { + "type": "Triggered", + "status": "True", + "reason": "Complete", + "lastTransitionTime": "2023-01-01T00:00:00Z", + } + ], + "snapshotCreated": {"name": "snapshot-uid"}, + } + }, + } + mock_watch.stream.return_value = [mock_event] + + # Mock create to return an object with resourceVersion + mock_created_obj = {"metadata": {"resourceVersion": "123"}, "status": {}} + self.client.custom_objects_api.create_namespaced_custom_object.return_value = ( + mock_created_obj + ) + + result = self.client.snapshot("test-trigger") + + self.assertEqual(result.error_code, 0) + self.assertTrue(result.success, result.error_reason) + self.assertIn("test-trigger", result.trigger_name) + + # Verify create call was made + self.client.custom_objects_api.create_namespaced_custom_object.assert_called_once_with( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.client.namespace, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + body={ + "apiVersion": f"{PODSNAPSHOT_API_GROUP}/{PODSNAPSHOT_API_VERSION}", + "kind": f"{PODSNAPSHOTMANUALTRIGGER_API_KIND}", + "metadata": {"name": result.trigger_name, "namespace": self.client.namespace}, + "spec": {"targetPod": self.client.pod_name}, + }, + ) + # Verify watch was called with resource_version + mock_watch.stream.assert_called_once() + _, kwargs = mock_watch.stream.call_args + self.assertEqual(kwargs.get("resource_version"), "123") + logging.info("Finished test_snapshot_success.") + + @patch("k8s_agent_sandbox.gke_extensions.podsnapshot_client.watch.Watch") + def test_snapshot_processed_retry(self, mock_watch_cls): + """Test that snapshot waits for 'Complete' status, ignoring intermediate states.""" + logging.info("Starting test_snapshot_processed_retry...") + + mock_watch = MagicMock() + mock_watch_cls.return_value = mock_watch + + self.client.pod_name = "test-pod" + self.client.snapshot_crd_installed = True + self.client.namespace = "test-ns" + + # Mock events: + # 1. Triggered but not complete (should raise ValueError internally and retry) + # 2. Triggered and Complete (should succeed) + event_incomplete = { + "type": "MODIFIED", + "object": { + "status": { + "conditions": [ + { + "type": "Triggered", + "status": "False", # Not complete yet + "reason": "Pending", + } + ] + } + }, + } + event_complete = { + "type": "MODIFIED", + "object": { + "status": { + "conditions": [ + { + "type": "Triggered", + "status": "True", + "reason": "Complete", + "lastTransitionTime": "2023-01-01T00:00:00Z", + } + ], + "snapshotCreated": {"name": "snapshot-uid-retry"}, + } + }, + } + + mock_watch.stream.return_value = [event_incomplete, event_complete] + + # Mock create object + self.client.custom_objects_api.create_namespaced_custom_object.return_value = { + "metadata": {"resourceVersion": "999"} + } + + result = self.client.snapshot("test-retry") + + self.assertTrue(result.success,result.error_reason) + self.assertEqual(result.snapshot_uid, "snapshot-uid-retry") + logging.info("Finished test_snapshot_processed_retry.") + + def test_snapshot_no_pod_name(self): + """Test snapshot when pod name is not set.""" + logging.info("Starting test_snapshot_no_pod_name...") + self.client.snapshot_crd_installed = True + self.client.pod_name = None + result = self.client.snapshot("test-trigger") + + self.assertEqual(result.error_code, 1) + self.assertFalse(result.success, result.error_reason) + self.assertIn("test-trigger", result.trigger_name) + self.assertIn("Sandbox pod name not found", result.error_reason) + logging.info("Finished test_snapshot_no_pod_name.") + + def test_snapshot_creation_api_exception(self): + """Test snapshot handling of API exception during creation.""" + logging.info("Starting test_snapshot_creation_api_exception...") + self.client.pod_name = "test-pod" + self.client.snapshot_crd_installed = True + + self.client.custom_objects_api.create_namespaced_custom_object.side_effect = ( + ApiException("Create failed") + ) + + result = self.client.snapshot("test-trigger") + + self.assertFalse(result.success, result.error_reason) + self.assertEqual(result.error_code, 1) + self.assertIn("Failed to create PodSnapshotManualTrigger", result.error_reason) + logging.info("Finished test_snapshot_creation_api_exception.") + + @patch("k8s_agent_sandbox.gke_extensions.podsnapshot_client.watch.Watch") + @patch( + "k8s_agent_sandbox.gke_extensions.podsnapshot_client.client.CustomObjectsApi" + ) + def test_snapshot_timeout(self, mock_custom_cls, mock_watch_cls): + """Test snapshot timeout scenario.""" + logging.info("Starting test_snapshot_timeout...") + mock_custom = MagicMock() + mock_custom_cls.return_value = mock_custom + + mock_watch = MagicMock() + mock_watch_cls.return_value = mock_watch + + self.client.pod_name = "test-pod" + self.client.snapshot_crd_installed = True + self.client.podsnapshot_timeout = 1 + + # Mock empty stream (timeout) + mock_watch.stream.return_value = [] + + result = self.client.snapshot("test-trigger") + + self.assertEqual(result.error_code, 1) + self.assertFalse(result.success, result.error_reason) + self.assertIn("timed out", result.error_reason) + logging.info("Finished test_snapshot_timeout.") + + @patch("k8s_agent_sandbox.gke_extensions.podsnapshot_client.SandboxClient.__exit__") + def test_exit_cleanup(self, mock_super_exit): + """Test __exit__ cleans up created triggers.""" + logging.info("Starting test_exit_cleanup...") + self.client.created_manual_triggers = ["trigger-1", "trigger-2"] + + self.client.__exit__(None, None, None) + + # Check deletion calls + self.assertEqual( + self.client.custom_objects_api.delete_namespaced_custom_object.call_count, 2 + ) + + calls = [ + call( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.client.namespace, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + name="trigger-1", + ), + call( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.client.namespace, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + name="trigger-2", + ), + ] + self.client.custom_objects_api.delete_namespaced_custom_object.assert_has_calls( + calls, any_order=True + ) + + mock_super_exit.assert_called_once_with(None, None, None) + logging.info("Finished test_exit_cleanup.") + + def test_snapshot_watch_failure_condition(self): + """Test snapshot failure when watch event reports 'False' status.""" + logging.info("Starting test_snapshot_watch_failure_condition...") + self.client.pod_name = "test-pod" + self.client.snapshot_crd_installed = True + + # Mock watch to return failure event + mock_watch = MagicMock() + with patch( + "k8s_agent_sandbox.gke_extensions.podsnapshot_client.watch.Watch" + ) as mock_watch_cls: + mock_watch_cls.return_value = mock_watch + failure_event = { + "type": "MODIFIED", + "object": { + "status": { + "conditions": [ + { + "type": "Triggered", + "status": "False", + "reason": "Failed", + "message": "Snapshot failed due to timeout", + } + ] + } + }, + } + mock_watch.stream.return_value = [failure_event] + + # Mock create to return resource version + self.client.custom_objects_api.create_namespaced_custom_object.return_value = { + "metadata": {"resourceVersion": "100"} + } + + result = self.client.snapshot("test-trigger-fail") + + self.assertFalse(result.success, result.error_reason) + self.assertEqual(result.error_code, 1) + self.assertIn( + "Snapshot failed. Condition: Snapshot failed due to timeout", + result.error_reason, + ) + logging.info("Finished test_snapshot_watch_failure_condition.") + + def test_snapshot_watch_error_event(self): + """Test snapshot failure on 'ERROR' event type.""" + logging.info("Starting test_snapshot_watch_error_event...") + self.client.pod_name = "test-pod" + self.client.snapshot_crd_installed = True + + mock_watch = MagicMock() + with patch( + "k8s_agent_sandbox.gke_extensions.podsnapshot_client.watch.Watch" + ) as mock_watch_cls: + mock_watch_cls.return_value = mock_watch + error_event = { + "type": "ERROR", + "object": {"code": 500, "message": "Internal Server Error"}, + } + mock_watch.stream.return_value = [error_event] + + self.client.custom_objects_api.create_namespaced_custom_object.return_value = { + "metadata": {"resourceVersion": "100"} + } + + result = self.client.snapshot("test-trigger-error") + + self.assertFalse(result.success, result.error_reason) + self.assertEqual(result.error_code, 1) + self.assertIn("Snapshot watch error:", result.error_reason) + logging.info("Finished test_snapshot_watch_error_event.") + + def test_snapshot_watch_deleted_event(self): + """Test snapshot failure on 'DELETED' event type.""" + logging.info("Starting test_snapshot_watch_deleted_event...") + self.client.pod_name = "test-pod" + self.client.snapshot_crd_installed = True + + mock_watch = MagicMock() + with patch( + "k8s_agent_sandbox.gke_extensions.podsnapshot_client.watch.Watch" + ) as mock_watch_cls: + mock_watch_cls.return_value = mock_watch + deleted_event = {"type": "DELETED", "object": {}} + mock_watch.stream.return_value = [deleted_event] + + self.client.custom_objects_api.create_namespaced_custom_object.return_value = { + "metadata": {"resourceVersion": "100"} + } + + result = self.client.snapshot("test-trigger-deleted") + + self.assertFalse(result.success, result.error_reason) + self.assertEqual(result.error_code, 1) + self.assertIn("was deleted", result.error_reason) + logging.info("Finished test_snapshot_watch_deleted_event.") + + def test_snapshot_watch_generic_exception(self): + """Test snapshot failure on generic exception during watch.""" + logging.info("Starting test_snapshot_watch_generic_exception...") + self.client.pod_name = "test-pod" + self.client.snapshot_crd_installed = True + + mock_watch = MagicMock() + with patch( + "k8s_agent_sandbox.gke_extensions.podsnapshot_client.watch.Watch" + ) as mock_watch_cls: + mock_watch_cls.return_value = mock_watch + # Simulate generic exception + mock_watch.stream.side_effect = Exception("Something went wrong") + + self.client.custom_objects_api.create_namespaced_custom_object.return_value = { + "metadata": {"resourceVersion": "100"} + } + + result = self.client.snapshot("test-trigger-generic") + + self.assertFalse(result.success, result.error_reason) + self.assertEqual(result.error_code, 1) + self.assertIn("Unexpected error: Something went wrong", result.error_reason) + logging.info("Finished test_snapshot_watch_generic_exception.") + + def test_snapshot_invalid_name_api_exception(self): + """Test snapshot failure when trigger name is invalid (ApiException).""" + logging.info("Starting test_snapshot_invalid_name_api_exception...") + self.client.pod_name = "test-pod" + self.client.snapshot_crd_installed = True + + self.client.custom_objects_api.create_namespaced_custom_object.side_effect = ApiException( + status=400, + reason="BadRequest", + http_resp=MagicMock( + data='Invalid value: "Test_Trigger": must be a lowercase RFC 1123 subdomain' + ), + ) + + result = self.client.snapshot("Test_Trigger") + + self.assertFalse(result.success, result.error_reason) + self.assertEqual(result.error_code, 1) + self.assertIn("Failed to create PodSnapshotManualTrigger", result.error_reason) + self.assertIn("Invalid value", result.error_reason) + logging.info("Finished test_snapshot_invalid_name_api_exception.") + if __name__ == "__main__": unittest.main() diff --git a/clients/python/agentic-sandbox-client/test_podsnapshot_extension.py b/clients/python/agentic-sandbox-client/test_podsnapshot_extension.py index 383e12d33..56c047e29 100644 --- a/clients/python/agentic-sandbox-client/test_podsnapshot_extension.py +++ b/clients/python/agentic-sandbox-client/test_podsnapshot_extension.py @@ -12,9 +12,38 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Integration test for PodSnapshotSandboxClient. +""" + import argparse +import time from kubernetes import config from k8s_agent_sandbox.gke_extensions import PodSnapshotSandboxClient +from k8s_agent_sandbox.gke_extensions.podsnapshot_client import SnapshotResponse + + +WAIT_TIME_SECONDS = 10 + + +def test_snapshot_response(snapshot_response: SnapshotResponse, snapshot_name: str): + assert hasattr( + snapshot_response, "trigger_name" + ), "snapshot response missing 'trigger_name' attribute" + + print(f"Trigger Name: {snapshot_response.trigger_name}") + print(f"Snapshot UID: {snapshot_response.snapshot_uid}") + print(f"Success: {snapshot_response.success}") + print(f"Error Code: {snapshot_response.error_code}") + print(f"Error Reason: {snapshot_response.error_reason}") + + assert snapshot_response.trigger_name.startswith( + snapshot_name + ), f"Expected trigger name prefix '{snapshot_name}', but got '{snapshot_response.trigger_name}'" + assert ( + snapshot_response.success + ), f"Expected success=True, but got False. Reason: {snapshot_response.error_reason}" + assert snapshot_response.error_code == 0 def main( @@ -38,6 +67,9 @@ def main( except config.ConfigException: config.load_kube_config() + first_snapshot_name = "test-snapshot-10" + second_snapshot_name = "test-snapshot-20" + try: print("\n***** Phase 1: Starting Counter *****") @@ -48,9 +80,25 @@ def main( server_port=server_port, ) as sandbox: print("\n======= Testing Pod Snapshot Extension =======") - assert ( - sandbox.snapshot_crd_installed == True - ), "Pod Snapshot CRD is not installed." + assert sandbox.snapshot_crd_installed, "Pod Snapshot CRD is not installed." + time.sleep(WAIT_TIME_SECONDS) + print( + f"Creating first pod snapshot '{first_snapshot_name}' after {WAIT_TIME_SECONDS} seconds..." + ) + snapshot_response = sandbox.snapshot(first_snapshot_name) + test_snapshot_response(snapshot_response, first_snapshot_name) + + time.sleep(WAIT_TIME_SECONDS) + + print( + f"\nCreating second pod snapshot '{second_snapshot_name}' after {WAIT_TIME_SECONDS} seconds..." + ) + snapshot_response = sandbox.snapshot(second_snapshot_name) + test_snapshot_response(snapshot_response, second_snapshot_name) + recent_snapshot_uid = snapshot_response.snapshot_uid + print(f"Recent snapshot UID: {recent_snapshot_uid}") + + print("--- Pod Snapshot Test Passed! ---") except Exception as e: print(f"\n--- An error occurred during the test: {e} ---")