diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/constants.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/constants.py index 9068a2227..201fc06d9 100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/constants.py +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/constants.py @@ -26,6 +26,7 @@ SANDBOX_PLURAL_NAME = "sandboxes" POD_NAME_ANNOTATION = "agents.x-k8s.io/pod-name" +PODSNAPSHOT_POD_NAME_LABEL = "podsnapshot.gke.io/pod-name" PODSNAPSHOT_API_GROUP = "podsnapshot.gke.io" PODSNAPSHOT_API_VERSION = "v1alpha1" diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/README.md b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/README.md index 30f9b931f..ee1f7f2d8 100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/README.md +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/README.md @@ -1,6 +1,6 @@ # Agentic Sandbox Pod Snapshot Extension -This directory contains the Python client extension for interacting with the Agentic Sandbox to manage Pod Snapshots. This extension allows you to trigger snapshots of a running sandbox and restore a new sandbox from the recently created snapshot. +This directory contains the Python client extension for interacting with the Agentic Sandbox to manage Pod Snapshots. This extension allows you to trigger snapshots of a running sandbox and restore a new sandbox from a recently created snapshot. ## Components @@ -14,10 +14,11 @@ This class wraps the base `Sandbox` to seamlessly provide snapshot capabilities. ### `SnapshotEngine` The core engine responsible for interacting with the GKE Pod Snapshot Controller. -* Creates `PodSnapshotManualTrigger` custom resources. -* Watches for the snapshot controller to process the trigger and create a `PodSnapshot` resource. -* Returns a structured `SnapshotResponse` containing the success status, error details, and `snapshot_uid`. -* Ensures that manual trigger resources are cleanly deleted when the sandbox context exits. +* **Create**: Creates `PodSnapshotManualTrigger` custom resources and waits for the snapshot to be completed. +* **List**: Lists existing snapshots for a sandbox, with optional filtering by grouping labels and a flag to return ready-only snapshots. +* **Delete**: Deletes a specific snapshot by UID. +* **Delete All**: Deletes snapshots based on a strategy: either all snapshots for the pod, or filtered by grouping labels. +* **Cleanup**: Ensures that manual trigger resources are cleanly deleted when the sandbox context exits. ## Usage Example @@ -59,7 +60,7 @@ This file, located in the parent directory (`clients/python/agentic-sandbox-clie * Takes a snapshot (`test-snapshot-20`) after ~20 seconds. 2. **Phase 2: Restoring from Recent Snapshot**: * Restores a sandbox from the second snapshot. - * Verifies that sandbox has been restored from the recent snapshot. + * Verifies that the sandbox has been restored from the recent snapshot. ### Prerequisites @@ -79,7 +80,7 @@ This file, located in the parent directory (`clients/python/agentic-sandbox-clie * For detailed setup instructions, refer to the [GKE Pod Snapshots public documentation](https://docs.cloud.google.com/kubernetes-engine/docs/how-to/pod-snapshots). * Ensure a GCS bucket is configured to store the pod snapshot states and that the necessary IAM permissions are applied. -4. **CRDs**: `PodSnapshotStorageConfig`, `PodSnapshotPolicy` CRDs must be applied. `PodSnapshotPolicy` should specify the selector match labels. +4. **CRDs**: `PodSnapshotStorageConfig`, `PodSnapshotPolicy` CRDs must be applied. `PodSnapshotPolicy` should specify the selector match labels. (Note: For the test file to work, `maxSnapshotCountPerGroup` in `PodSnapshotPolicy` must be set to 2 or more, and the grouping labels must include `tenant-id` and `user-id`.) 5. **Sandbox Template**: A `SandboxTemplate` (e.g., `python-counter-template`) with runtime gVisor, appropriate KSA and label that matches that selector label in `PodSnapshotPolicy` must be available in the cluster. diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/snapshot_engine.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/snapshot_engine.py index 0765974bf..2322d2ba7 100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/snapshot_engine.py +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/snapshot_engine.py @@ -15,18 +15,20 @@ import logging import uuid import time -from typing import Callable +from typing import Callable, Literal from datetime import datetime, timezone from kubernetes.client import ApiException -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict, ValidationError from k8s_agent_sandbox.constants import ( + PODSNAPSHOT_POD_NAME_LABEL, + PODSNAPSHOT_PLURAL, PODSNAPSHOT_API_GROUP, PODSNAPSHOT_API_VERSION, PODSNAPSHOTMANUALTRIGGER_API_KIND, PODSNAPSHOTMANUALTRIGGER_PLURAL, ) -from .utils import wait_for_snapshot_to_be_completed +from .utils import wait_for_snapshot_to_be_completed, wait_for_snapshot_deletion SNAPSHOT_SUCCESS_CODE = 0 SNAPSHOT_ERROR_CODE = 1 @@ -36,6 +38,7 @@ class SnapshotResponse(BaseModel): """Structured response for snapshot operations.""" + success: bool trigger_name: str snapshot_uid: str | None @@ -44,6 +47,42 @@ class SnapshotResponse(BaseModel): error_code: int +class SnapshotDetail(BaseModel): + """Detailed information about a snapshot.""" + + snapshot_uid: str + source_pod: str + creation_timestamp: str | None + status: str + + +class ListSnapshotResult(BaseModel): + """Result of a list snapshots operation.""" + + success: bool + snapshots: list[SnapshotDetail] + error_reason: str + error_code: int + + +class DeleteSnapshotResult(BaseModel): + """Result of a delete snapshot operation.""" + + success: bool + deleted_snapshots: list[str] + error_reason: str + error_code: int + + +class SnapshotFilter(BaseModel): + """Filter for listing snapshots.""" + + model_config = ConfigDict(extra="forbid") + + ready_only: bool = True + grouping_labels: dict[str, str] | None = None + + class SnapshotEngine: """Engine for managing Sandbox snapshots.""" @@ -58,7 +97,9 @@ def __init__( self.get_pod_name_func = get_pod_name_func self.created_manual_triggers = [] - def create(self, trigger_name: str, podsnapshot_timeout: int = 180) -> SnapshotResponse: + def create( + self, trigger_name: str, podsnapshot_timeout: int = 180 + ) -> SnapshotResponse: """ Creates a snapshot of the Sandbox. """ @@ -83,12 +124,14 @@ def create(self, trigger_name: str, podsnapshot_timeout: int = 180) -> SnapshotR } try: - pod_snapshot_manual_trigger_cr = self.k8s_helper.custom_objects_api.create_namespaced_custom_object( - group=PODSNAPSHOT_API_GROUP, - version=PODSNAPSHOT_API_VERSION, - namespace=self.namespace, - plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, - body=manifest, + pod_snapshot_manual_trigger_cr = ( + self.k8s_helper.custom_objects_api.create_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.namespace, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + body=manifest, + ) ) self.created_manual_triggers.append(trigger_name) except ApiException as e: @@ -108,10 +151,12 @@ def create(self, trigger_name: str, podsnapshot_timeout: int = 180) -> SnapshotR error_reason=error_message, error_code=SNAPSHOT_ERROR_CODE, ) - + try: # Start watching from the version we just created to avoid missing updates - resource_version = pod_snapshot_manual_trigger_cr.get("metadata", {}).get("resourceVersion") + resource_version = pod_snapshot_manual_trigger_cr.get("metadata", {}).get( + "resourceVersion" + ) snapshot_result = wait_for_snapshot_to_be_completed( k8s_helper=self.k8s_helper, namespace=self.namespace, @@ -211,3 +256,275 @@ def delete_manual_triggers(self, max_retries: int = 3): f"after {max_retries} attempts: {', '.join(self.created_manual_triggers)}. " "These resources may be leaked in Kubernetes and require manual cleanup." ) + + def list( + self, filter_by: SnapshotFilter | dict | None = None + ) -> ListSnapshotResult: + """ + Checks for existing snapshots matching the grouping labels associated with the sandbox. + Returns a ListSnapshotResult containing valid snapshots sorted by creation timestamp (newest first). + + filter_by: Structure containing filters (status and grouping_labels). + """ + if filter_by is None: + filter_by = SnapshotFilter() + elif isinstance(filter_by, dict): + try: + filter_by = SnapshotFilter(**filter_by) + except ValidationError as e: + logger.error(f"Invalid filter parameters: {e}") + return ListSnapshotResult( + success=False, + snapshots=[], + error_reason=f"Invalid filter parameters: {e}", + error_code=SNAPSHOT_ERROR_CODE, + ) + + valid_snapshots = [] + pod_name = self.get_pod_name_func() + + selectors = [] + if not pod_name: + logger.warning("Pod name not found.") + return ListSnapshotResult( + success=False, + snapshots=[], + error_reason="Pod name not found.", + error_code=SNAPSHOT_ERROR_CODE, + ) + + selectors.append(f"{PODSNAPSHOT_POD_NAME_LABEL}={pod_name}") + + if filter_by.grouping_labels: + for k, v in filter_by.grouping_labels.items(): + selectors.append(f"{k}={v}") + + label_selector = ",".join(selectors) + + logger.info(f"Listing snapshots with label selector: {label_selector}") + try: + # Fetch the PodSnapshots using label selector directly + response = self.k8s_helper.custom_objects_api.list_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.namespace, + plural=PODSNAPSHOT_PLURAL, + label_selector=label_selector, + ) + + for snapshot in response.get("items") or []: + status = snapshot.get("status") or {} + conditions = status.get("conditions") or [] + metadata = snapshot.get("metadata") or {} + + # Check for Ready=True + is_ready = False + for cond in conditions: + if cond.get("type") == "Ready" and cond.get("status") == "True": + is_ready = True + break + # Skip if only ready snapshots are requested + if filter_by.ready_only and not is_ready: + continue + + try: + valid_snapshots.append( + SnapshotDetail( + snapshot_uid=metadata.get("name"), + source_pod=metadata.get("labels", {}).get( + PODSNAPSHOT_POD_NAME_LABEL, "Unknown" + ), + creation_timestamp=metadata.get("creationTimestamp"), + status="Ready" if is_ready else "NotReady", + ) + ) + except ValidationError as e: + logger.warning( + f"Skipping malformed snapshot {metadata.get('name', 'Unknown')}: {e}" + ) + continue + except ApiException as e: + logger.error(f"Failed to list PodSnapshots: {e}") + return ListSnapshotResult( + success=False, + snapshots=[], + error_reason=f"Failed to list PodSnapshots: {e}", + error_code=SNAPSHOT_ERROR_CODE, + ) + except Exception as e: + logger.exception( + f"Unexpected error during list snapshots for filter '{filter_by}': {e}" + ) + return ListSnapshotResult( + success=False, + snapshots=[], + error_reason=f"Unexpected error: {e}", + error_code=SNAPSHOT_ERROR_CODE, + ) + + if not valid_snapshots: + logger.info("No snapshots found matching criteria.") + return ListSnapshotResult( + success=True, + snapshots=[], + error_reason="", + error_code=SNAPSHOT_SUCCESS_CODE, + ) + + # Sort snapshots by creation timestamp descending + valid_snapshots.sort(key=lambda x: x.creation_timestamp or "", reverse=True) + logger.info(f"Found {len(valid_snapshots)} snapshots.") + return ListSnapshotResult( + success=True, + snapshots=valid_snapshots, + error_reason="", + error_code=SNAPSHOT_SUCCESS_CODE, + ) + + def _execute_deletion( + self, + snapshot_uid: str | None = None, + scope: str | None = None, + labels: dict | None = None, + ) -> DeleteSnapshotResult: + """Helper method to execute deletion of snapshots.""" + snapshots_to_delete = [] + + if snapshot_uid: + snapshots_to_delete.append(snapshot_uid) + elif scope == "global": + logger.info("Deleting ALL snapshots for this pod.") + snapshots_result = self.list(filter_by={"ready_only": False}) + if not snapshots_result.success: + return DeleteSnapshotResult( + success=False, + deleted_snapshots=[], + error_reason=f"Failed to list snapshots before deletion: {snapshots_result.error_reason}", + error_code=SNAPSHOT_ERROR_CODE, + ) + snapshots_to_delete = [s.snapshot_uid for s in snapshots_result.snapshots] + elif labels: + logger.info(f"Deleting snapshots matching labels: {labels}") + snapshots_result = self.list( + filter_by={"grouping_labels": labels, "ready_only": False} + ) + if not snapshots_result.success: + return DeleteSnapshotResult( + success=False, + deleted_snapshots=[], + error_reason=f"Failed to list snapshots before deletion: {snapshots_result.error_reason}", + error_code=SNAPSHOT_ERROR_CODE, + ) + snapshots_to_delete = [s.snapshot_uid for s in snapshots_result.snapshots] + + if not snapshots_to_delete: + logger.info("No snapshots found matching criteria to delete.") + return DeleteSnapshotResult( + success=True, + deleted_snapshots=[], + error_reason="", + error_code=SNAPSHOT_SUCCESS_CODE, + ) + + deleted_snapshots = [] + errors = [] + for uid in snapshots_to_delete: + try: + logger.info(f"Deleting PodSnapshot '{uid}'...") + delete_resp = ( + self.k8s_helper.custom_objects_api.delete_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.namespace, + plural=PODSNAPSHOT_PLURAL, + name=uid, + ) + ) + logger.info( + f"PodSnapshot '{uid}' deletion requested. Waiting for confirmation..." + ) + + resource_version = None + if isinstance(delete_resp, dict): + resource_version = delete_resp.get("metadata", {}).get( + "resourceVersion" + ) + + if wait_for_snapshot_deletion( + k8s_helper=self.k8s_helper, + namespace=self.namespace, + snapshot_uid=uid, + resource_version=resource_version, + ): + deleted_snapshots.append(uid) + else: + msg = f"Timed out waiting for confirmation of deletion for snapshot '{uid}'" + logger.error(msg) + errors.append(msg) + except ApiException as e: + if e.status == 404: + logger.info( + f"PodSnapshot '{uid}' not found in K8s (already deleted?)." + ) + else: + msg = f"Failed to delete PodSnapshot '{uid}': {e}" + logger.error(msg) + errors.append(msg) + except Exception as e: + msg = f"Unexpected error deleting PodSnapshot '{uid}': {e}" + logger.exception(msg) + errors.append(msg) + + logger.info( + f"Snapshot deletion process completed. Deleted {len(deleted_snapshots)} snapshots." + ) + + if errors: + error_msg = "; ".join(errors) + if deleted_snapshots: + error_msg = f"Partial failure: deleted {len(deleted_snapshots)}/{len(snapshots_to_delete)} snapshots. Errors: {error_msg}" + return DeleteSnapshotResult( + success=False, + deleted_snapshots=deleted_snapshots, + error_reason=error_msg, + error_code=SNAPSHOT_ERROR_CODE, + ) + + return DeleteSnapshotResult( + success=True, + deleted_snapshots=deleted_snapshots, + error_reason="", + error_code=SNAPSHOT_SUCCESS_CODE, + ) + + def delete(self, snapshot_uid: str) -> DeleteSnapshotResult: + """Delete a single snapshot by UID.""" + return self._execute_deletion(snapshot_uid=snapshot_uid) + + def delete_all( + self, + delete_by: Literal["all", "labels"] = "all", + label_value: dict[str, str] | None = None, + ) -> DeleteSnapshotResult: + """Deletes snapshots based on a specific strategy. + + Args: + delete_by: The criteria to use ('all', 'labels'). + label_value: The value associated with the criteria (e.g., a dict + for labels). + """ + match delete_by: + case "all": + logger.info("Deleting every snapshot for this pod...") + return self._execute_deletion(scope="global") + + case "labels": + if not isinstance(label_value, dict): + raise ValueError( + "label_value must be a dict when deleting by labels" + ) + logger.info(f"Deleting snapshots matching labels: {label_value}") + return self._execute_deletion(labels=label_value) + + case _: + raise ValueError(f"Unsupported deletion strategy: {delete_by}") diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/test/unit/test_sandbox_with_snapshot_support.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/test/unit/test_sandbox_with_snapshot_support.py index 683038519..db3dbee9f 100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/test/unit/test_sandbox_with_snapshot_support.py +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/test/unit/test_sandbox_with_snapshot_support.py @@ -23,22 +23,30 @@ SNAPSHOT_ERROR_CODE, ) from k8s_agent_sandbox.constants import ( + PODSNAPSHOT_POD_NAME_LABEL, PODSNAPSHOT_API_GROUP, PODSNAPSHOT_API_VERSION, PODSNAPSHOTMANUALTRIGGER_PLURAL, POD_NAME_ANNOTATION, + PODSNAPSHOT_PLURAL, +) +from k8s_agent_sandbox.gke_extensions.snapshots.snapshot_engine import ( + ListSnapshotResult, + SnapshotDetail, + DeleteSnapshotResult, ) logger = logging.getLogger(__name__) + class TestSandboxWithSnapshotSupport(unittest.TestCase): - @patch('k8s_agent_sandbox.sandbox.SandboxConnector') - @patch('k8s_agent_sandbox.sandbox.create_tracer_manager') - @patch('k8s_agent_sandbox.sandbox.CommandExecutor') - @patch('k8s_agent_sandbox.sandbox.Filesystem') + @patch("k8s_agent_sandbox.sandbox.SandboxConnector") + @patch("k8s_agent_sandbox.sandbox.create_tracer_manager") + @patch("k8s_agent_sandbox.sandbox.CommandExecutor") + @patch("k8s_agent_sandbox.sandbox.Filesystem") def setUp(self, mock_fs, mock_ce, mock_ctm, mock_conn): mock_ctm.return_value = (None, None) - + self.mock_k8s_helper = MagicMock() # Create SandboxWithSnapshotSupport @@ -77,7 +85,9 @@ def test_snapshots_create_success(self, mock_watch_cls): mock_watch.stream.return_value = [mock_event] mock_created_obj = {"metadata": {"resourceVersion": "123"}, "status": {}} - self.mock_k8s_helper.custom_objects_api.create_namespaced_custom_object.return_value = mock_created_obj + self.mock_k8s_helper.custom_objects_api.create_namespaced_custom_object.return_value = ( + mock_created_obj + ) result = self.engine.create("test-trigger") @@ -89,9 +99,11 @@ def test_snapshots_create_success(self, mock_watch_cls): self.assertIn("test-trigger", result.trigger_name) self.mock_k8s_helper.custom_objects_api.create_namespaced_custom_object.assert_called_once() - _, kwargs = self.mock_k8s_helper.custom_objects_api.create_namespaced_custom_object.call_args - self.assertEqual(kwargs['group'], PODSNAPSHOT_API_GROUP) - self.assertEqual(kwargs['body']['spec']['targetPod'], "test-pod") + _, kwargs = ( + self.mock_k8s_helper.custom_objects_api.create_namespaced_custom_object.call_args + ) + self.assertEqual(kwargs["group"], PODSNAPSHOT_API_GROUP) + self.assertEqual(kwargs["body"]["spec"]["targetPod"], "test-pod") mock_watch.stream.assert_called_once() _, stream_kwargs = mock_watch.stream.call_args @@ -143,7 +155,9 @@ def test_snapshots_create_processed_retry(self, mock_watch_cls): self.assertEqual(result.snapshot_uid, "snapshot-uid-retry") def test_snapshots_create_api_exception(self): - self.mock_k8s_helper.custom_objects_api.create_namespaced_custom_object.side_effect = ApiException("Create failed") + self.mock_k8s_helper.custom_objects_api.create_namespaced_custom_object.side_effect = ApiException( + "Create failed" + ) result = self.engine.create("test-trigger") @@ -191,7 +205,10 @@ def test_snapshots_create_watch_failure(self, mock_watch_cls): self.assertFalse(result.success) self.assertEqual(result.error_code, 1) - self.assertIn("Snapshot failed. Condition: Snapshot failed due to timeout", result.error_reason) + self.assertIn( + "Snapshot failed. Condition: Snapshot failed due to timeout", + result.error_reason, + ) @patch("k8s_agent_sandbox.gke_extensions.snapshots.utils.watch.Watch") def test_snapshots_create_watch_error(self, mock_watch_cls): @@ -244,7 +261,9 @@ def test_snapshots_create_generic_exception(self, mock_watch_cls): self.assertIn("Server error: Something went wrong", result.error_reason) def test_snapshots_create_invalid_name(self): - self.mock_k8s_helper.custom_objects_api.create_namespaced_custom_object.side_effect = ApiException("Invalid value: 'Test_Trigger'") + self.mock_k8s_helper.custom_objects_api.create_namespaced_custom_object.side_effect = ApiException( + "Invalid value: 'Test_Trigger'" + ) result = self.engine.create("Test_Trigger") @@ -259,7 +278,8 @@ def test_delete_manual_triggers(self): self.engine.delete_manual_triggers() self.assertEqual( - self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.call_count, 2 + self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.call_count, + 2, ) calls = [ @@ -282,7 +302,7 @@ def test_delete_manual_triggers(self): calls, any_order=True ) self.assertEqual(len(self.engine.created_manual_triggers), 0) - + def test_is_restored_from_snapshot_success(self): """Test successful identification of restore from snapshot.""" logging.info("Starting test_is_restored_from_snapshot_success...") @@ -447,5 +467,476 @@ def test_is_restored_from_snapshot_generic_exception(self): self.assertIn("Unexpected error", result.error_reason) logging.info("Finished test_is_restored_from_snapshot_generic_exception.") + def test_snapshots_list_success(self): + """Test list snapshots returning properly formatted objects.""" + logging.info("Starting test_snapshots_list_success...") + + mock_response = { + "items": [ + { + "metadata": { + "name": "snap-1", + "uid": "uid-1", + "creationTimestamp": "2023-01-02T00:00:00Z", + "labels": {PODSNAPSHOT_POD_NAME_LABEL: "test-pod"}, + }, + "status": {"conditions": [{"type": "Ready", "status": "True"}]}, + }, + { + "metadata": { + "name": "snap-2", + "uid": "uid-2", + "creationTimestamp": "2023-01-01T00:00:00Z", + "labels": {PODSNAPSHOT_POD_NAME_LABEL: "test-pod"}, + }, + "status": {"conditions": [{"type": "Ready", "status": "True"}]}, + }, + { + "metadata": { + "name": "snap-not-ready", + "uid": "uid-3", + "creationTimestamp": "2023-01-03T00:00:00Z", + }, + "status": {"conditions": [{"type": "Ready", "status": "False"}]}, + }, + ] + } + self.mock_k8s_helper.custom_objects_api.list_namespaced_custom_object.return_value = ( + mock_response + ) + + result = self.engine.list( + filter_by={"grouping_labels": {"test-label": "test-value"}} + ) + + self.assertTrue(result.success) + self.assertEqual(len(result.snapshots), 2) + # Verify it sorted by creationTimestamp newest first + self.assertEqual(result.snapshots[0].snapshot_uid, "snap-1") + self.assertEqual(result.snapshots[1].snapshot_uid, "snap-2") + self.mock_k8s_helper.custom_objects_api.list_namespaced_custom_object.assert_called_once_with( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace="test-ns", + plural=PODSNAPSHOT_PLURAL, + label_selector=f"{PODSNAPSHOT_POD_NAME_LABEL}=test-pod,test-label=test-value", + ) + + def test_snapshots_list_filter_empty(self): + """Test list snapshots with filter_by={} includes non-ready snapshots.""" + mock_response = { + "items": [ + { + "metadata": { + "name": "ready-snap", + "uid": "uid1", + "creationTimestamp": "2023-01-01T00:00:00Z", + }, + "status": {"conditions": [{"type": "Ready", "status": "True"}]}, + }, + { + "metadata": { + "name": "not-ready-snap", + "uid": "uid2", + "creationTimestamp": "2023-01-02T00:00:00Z", + }, + "status": {"conditions": [{"type": "Ready", "status": "False"}]}, + }, + ] + } + self.mock_k8s_helper.custom_objects_api.list_namespaced_custom_object.return_value = ( + mock_response + ) + + result = self.engine.list(filter_by={"ready_only": False}) + self.assertTrue(result.success) + self.assertEqual(len(result.snapshots), 2) + # Sorted by creationTimestamp descending + self.assertEqual(result.snapshots[0].snapshot_uid, "not-ready-snap") + self.assertEqual(result.snapshots[1].snapshot_uid, "ready-snap") + + def test_snapshots_list_filter_incorrect_arguments(self): + """Test list snapshots with a incorrect arguments for filter_by.""" + mock_response = { + "items": [ + { + "metadata": { + "name": "ready-snap", + "uid": "uid1", + "creationTimestamp": "2023-01-01T00:00:00Z", + }, + "status": {"conditions": [{"type": "Ready", "status": "True"}]}, + }, + { + "metadata": { + "name": "not-ready-snap", + "uid": "uid2", + "creationTimestamp": "2023-01-02T00:00:00Z", + }, + "status": {"conditions": [{"type": "Ready", "status": "False"}]}, + }, + ] + } + self.mock_k8s_helper.custom_objects_api.list_namespaced_custom_object.return_value = ( + mock_response + ) + + # Passing a random dict should fail because extra fields are forbidden. + result = self.engine.list(filter_by={"random_key": "random_value"}) + self.assertFalse(result.success) + self.assertEqual(len(result.snapshots), 0) + self.assertIn("Invalid filter parameters", result.error_reason) + + def test_snapshots_list_none_timestamp(self): + """Test list snapshots doesn't crash when creationTimestamp is None.""" + mock_response = { + "items": [ + { + "metadata": { + "name": "snap-1", + "uid": "uid-1", + "creationTimestamp": None, # Test Case: None + "labels": {PODSNAPSHOT_POD_NAME_LABEL: "test-pod"}, + }, + "status": {"conditions": [{"type": "Ready", "status": "True"}]}, + }, + { + "metadata": { + "name": "snap-2", + "uid": "uid-2", + "creationTimestamp": "2023-01-01T00:00:00Z", + "labels": {PODSNAPSHOT_POD_NAME_LABEL: "test-pod"}, + }, + "status": {"conditions": [{"type": "Ready", "status": "True"}]}, + }, + ] + } + self.mock_k8s_helper.custom_objects_api.list_namespaced_custom_object.return_value = ( + mock_response + ) + + result = self.engine.list() + + self.assertTrue(result.success) + self.assertEqual(len(result.snapshots), 2) + # Verify it sorted correctly even with None (None/empty string should come last in reverse sort) + self.assertEqual(result.snapshots[0].snapshot_uid, "snap-2") + self.assertEqual(result.snapshots[1].snapshot_uid, "snap-1") + + def test_snapshots_list_no_results(self): + """Test list snapshots returns successfully with empty list if none found.""" + self.mock_k8s_helper.custom_objects_api.list_namespaced_custom_object.return_value = { + "items": [] + } + result = self.engine.list() + self.assertTrue(result.success) + self.assertEqual(len(result.snapshots), 0) + self.mock_k8s_helper.custom_objects_api.list_namespaced_custom_object.assert_called_once_with( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace="test-ns", + plural=PODSNAPSHOT_PLURAL, + label_selector=f"{PODSNAPSHOT_POD_NAME_LABEL}=test-pod", + ) + + def test_snapshots_list_no_pod_name(self): + """Test list snapshots fails when pod name is missing.""" + self.sandbox.get_pod_name.return_value = None + result = self.engine.list() + self.assertFalse(result.success) + self.assertEqual(result.error_code, SNAPSHOT_ERROR_CODE) + self.assertIn("Pod name not found", result.error_reason) + + def test_snapshots_list_api_exception(self): + self.mock_k8s_helper.custom_objects_api.list_namespaced_custom_object.side_effect = ApiException( + 500, "Internal Server Error" + ) + result = self.engine.list() + self.assertFalse(result.success) + self.assertIn("Failed to list PodSnapshots", result.error_reason) + + def test_snapshots_list_generic_exception(self): + self.mock_k8s_helper.custom_objects_api.list_namespaced_custom_object.side_effect = ValueError( + "Unexpected" + ) + result = self.engine.list() + self.assertFalse(result.success) + self.assertIn("Unexpected error", result.error_reason) + + @patch( + "k8s_agent_sandbox.gke_extensions.snapshots.snapshot_engine.wait_for_snapshot_deletion" + ) + def test_snapshots_delete_uid_provided(self, mock_wait): + """Test delete snapshots when a specific snapshot UID is provided.""" + self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.return_value = ( + {} + ) + + result = self.engine.delete(snapshot_uid="target-snap") + self.assertTrue(result.success) + self.assertEqual(result.deleted_snapshots, ["target-snap"]) + self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.assert_called_once_with( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace="test-ns", + plural=PODSNAPSHOT_PLURAL, + name="target-snap", + ) + mock_wait.assert_called_once_with( + k8s_helper=self.mock_k8s_helper, + namespace="test-ns", + snapshot_uid="target-snap", + resource_version=None, + ) + + def test_snapshots_delete_all_invalid_strategy(self): + """Test delete_all raises ValueError for unsupported strategy.""" + with self.assertRaises(ValueError) as context: + self.engine.delete_all(delete_by="invalid-strategy") + self.assertIn( + "Unsupported deletion strategy: invalid-strategy", + str(context.exception), + ) + + @patch( + "k8s_agent_sandbox.gke_extensions.snapshots.snapshot_engine.wait_for_snapshot_deletion" + ) + def test_snapshots_delete_with_list(self, mock_wait): + """Test delete snapshots fetching list of snapshots when uid is not provided.""" + + with patch.object(self.engine, "list") as mock_list: + mock_list.return_value = ListSnapshotResult( + success=True, + snapshots=[ + SnapshotDetail( + snapshot_uid="snap-a", + source_pod="test-pod", + creation_timestamp="2023-01-01T00:00:00Z", + status="Ready", + ) + ], + error_reason="", + error_code=SNAPSHOT_SUCCESS_CODE, + ) + self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.return_value = ( + {} + ) + + result = self.engine.delete_all( + delete_by="labels", label_value={"foo": "bar"} + ) + + self.assertTrue(result.success) + self.assertEqual(result.deleted_snapshots, ["snap-a"]) + mock_list.assert_called_once_with( + filter_by={"grouping_labels": {"foo": "bar"}, "ready_only": False} + ) + self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.assert_called_once_with( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace="test-ns", + plural=PODSNAPSHOT_PLURAL, + name="snap-a", + ) + mock_wait.assert_called_once_with( + k8s_helper=self.mock_k8s_helper, + namespace="test-ns", + snapshot_uid="snap-a", + resource_version=None, + ) + + @patch( + "k8s_agent_sandbox.gke_extensions.snapshots.snapshot_engine.wait_for_snapshot_deletion" + ) + def test_snapshots_delete_api_exception(self, mock_wait): + """Test delete snapshots gracefully handling failure on one of the items.""" + self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.side_effect = ApiException( + 500, "Internal error" + ) + result = self.engine.delete(snapshot_uid="target-snap") + self.assertFalse(result.success) + self.assertEqual(result.deleted_snapshots, []) + self.assertIn("Failed to delete PodSnapshot", result.error_reason) + + @patch( + "k8s_agent_sandbox.gke_extensions.snapshots.snapshot_engine.wait_for_snapshot_deletion" + ) + def test_snapshots_delete_partial_failure(self, mock_wait): + """Test delete snapshots continuing loop and aggregating errors on partial failure.""" + + # Mock list to return 3 snapshots + with patch.object(self.engine, "list") as mock_list: + mock_list.return_value = ListSnapshotResult( + success=True, + snapshots=[ + SnapshotDetail( + snapshot_uid="snap-1", + source_pod="pod", + creation_timestamp="ts", + status="Ready", + ), + SnapshotDetail( + snapshot_uid="snap-2", + source_pod="pod", + creation_timestamp="ts", + status="Ready", + ), + SnapshotDetail( + snapshot_uid="snap-3", + source_pod="pod", + creation_timestamp="ts", + status="Ready", + ), + ], + error_reason="", + error_code=0, + ) + + # Mock delete calls: + # snap-1: Success + # snap-2: ApiException (500) + # snap-3: Success + def mock_delete(group, version, namespace, plural, name): + if name == "snap-2": + raise ApiException(500, "Internal error") + return {} + + self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.side_effect = ( + mock_delete + ) + + result = self.engine.delete_all() + + self.assertFalse(result.success) + self.assertEqual(result.deleted_snapshots, ["snap-1", "snap-3"]) + self.assertIn("Failed to delete PodSnapshot 'snap-2'", result.error_reason) + self.assertEqual( + self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.call_count, + 3, + ) + # Verify wait was called for successful deletions + self.assertEqual(mock_wait.call_count, 2) + mock_wait.assert_has_calls( + [ + call( + k8s_helper=self.mock_k8s_helper, + namespace="test-ns", + snapshot_uid="snap-1", + resource_version=None, + ), + call( + k8s_helper=self.mock_k8s_helper, + namespace="test-ns", + snapshot_uid="snap-3", + resource_version=None, + ), + ], + any_order=True, + ) + + @patch( + "k8s_agent_sandbox.gke_extensions.snapshots.snapshot_engine.wait_for_snapshot_deletion" + ) + def test_snapshots_delete_generic_exception(self, mock_wait): + """Test delete snapshots handling generic Exception during deletion.""" + self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.side_effect = Exception( + "Generic error" + ) + + result = self.engine.delete(snapshot_uid="target-snap") + self.assertFalse(result.success) + self.assertIn("Unexpected error deleting PodSnapshot", result.error_reason) + + @patch( + "k8s_agent_sandbox.gke_extensions.snapshots.snapshot_engine.wait_for_snapshot_deletion" + ) + def test_snapshots_delete_api_exception_404(self, mock_wait): + """Test delete snapshots interpreting 404 as successful (already deleted).""" + self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.side_effect = ApiException( + 404, "Not Found" + ) + result = self.engine.delete(snapshot_uid="target-snap") + self.assertTrue(result.success) + self.assertEqual(result.deleted_snapshots, []) + mock_wait.assert_not_called() + + def test_snapshots_delete_list_fail(self): + """Test delete snapshots returning early false if list query fails.""" + with patch.object(self.engine, "list") as mock_list: + mock_list.return_value = ListSnapshotResult( + success=False, + snapshots=[], + error_reason="Could not connect", + error_code=SNAPSHOT_ERROR_CODE, + ) + result = self.engine.delete_all() + self.assertFalse(result.success) + self.assertIn( + "Failed to list snapshots before deletion", result.error_reason + ) + self.assertEqual(result.deleted_snapshots, []) + + def test_snapshots_delete_all(self): + """Test delete_all calls _execute_deletion with scope='global'.""" + with patch.object(self.engine, "_execute_deletion") as mock_execute: + mock_execute.return_value = DeleteSnapshotResult( + success=True, + deleted_snapshots=["snap-x"], + error_reason="", + error_code=0, + ) + self.engine.delete_all() + mock_execute.assert_called_once_with(scope="global") + + def test_snapshots_delete_all_by_labels(self): + """Test delete_all calls _execute_deletion with labels.""" + with patch.object(self.engine, "_execute_deletion") as mock_execute: + mock_execute.return_value = DeleteSnapshotResult( + success=True, + deleted_snapshots=["snap-x"], + error_reason="", + error_code=0, + ) + self.engine.delete_all(delete_by="labels", label_value={"foo": "bar"}) + mock_execute.assert_called_once_with(labels={"foo": "bar"}) + + def test_snapshots_delete_empty_fails(self): + """Test delete raises TypeError if snapshot_uid is missing.""" + with self.assertRaises(TypeError): + self.engine.delete() + + @patch( + "k8s_agent_sandbox.gke_extensions.snapshots.snapshot_engine.wait_for_snapshot_deletion" + ) + def test_snapshots_delete_timeout(self, mock_wait): + """Test delete snapshots handling timeout in wait.""" + mock_wait.return_value = False + + self.mock_k8s_helper.custom_objects_api.delete_namespaced_custom_object.return_value = ( + {} + ) + + result = self.engine.delete(snapshot_uid="target-snap") + + self.assertFalse(result.success) + self.assertEqual(result.deleted_snapshots, []) + self.assertIn("Timed out waiting for confirmation", result.error_reason) + + def test_snapshots_delete_all_no_snapshots_found(self): + """Test delete_all returns success when no snapshots are found.""" + with patch.object(self.engine, "list") as mock_list: + mock_list.return_value = ListSnapshotResult( + success=True, + snapshots=[], + error_reason="", + error_code=0, + ) + + result = self.engine.delete_all() + + self.assertTrue(result.success) + self.assertEqual(result.deleted_snapshots, []) + + if __name__ == "__main__": unittest.main() diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/utils.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/utils.py index b87904082..6c2340ca2 100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/utils.py +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/snapshots/utils.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import time from typing import Any from kubernetes.client import ApiException from kubernetes import watch @@ -20,6 +21,7 @@ from k8s_agent_sandbox.constants import ( PODSNAPSHOT_API_GROUP, PODSNAPSHOT_API_VERSION, + PODSNAPSHOT_PLURAL, PODSNAPSHOTMANUALTRIGGER_PLURAL, ) @@ -28,14 +30,18 @@ SNAPSHOT_SUCCESS_CODE = 0 SNAPSHOT_ERROR_CODE = 1 + class RestoreCheckResult(BaseModel): """Result of a restore check operation.""" + success: bool error_reason: str error_code: int + class SnapshotResult(BaseModel): """Result of a snapshot processing operation.""" + snapshot_uid: str snapshot_timestamp: str @@ -57,10 +63,15 @@ def _get_snapshot_info(snapshot_obj: dict[str, Any]) -> SnapshotResult: snapshot_uid=snapshot_uid, snapshot_timestamp=snapshot_timestamp, ) - elif condition.get("status") == "False" and condition.get("reason") in [ - "Failed", - "Error", - ]: + elif ( + condition.get("type") == "Triggered" + and condition.get("status") == "False" + and condition.get("reason") + in [ + "Failed", + "Error", + ] + ): raise RuntimeError( f"Snapshot failed. Condition: {condition.get('message', 'Unknown error')}" ) @@ -109,9 +120,7 @@ def wait_for_snapshot_to_be_completed( # Continue watching if snapshot is not yet complete continue elif event["type"] == "ERROR": - logger.error( - f"Snapshot watch received error event: {event['object']}" - ) + logger.error(f"Snapshot watch received error event: {event['object']}") raise RuntimeError(f"Snapshot watch error: {event['object']}") elif event["type"] == "DELETED": logger.error( @@ -130,6 +139,7 @@ def wait_for_snapshot_to_be_completed( f"Snapshot manual trigger '{trigger_name}' was not processed within {podsnapshot_timeout} seconds." ) + def check_pod_restored_from_snapshot( k8s_helper, namespace: str, @@ -195,4 +205,63 @@ def check_pod_restored_from_snapshot( success=False, error_reason=f"Unexpected error: {e}", error_code=SNAPSHOT_ERROR_CODE, - ) \ No newline at end of file + ) + + +def wait_for_snapshot_deletion( + k8s_helper, + namespace: str, + snapshot_uid: str, + timeout: int = 60, + resource_version: str | None = None, +) -> bool: + """Waits for the PodSnapshot to be deleted from the cluster.""" + # Check if already deleted + try: + k8s_helper.custom_objects_api.get_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=namespace, + plural=PODSNAPSHOT_PLURAL, + name=snapshot_uid, + ) + except ApiException as e: + if e.status == 404: + logger.info(f"PodSnapshot '{snapshot_uid}' already deleted.") + return True + raise + + w = watch.Watch() + logger.info(f"Waiting for PodSnapshot '{snapshot_uid}' to be deleted...") + + kwargs = {} + if resource_version: + kwargs["resource_version"] = resource_version + + try: + for event in w.stream( + func=k8s_helper.custom_objects_api.list_namespaced_custom_object, + namespace=namespace, + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + plural=PODSNAPSHOT_PLURAL, + field_selector=f"metadata.name={snapshot_uid}", + timeout_seconds=timeout, + **kwargs, + ): + if event["type"] == "DELETED": + logger.info(f"PodSnapshot '{snapshot_uid}' confirmed deleted.") + return True + elif event["type"] == "ERROR": + logger.error( + f"Snapshot deletion watch received error event: {event['object']}" + ) + raise RuntimeError(f"Snapshot watch error: {event['object']}") + except Exception as e: + logger.error(f"Error watching snapshot deletion: {e}") + raise + finally: + w.stop() + + logger.warning(f"Timed out waiting for PodSnapshot '{snapshot_uid}' to be deleted.") + return False diff --git a/clients/python/agentic-sandbox-client/python-counter-template.yaml b/clients/python/agentic-sandbox-client/python-counter-template.yaml index 2185acbc7..9589fcfd1 100644 --- a/clients/python/agentic-sandbox-client/python-counter-template.yaml +++ b/clients/python/agentic-sandbox-client/python-counter-template.yaml @@ -12,6 +12,8 @@ spec: metadata: labels: app: agent-sandbox-workload + tenant-id: "test-tenant" + user-id: "test-user" spec: serviceAccountName: sandbox-test runtimeClassName: gvisor diff --git a/clients/python/agentic-sandbox-client/test_podsnapshot_extension.py b/clients/python/agentic-sandbox-client/test_podsnapshot_extension.py index dc52fffdf..8140ce7c9 100644 --- a/clients/python/agentic-sandbox-client/test_podsnapshot_extension.py +++ b/clients/python/agentic-sandbox-client/test_podsnapshot_extension.py @@ -20,7 +20,9 @@ import time import logging from kubernetes import config -from k8s_agent_sandbox.gke_extensions.snapshots.podsnapshot_client import PodSnapshotSandboxClient +from k8s_agent_sandbox.gke_extensions.snapshots.podsnapshot_client import ( + PodSnapshotSandboxClient, +) from k8s_agent_sandbox.gke_extensions.snapshots.snapshot_engine import SnapshotResponse from k8s_agent_sandbox.models import ( SandboxDirectConnectionConfig, @@ -30,17 +32,17 @@ WAIT_TIME_SECONDS = 10 -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', force=True) +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", force=True +) + + def test_snapshot_response(snapshot_response: SnapshotResponse, snapshot_name: str): assert hasattr( snapshot_response, "trigger_name" ), "snapshot response missing 'trigger_name' attribute" print(f"Trigger Name: {snapshot_response.trigger_name}") - print(f"Snapshot UID: {snapshot_response.snapshot_uid}") - print(f"Success: {snapshot_response.success}") - print(f"Error Code: {snapshot_response.error_code}") - print(f"Error Reason: {snapshot_response.error_reason}") assert snapshot_response.trigger_name.startswith( snapshot_name @@ -72,8 +74,11 @@ def main( except config.ConfigException: config.load_kube_config() - first_snapshot_name = "test-snapshot-10" - second_snapshot_name = "test-snapshot-20" + first_snapshot_trigger_name = "test-snapshot-10" + second_snapshot_trigger_name = "test-snapshot-20" + + # grouping labels used in PodSnapshotPolicy to group snapshots - tenant-id and user-id + grouping_labels = {"tenant-id": "test-tenant", "user-id": "test-user"} client = None try: @@ -81,8 +86,7 @@ def main( if api_url: connection_config = SandboxDirectConnectionConfig( - api_url=api_url, - server_port=server_port + api_url=api_url, server_port=server_port ) else: connection_config = SandboxLocalTunnelConnectionConfig( @@ -92,35 +96,85 @@ def main( client = PodSnapshotSandboxClient(connection_config=connection_config) print("\n======= Testing Pod Snapshot Extension =======") - + sandbox = client.create_sandbox(template_name, namespace=namespace) time.sleep(WAIT_TIME_SECONDS) print( - f"Creating first pod snapshot '{first_snapshot_name}' after {WAIT_TIME_SECONDS} seconds..." + f"Creating first pod snapshot '{first_snapshot_trigger_name}' after {WAIT_TIME_SECONDS} seconds..." ) - snapshot_response = sandbox.snapshots.create(first_snapshot_name) - test_snapshot_response(snapshot_response, first_snapshot_name) + snapshot_response = sandbox.snapshots.create(first_snapshot_trigger_name) + test_snapshot_response(snapshot_response, first_snapshot_trigger_name) + first_snapshot_uid = snapshot_response.snapshot_uid + print(f"First snapshot UID: {first_snapshot_uid}") time.sleep(WAIT_TIME_SECONDS) print( - f"\nCreating second pod snapshot '{second_snapshot_name}' after {WAIT_TIME_SECONDS} seconds..." + f"\nCreating second pod snapshot '{second_snapshot_trigger_name}' after {WAIT_TIME_SECONDS} seconds..." ) - snapshot_response = sandbox.snapshots.create(second_snapshot_name) - test_snapshot_response(snapshot_response, second_snapshot_name) + snapshot_response = sandbox.snapshots.create(second_snapshot_trigger_name) + test_snapshot_response(snapshot_response, second_snapshot_trigger_name) recent_snapshot_uid = snapshot_response.snapshot_uid print(f"Recent snapshot UID: {recent_snapshot_uid}") # Wait a moment for the PodSnapshotPolicy controller's cache to recognize the new snapshot as the latest time.sleep(WAIT_TIME_SECONDS) - - print(f"\nChecking if sandbox was restored from snapshot '{recent_snapshot_uid}'...") + + print( + f"\nChecking if sandbox was restored from snapshot '{recent_snapshot_uid}'..." + ) restored_sandbox = client.create_sandbox(template_name, namespace=namespace) restore_result = restored_sandbox.is_restored_from_snapshot(recent_snapshot_uid) assert restore_result.success, restore_result.error_reason print("Pod was restored from the most recent snapshot.") + print(f"\nListing all snapshots for sandbox '{sandbox.sandbox_id}'...") + list_result = sandbox.snapshots.list( + filter_by={"grouping_labels": grouping_labels} + ) + assert list_result.success, list_result.error_reason + + for snap in list_result.snapshots: + print( + f"Snapshot UID: {snap.snapshot_uid}, Source Pod: {snap.source_pod}, Creation Time: {snap.creation_timestamp}" + ) + assert ( + len(list_result.snapshots) == 2 + ), f"Expected 2 snapshots, but got {len(list_result.snapshots)}" + assert ( + list_result.snapshots[0].snapshot_uid == recent_snapshot_uid + ), f"Expected most recent snapshot UID '{recent_snapshot_uid}', but got '{list_result.snapshots[0].snapshot_uid}'" + assert ( + list_result.snapshots[1].snapshot_uid == first_snapshot_uid + ), f"Expected older snapshot UID '{first_snapshot_uid}', but got '{list_result.snapshots[1].snapshot_uid}'" + + print( + f"\nDeleting snapshot '{recent_snapshot_uid}' of the sandbox '{sandbox.sandbox_id}'..." + ) + delete_result = sandbox.snapshots.delete(snapshot_uid=recent_snapshot_uid) + assert delete_result.success, delete_result.error_reason + assert ( + len(delete_result.deleted_snapshots) == 1 + ), f"Expected 1 deleted snapshot, but got {len(delete_result.deleted_snapshots)}" + assert ( + delete_result.deleted_snapshots[0] == recent_snapshot_uid + ), f"Expected deleted snapshot UID '{recent_snapshot_uid}', but got '{delete_result.deleted_snapshots[0]}''" + print(f"Snapshot '{recent_snapshot_uid}' deleted successfully.") + + print(f"\nDeleting all snapshots for sandbox '{sandbox.sandbox_id}'...") + delete_result = sandbox.snapshots.delete_all( + delete_by="labels", filter_value=grouping_labels + ) + assert delete_result.success, delete_result.error_reason + assert ( + len(delete_result.deleted_snapshots) == 1 + ), f"Expected 1 deleted snapshot, but got {len(delete_result.deleted_snapshots)}" + assert ( + delete_result.deleted_snapshots[0] == first_snapshot_uid + ), f"Expected deleted snapshot UID '{first_snapshot_uid}', but got '{delete_result.deleted_snapshots[0]}''" + print(f"Snapshot '{first_snapshot_uid}' deleted successfully.") + print("--- Pod Snapshot Test Passed! ---") except Exception as e: @@ -128,7 +182,7 @@ def main( finally: print("Cleaning up all sandboxes...") if client: - client.delete_all() + client.delete_all() print("\n--- Sandbox Client Test Finished ---")