# Copyright 2025 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Constants for API Groups and Resources

# Gateway API identifiers (gateway.networking.k8s.io) used for Gateway lookups.
GATEWAY_API_GROUP = "gateway.networking.k8s.io"
GATEWAY_API_VERSION = "v1"
GATEWAY_PLURAL = "gateways"

# SandboxClaim CRD identifiers.
CLAIM_API_GROUP = "extensions.agents.x-k8s.io"
CLAIM_API_VERSION = "v1alpha1"
CLAIM_PLURAL_NAME = "sandboxclaims"

# Sandbox CRD identifiers.
SANDBOX_API_GROUP = "agents.x-k8s.io"
SANDBOX_API_VERSION = "v1alpha1"
SANDBOX_PLURAL_NAME = "sandboxes"

# Annotation that records the name of the pod backing a sandbox.
POD_NAME_ANNOTATION = "agents.x-k8s.io/pod-name"

# GKE Pod Snapshot CRD identifiers (podsnapshot.gke.io group).
PODSNAPSHOT_API_GROUP = "podsnapshot.gke.io"
PODSNAPSHOT_API_VERSION = "v1alpha1"
PODSNAPSHOT_PLURAL = "podsnapshots"
PODSNAPSHOTMANUALTRIGGER_PLURAL = "podsnapshotmanualtriggers"
# Kind used when building PodSnapshotManualTrigger manifests.
PODSNAPSHOT_API_KIND = "PodSnapshotManualTrigger"
PODSNAPSHOTPOLICY_PLURAL = "podsnapshotpolicies"

# Namespace hosting the GKE-managed pod snapshot components, and the
# (substring of the) name of the snapshot agent pod expected to run there.
SNAPSHOT_NAMESPACE_MANAGED = "gke-managed-pod-snapshots"
SNAPSHOT_AGENT = "pod-snapshot-agent"
b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .podsnapshot_client import PodSnapshotSandboxClient \ No newline at end of file diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot.md b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot.md new file mode 100644 index 000000000..6b61ccd21 --- /dev/null +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot.md @@ -0,0 +1,104 @@ +# Agentic Sandbox Pod Snapshot Extension + +This directory contains the Python client extension for interacting with the Agentic Sandbox to manage Pod Snapshots. This extension allows you to trigger snapshots of a running sandbox and restore a new sandbox from the recently created snapshot. + +## `podsnapshot_client.py` + +This file defines the `PodSnapshotSandboxClient` class, which extend the base `SandboxClient` to provide snapshot capabilities. + +### `PodSnapshotSandboxClient` + +A specialized Sandbox client for interacting with the gke pod snapshot controller. + +### Key Features: + +* **`PodSnapshotSandboxClient(template_name: str, podsnapshot_timeout: int = 180, server_port: int = 8080, ...)`**: + * Initializes the client with optional podsnapshot timeout and server port. 
+ * If snapshot exists, the pod snapshot controller restores from the most recent snapshot matching the label of the `SandboxTemplate`, otherwise creates a new `Sandbox`. +* **`snapshot_controller_ready(self) -> bool`**: + * Checks if the snapshot agent (GKE managed) is running and ready. +* **`snapshot(self, trigger_name: str) -> SnapshotResponse`**: + * Triggers a manual snapshot of the current sandbox pod by creating a `PodSnapshotManualTrigger` resource. + * The `trigger_name` is suffixed with unique hash. + * Waits for the snapshot to be processed. + * The pod snapshot controller creates a `PodSnapshot` resource automatically. + * Returns the SnapshotResponse object(success, error_code, error_reason, trigger_name, snapshot_uid). +* **`is_restored_from_snapshot(self, snapshot_uid: str) -> RestoreResult`**: + * Checks if the sandbox pod was restored from the specified snapshot. + * Verifies restoration by checking the 'PodRestored' condition in the pod status and confirming the message contains the expected snapshot UID. + * Returns RestoreResult object(success, error_code, error_reason). +* **`list_snapshots(self, policy_name: str, ready_only: bool = True) -> list | None`**: + * Lists valid snapshots found in the local metadata storage (`~/.snapshot_metadata/.snapshots.json`). + * Filters by `policy_name` and `ready_only` status (default: True). + * Returns a list of dictionaries containing snapshot details (id, source_pod, uid, creationTimestamp, status, policy_name) sorted by creation timestamp (newest first). +* **`delete_snapshots(self, snapshot_uid: str | None = None, policy_name: str | None = None) -> int`**: + * Deletes snapshots and their corresponding `PodSnapshotManualTrigger` resources. + * If `snapshot_uid` is provided, deletes that specific snapshot. + * If `policy_name` is provided, deletes all snapshots associated with that policy. + * If neither is provided, deletes **ALL** snapshots found in the local metadata. 
+ * Cleans up local metadata after successful deletion from K8s. + * Returns the count of successfully deleted snapshots. +* **`__exit__(self)`**: + * Cleans up the `PodSnapshotManualTrigger` resources. + * Cleans up the `SandboxClaim` resources. + +### `SnapshotPersistenceManager` + +Manages local persistence of snapshot metadata in a secure directory. + +* **File Location**: `~/.snapshot_metadata/.snapshots.json` +* **Security**: Ensures the directory has `0o700` permissions and the file has `0o600` permissions. +* **Storage**: Metadata is stored as a JSON object keyed by `snapshot_uid`. +* **Functionality**: + * **`save_snapshot_metadata`**: Saves a snapshot record. + * **`delete_snapshot_metadata`**: Deletes a snapshot record by UID. + * **`_load_metadata`**: Loads and returns the metadata dictionary. + +## `test_podsnapshot_extension.py` + +This file, located in the parent directory (`clients/python/agentic-sandbox-client/`), contains an integration test script for the `PodSnapshotSandboxClient` extension. It verifies the snapshot and restore functionality. + +### Test Phases: + +1. **Phase 1: Starting Counter & Snapshotting**: + * Starts a sandbox with a counter application. + * Takes a snapshot (`test-snapshot-10`) after ~10 seconds. + * Takes a second snapshot (`test-snapshot-20`) after another ~10 seconds. +2. **Phase 2: Restoring from Recent Snapshot**: + * Restores a sandbox from the second snapshot. + * Verifies that sandbox has been restored from the recent snapshot. + +### Prerequisites + +1. **Python Virtual Environment**: + ```bash + python3 -m venv .venv + source .venv/bin/activate + ``` + +2. **Install Dependencies**: + ```bash + pip install kubernetes + pip install -e clients/python/agentic-sandbox-client/ + ``` + +3. **Pod Snapshot Controller**: The Pod Snapshot controller must be installed in a **GKE standard cluster** running with **gVisor**. 
+ * For detailed setup instructions, refer to the [GKE Pod Snapshots public documentation](https://docs.cloud.google.com/kubernetes-engine/docs/how-to/pod-snapshots). + * Ensure a GCS bucket is configured to store the pod snapshot states and that the necessary IAM permissions are applied. + +4. **CRDs**: `PodSnapshotStorageConfig`, `PodSnapshotPolicy` CRDs must be applied. `PodSnapshotPolicy` should specify the selector match labels. + +5. **Sandbox Template**: A `SandboxTemplate` (e.g., `python-counter-template`) with runtime gVisor, appropriate KSA and label that matches that selector label in `PodSnapshotPolicy` must be available in the cluster. + +### Running Tests: + +To run the integration test, execute the script with the appropriate arguments: + +```bash +python3 clients/python/agentic-sandbox-client/test_podsnapshot_extension.py \ + --labels app=agent-sandbox-workload \ + --template-name python-counter-template \ + --namespace sandbox-test +``` + +Adjust the `--namespace`, `--template-name`, and `--labels` as needed for your environment. \ No newline at end of file diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot_client.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot_client.py new file mode 100644 index 000000000..b6eb7037e --- /dev/null +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/podsnapshot_client.py @@ -0,0 +1,671 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import sys +import os +from typing import Any +from pathlib import Path +import json +from dataclasses import dataclass +from kubernetes import client, watch +from kubernetes.client import ApiException +from ..sandbox_client import SandboxClient, ExecutionResult +from ..constants import * + +logger = logging.getLogger(__name__) + + +@dataclass +class SnapshotResult: + """Result of a snapshot processing operation.""" + + snapshot_uid: str + snapshot_timestamp: str + + +@dataclass +class SnapshotResponse: + """Structured response for snapshot operations.""" + + success: bool + trigger_name: str + snapshot_uid: str + error_reason: str + error_code: int + + +@dataclass +class RestoreResult: + """Result of a restore operation.""" + + success: bool + error_reason: str + error_code: int + + +@dataclass +class PolicyMetadata: + policy_name: str + policy_labels: dict[str, str] + + +class SnapshotPersistenceManager: + """ + Manages local persistence of snapshot metadata in a secure directory. + Stores metadata as a dictionary keyed by trigger_name. + """ + + def __init__(self): + """Initializes the persistence manager and ensures the secure directory exists.""" + self.secure_dir = Path.home() / ".snapshot_metadata" + self._ensure_secure_dir() + self.metadata_file = self.secure_dir / ".snapshots.json" + + def _ensure_secure_dir(self): + """Ensures the directory exists with 700 permissions.""" + if not self.secure_dir.exists(): + self.secure_dir.mkdir(parents=True) + self.secure_dir.chmod(0o700) + + def _load_metadata(self) -> dict[str, Any]: + """Loads metadata. 
Returns an empty dict if file doesn't exist or is invalid.""" + if not self.metadata_file.exists(): + return {} + try: + with open(self.metadata_file, "r") as f: + data = json.load(f) + if isinstance(data, list): + # Handle legacy list format by clearing it (or converting if preferred, but identifying key is tricky if not consistent) + logging.warning( + "Found legacy list-format metadata. Resetting to empty dict." + ) + return {} + return data + except (json.JSONDecodeError, IOError) as e: + logging.warning(f"Failed to load snapshot metadata: {e}") + return {} + + def save_snapshot_metadata(self, record: dict[str, Any]): + """Saves a snapshot record to the local registry keyed by snapshot_uid.""" + snapshot_uid = record.get("snapshot_uid") + if not snapshot_uid: + logging.error("Cannot save metadata: missing 'snapshot_uid'.") + return + + snapshots = self._load_metadata() + snapshots[snapshot_uid] = record + + try: + with open(self.metadata_file, "w") as f: + json.dump(snapshots, f, indent=4) + self.metadata_file.chmod(0o600) + logging.info(f"Snapshot metadata saved to {self.metadata_file}") + except IOError as e: + logging.error(f"Failed to save snapshot metadata: {e}") + + def delete_snapshot_metadata(self, snapshot_uid: str): + """Deletes a snapshot record from the local registry.""" + snapshots = self._load_metadata() + if snapshot_uid in snapshots: + del snapshots[snapshot_uid] + try: + with open(self.metadata_file, "w") as f: + json.dump(snapshots, f, indent=4) + self.metadata_file.chmod(0o600) + logging.info(f"Snapshot metadata deleted for '{snapshot_uid}'") + except IOError as e: + logging.error(f"Failed to save metadata after deletion: {e}") + else: + logging.info( + f"No local metadata found for snapshot '{snapshot_uid}' to delete." + ) + + +class PodSnapshotSandboxClient(SandboxClient): + """ + A specialized Sandbox client for interacting with the gke pod snapshot controller. + Currently supports manual triggering via PodSnapshotManualTrigger. 
+ """ + + def __init__( + self, + template_name: str, + podsnapshot_timeout: int = 180, + server_port: int = 8080, + **kwargs, + ): + super().__init__(template_name, server_port=server_port, **kwargs) + + self.controller_ready = False + self.podsnapshot_timeout = podsnapshot_timeout + self.persistence_manager = SnapshotPersistenceManager() + self.core_v1_api = client.CoreV1Api() + + self.created_manual_triggers = [] + + def __enter__(self) -> "PodSnapshotSandboxClient": + self.controller_ready = self.snapshot_controller_ready() + super().__enter__() + return self + + def _get_policy_info(self, snapshot_uid: str) -> PolicyMetadata: + """ + Retrieves the policy name and labels of the specified PodSnapshot resource. + """ + try: + snapshot = self.custom_objects_api.get_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.namespace, + plural=PODSNAPSHOT_PLURAL, + name=snapshot_uid, + ) + policy_name = snapshot.get("spec", {}).get("policyName", "") + if not policy_name: + logging.warning(f"No policyName found for snapshot {snapshot_uid}") + return PolicyMetadata(policy_name="", policy_labels={}) + + policy = self.custom_objects_api.get_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.namespace, + plural=PODSNAPSHOTPOLICY_PLURAL, + name=policy_name, + ) + policy_metadata = PolicyMetadata( + policy_name=policy_name, + policy_labels=policy.get("spec", {}) + .get("selector", {}) + .get("matchLabels", {}), + ) + return policy_metadata + except ApiException as e: + logging.error(f"Failed to retrieve PodSnapshot '{snapshot_uid}': {e}") + raise + + def _parse_snapshot_result(self, obj) -> SnapshotResult | None: + """Parses the object to see if snapshot is complete.""" + status = obj.get("status", {}) + conditions = status.get("conditions", []) + for condition in conditions: + if ( + condition.get("type") == "Triggered" + and condition.get("status") == "True" + and 
condition.get("reason") == "Complete" + ): + snapshot_uid = status.get("snapshotCreated", {}).get("name") + snapshot_timestamp = condition.get("lastTransitionTime") + return SnapshotResult( + snapshot_uid=snapshot_uid, + snapshot_timestamp=snapshot_timestamp, + ) + return None + + def _wait_for_snapshot_processed( + self, trigger_name: str, resource_version: str | None = None + ) -> SnapshotResult: + """ + Waits for the PodSnapshotManualTrigger to be processed and returns SnapshotResult. + """ + w = watch.Watch() + logger.info( + f"Waiting for snapshot manual trigger '{trigger_name}' to be processed..." + ) + + kwargs = {} + if resource_version: + kwargs["resource_version"] = resource_version + + try: + for event in w.stream( + func=self.custom_objects_api.list_namespaced_custom_object, + namespace=self.namespace, + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + field_selector=f"metadata.name={trigger_name}", + timeout_seconds=self.podsnapshot_timeout, + **kwargs, + ): + if event["type"] in ["ADDED", "MODIFIED"]: + obj = event["object"] + result = self._parse_snapshot_result(obj) + if result: + logger.info( + f"Snapshot manual trigger '{trigger_name}' processed successfully. Created Snapshot UID: {result.snapshot_uid}" + ) + w.stop() + return result + except Exception as e: + logger.error(f"Error watching snapshot: {e}") + raise + + raise TimeoutError( + f"Snapshot manual trigger '{trigger_name}' was not processed within {self.podsnapshot_timeout} seconds." + ) + + def snapshot_controller_ready(self) -> bool: + """ + Checks if the snapshot agent pods are running in a GKE-managed pod snapshot cluster. + Falls back to checking CRD existence if pod listing is forbidden. 
+ """ + + if self.controller_ready: + return True + + def check_crd_installed() -> bool: + try: + # Check directly if the API resource exists using CustomObjectsApi + resource_list = self.custom_objects_api.get_api_resources( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + ) + + if not resource_list or not resource_list.resources: + return False + + for resource in resource_list.resources: + if resource.kind == PODSNAPSHOT_API_KIND: + return True + return False + except ApiException as e: + # If discovery fails with 403/404, we assume not ready/accessible + if e.status == 403 or e.status == 404: + return False + raise + + def check_namespace(namespace: str, required_components: list[str]) -> bool: + try: + pods = self.core_v1_api.list_namespaced_pod(namespace) + found_components = { + component: False for component in required_components + } + + for pod in pods.items: + if pod.status.phase == "Running": + name = pod.metadata.name + for component in required_components: + if component in name: + found_components[component] = True + + return all(found_components.values()) + except ApiException as e: + if e.status == 403: + logger.info( + f"Permission denied listing pods in {namespace}. Checking CRD existence." + ) + return check_crd_installed() + if e.status == 404: + return False + raise + + # Check managed: requires only agent in gke-managed-pod-snapshots + if check_namespace(SNAPSHOT_NAMESPACE_MANAGED, [SNAPSHOT_AGENT]): + self.controller_ready = True + return True + + self.controller_ready = False + return self.controller_ready + + def snapshot(self, trigger_name: str) -> SnapshotResponse: + """ + Triggers a snapshot of the specified pod by creating a PodSnapshotManualTrigger resource. + The trigger_name will be suffixed with the current datetime. + Returns: + tuple[ExecutionResult, str]: The result of the operation and the final trigger name (with suffix). 
+ """ + trigger_name = f"{trigger_name}-{os.urandom(4).hex()}" + + if not self.controller_ready: + return SnapshotResponse( + success=False, + trigger_name=trigger_name, + snapshot_uid=None, + error_reason="Snapshot controller is not ready. Ensure it is installed and running.", + error_code=1, + ) + if not self.pod_name: + return SnapshotResponse( + success=False, + trigger_name=trigger_name, + snapshot_uid=None, + error_reason="Sandbox pod name not found. Ensure sandbox is created.", + error_code=1, + ) + + manifest = { + "apiVersion": f"{PODSNAPSHOT_API_GROUP}/{PODSNAPSHOT_API_VERSION}", + "kind": f"{PODSNAPSHOT_API_KIND}", + "metadata": {"name": trigger_name, "namespace": self.namespace}, + "spec": {"targetPod": self.pod_name}, + } + + try: + created_obj = self.custom_objects_api.create_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.namespace, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + body=manifest, + ) + + # Start watching from the version we just created to avoid missing updates + resource_version = created_obj.get("metadata", {}).get("resourceVersion") + snapshot_result = self._wait_for_snapshot_processed( + trigger_name, resource_version + ) + + self.created_manual_triggers.append(trigger_name) + policy_metadata = self._get_policy_info(snapshot_result.snapshot_uid) + # Save metadata locally + try: + record = { + "snapshot_uid": snapshot_result.snapshot_uid, + "template_name": self.template_name, + "policy_name": policy_metadata.policy_name, + "policy_labels": policy_metadata.policy_labels, + "namespace": self.namespace, + "claim_name": self.claim_name, + "timestamp": snapshot_result.snapshot_timestamp, + } + self.persistence_manager.save_snapshot_metadata(record) + except Exception as e: + logging.warning(f"Failed to save snapshot metadata locally: {e}") + + return SnapshotResponse( + success=True, + trigger_name=trigger_name, + snapshot_uid=snapshot_result.snapshot_uid, + error_reason="", + 
error_code=0, + ) + except ApiException as e: + logger.exception( + f"Failed to create PodSnapshotManualTrigger '{trigger_name}': {e}" + ) + return SnapshotResponse( + success=False, + trigger_name=trigger_name, + snapshot_uid=None, + error_reason=f"Failed to create PodSnapshotManualTrigger: {e}", + error_code=1, + ) + except TimeoutError as e: + logger.exception( + f"Snapshot creation timed out for trigger '{trigger_name}': {e}" + ) + return SnapshotResponse( + success=False, + trigger_name=trigger_name, + snapshot_uid=None, + error_reason=f"Snapshot creation timed out: {e}", + error_code=1, + ) + + def is_restored_from_snapshot(self, snapshot_uid: str) -> RestoreResult: + """ + Checks if the sandbox pod was restored from the specified snapshot. + + This is verified by inspecting the 'PodRestored' condition in the pod status + and confirming that the condition's message contains the provided snapshot UID. + + Returns: + RestoreResult: The result of the restore operation. + """ + if not snapshot_uid: + return RestoreResult( + success=False, + error_reason="Snapshot UID cannot be empty.", + error_code=1, + ) + + if not self.pod_name: + logger.warning("Cannot check restore status: pod_name is unknown.") + return RestoreResult( + success=False, + error_reason="Pod name not found. 
Ensure sandbox is created.", + error_code=1, + ) + + try: + pod = self.core_v1_api.read_namespaced_pod(self.pod_name, self.namespace) + + if not pod.status or not pod.status.conditions: + return RestoreResult( + success=False, + error_reason="Pod status or conditions not found.", + error_code=1, + ) + + for condition in pod.status.conditions: + if condition.type == "PodRestored" and condition.status == "True": + # Check if Snapshot UUID is present in the condition.message + if condition.message and snapshot_uid in condition.message: + return RestoreResult( + success=True, + error_reason="", + error_code=0, + ) + else: + return RestoreResult( + success=False, + error_reason="Pod was not restored from the given snapshot", + error_code=1, + ) + + return RestoreResult( + success=False, + error_reason="Pod was not restored from any snapshot", + error_code=1, + ) + + except ApiException as e: + logger.error(f"Failed to check pod restore status: {e}") + return RestoreResult( + success=False, + error_reason=f"Failed to check pod restore status: {e}", + error_code=1, + ) + + def list_snapshots(self, policy_name: str, ready_only: bool = True) -> list | None: + """ + Checks for existing snapshots matching the policy name. + Returns a list of valid snapshots sorted by creation timestamp (newest first). + + policy_name: Filters snapshots by their spec.policyName. + ready_only: If True, filters out snapshots that are only in 'Ready' state. 
+ """ + local_meta = self.persistence_manager._load_metadata() + if not local_meta: + logging.info("No local snapshot metadata found.") + return None + + valid_snapshots = [] + + for snapshot_id, record in local_meta.items(): + uid = record.get("snapshot_uid") + if not uid: + # Fallback if key is not uid (legacy) or uid missing in record + uid = snapshot_id + + # Optimized Filtering: Check policy_name from metadata if available + meta_policy_name = record.get("policy_name") + if policy_name and meta_policy_name and meta_policy_name != policy_name: + continue + + try: + # Fetch the actual PodSnapshot resource to verify status and details + snapshot = self.custom_objects_api.get_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.namespace, + plural=PODSNAPSHOT_PLURAL, + name=uid, + ) + except ApiException as e: + # If 404, it means it's in metadata but deleted from K8s. Skip it. + if e.status != 404: + logging.warning(f"Failed to fetch PodSnapshot '{uid}': {e}") + else: + # delete from the local metadata + self.persistence_manager.delete_snapshot_metadata(uid) + continue + + spec = snapshot.get("spec", {}) + status = snapshot.get("status", {}) + conditions = status.get("conditions", []) + metadata = snapshot.get("metadata", {}) + + # Filter by policy_name if provided and not already checked + crd_policy_name = spec.get("policyName") + if policy_name and crd_policy_name != policy_name: + continue + + # Update metadata if policy_name was missing + if not meta_policy_name and crd_policy_name: + record["policy_name"] = crd_policy_name + self.persistence_manager.save_snapshot_metadata(record) + + # Check for Ready=True + is_ready = False + for cond in conditions: + if cond.get("type") == "Ready" and cond.get("status") == "True": + is_ready = True + break + + # Skip if only ready snapshots are requested + if ready_only and not is_ready: + continue + + valid_snapshots.append( + { + "snapshot_id": 
metadata.get("name"), + "source_pod": metadata.get("labels", {}).get( + "podsnapshot.gke.io/pod-name", "Unknown" + ), + "uid": metadata.get("uid"), + "creationTimestamp": metadata.get("creationTimestamp", ""), + "status": "Ready" if is_ready else "NotReady", + "policy_name": spec.get("policyName"), + } + ) + + if not valid_snapshots: + logging.info( + "No Ready snapshots found matching criteria in local metadata." + ) + return None + + # Sort snapshots by creation timestamp descending + valid_snapshots.sort(key=lambda x: x["creationTimestamp"], reverse=True) + logging.info( + f"Found {len(valid_snapshots)} ready snapshots from local metadata." + ) + return valid_snapshots + + def delete_snapshots( + self, snapshot_uid: str | None = None, policy_name: str | None = None + ) -> int: + """ + Deletes snapshots. + - If snapshot_uid is provided, deletes that specific snapshot. + - If policy_name is provided, deletes all snapshots matching that policy. + - If neither is provided, deletes ALL snapshots in local metadata. + Returns the count of successfully deleted snapshots. + """ + local_meta = self.persistence_manager._load_metadata() + + snapshots_to_delete = [] # List of uids + + if snapshot_uid: + if snapshot_uid in local_meta: + snapshots_to_delete.append(snapshot_uid) + else: + logging.warning( + f"Snapshot '{snapshot_uid}' not found in local metadata. Checking other filters or skipping." + ) + # We could try to delete from K8s even if not in metadata, but for now strict consistency + + if policy_name: + for uid, record in local_meta.items(): + if record.get("policy_name") == policy_name: + if uid not in snapshots_to_delete: + snapshots_to_delete.append(uid) + + if not snapshot_uid and not policy_name: + logging.info( + "No snapshot_uid or policy_name provided. Deleting ALL snapshots found in local metadata." 
+ ) + snapshots_to_delete = list(local_meta.keys()) + + if not snapshots_to_delete: + logging.info("No snapshots found matching criteria to delete.") + return 0 + + delete_count = 0 + for uid in snapshots_to_delete: + record = local_meta.get(uid) + if not record: + continue + + # Delete PodSnapshot + try: + logging.info(f"Deleting PodSnapshot '{uid}'...") + self.custom_objects_api.delete_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.namespace, + plural=PODSNAPSHOT_PLURAL, + name=uid, + ) + logging.info(f"PodSnapshot '{uid}' deleted.") + except ApiException as e: + if e.status == 404: + logging.info( + f"PodSnapshot '{uid}' not found in K8s (already deleted?)." + ) + else: + logging.error(f"Failed to delete PodSnapshot '{uid}': {e}") + + # Cleanup Local Metadata + self.persistence_manager.delete_snapshot_metadata(uid) + delete_count += 1 + + logging.info( + f"Snapshot deletion process completed. Deleted {delete_count} snapshots." + ) + return delete_count + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Cleans up the PodSnapshotManualTrigger Resources. + Automatically cleans up the Sandbox. 
+ """ + for trigger_name in self.created_manual_triggers: + try: + self.custom_objects_api.delete_namespaced_custom_object( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.namespace, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + name=trigger_name, + ) + logger.info(f"Deleted PodSnapshotManualTrigger '{trigger_name}'") + except ApiException as e: + logger.error( + f"Failed to delete PodSnapshotManualTrigger '{trigger_name}': {e}" + ) + + super().__exit__(exc_type, exc_val, exc_tb) diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/test_podsnapshot_client.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/test_podsnapshot_client.py new file mode 100644 index 000000000..2e39f6219 --- /dev/null +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/gke_extensions/test_podsnapshot_client.py @@ -0,0 +1,500 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
def load_kubernetes_config():
    """Loads Kubernetes configuration, prioritizing kubeconfig and falling back to an environment variable."""
    try:
        config.load_kube_config()
    except config.ConfigException:
        logging.info(
            "Kubeconfig file not found, attempting to load from environment variable."
        )
        try:
            config.load_kube_config(config_file=os.getenv("KUBECONFIG_FILE"))
        except Exception as err:
            logging.error(f"Could not load Kubernetes config: {err}", exc_info=True)
            raise
        logging.info(
            "Kubernetes config loaded from KUBECONFIG_FILE environment variable."
        )
    else:
        logging.info("Kubernetes config loaded from kubeconfig file.")
self.mock_persistence_manager + + # Mock the kubernetes APIs on the client instance + self.client.custom_objects_api = MagicMock() + self.client.core_v1_api = MagicMock() + + logging.info("Finished setting up TestPodSnapshotSandboxClient.") + + def test_init(self): + """Test initialization of PodSnapshotSandboxClient.""" + logging.info("Starting test_init...") + with patch( + "k8s_agent_sandbox.sandbox_client.SandboxClient.__init__", return_value=None + ) as mock_super: + with patch.object( + PodSnapshotSandboxClient, "snapshot_controller_ready", return_value=True + ): + client = PodSnapshotSandboxClient("test-template") + mock_super.assert_called_once_with("test-template", server_port=8080) + self.assertFalse(client.controller_ready) + self.assertEqual(client.podsnapshot_timeout, 180) + logging.info("Finished test_init.") + + def test_snapshot_controller_ready_managed(self): + """Test snapshot_controller_ready for managed scenario.""" + logging.info("Starting test_snapshot_controller_ready_managed...") + mock_v1 = self.client.core_v1_api + + # Mock pods in gke-managed-pod-snapshots + mock_pod_agent = MagicMock() + mock_pod_agent.metadata.name = "pod-snapshot-agent" + mock_pod_agent.status.phase = "Running" + + mock_pods = MagicMock() + mock_pods.items = [mock_pod_agent] + mock_v1.list_namespaced_pod.return_value = mock_pods + + self.client.controller_ready = False + result = self.client.snapshot_controller_ready() + + self.assertTrue(result) + self.assertTrue(self.client.controller_ready) + mock_v1.list_namespaced_pod.assert_called_with(SNAPSHOT_NAMESPACE_MANAGED) + logging.info("Finished test_snapshot_controller_ready_managed.") + + def test_snapshot_controller_ready_status_not_ready(self): + """Test snapshot_controller_ready when not ready (pod missing).""" + logging.info("Starting test_snapshot_controller_ready_status_not_ready...") + mock_v1 = self.client.core_v1_api + + mock_pods = MagicMock() + mock_pods.items = [] + mock_v1.list_namespaced_pod.return_value 
= mock_pods + + self.client.controller_ready = False + result = self.client.snapshot_controller_ready() + + self.assertFalse(result) + self.assertFalse(self.client.controller_ready) + logging.info("Finished test_snapshot_controller_ready_status_not_ready.") + + def test_snapshot_controller_ready_forbidden_with_crd(self): + """Test fallback to CRD check when pod listing is forbidden.""" + logging.info("Starting test_snapshot_controller_ready_forbidden_with_crd...") + mock_v1 = self.client.core_v1_api + mock_v1.list_namespaced_pod.side_effect = ApiException(status=403) + + # Mock CustomObjectsApi.get_api_resources + mock_resource_list = MagicMock() + mock_resource = MagicMock() + mock_resource.kind = PODSNAPSHOT_API_KIND + mock_resource_list.resources = [mock_resource] + self.client.custom_objects_api.get_api_resources.return_value = ( + mock_resource_list + ) + + self.client.controller_ready = False + result = self.client.snapshot_controller_ready() + + self.assertTrue(result) + self.assertTrue(self.client.controller_ready) + self.client.custom_objects_api.get_api_resources.assert_called_with( + group=PODSNAPSHOT_API_GROUP, version=PODSNAPSHOT_API_VERSION + ) + logging.info("Finished test_snapshot_controller_ready_forbidden_with_crd.") + + def test_snapshot_controller_ready_forbidden_no_crd(self): + """Test fallback to CRD check fails when CRD is missing.""" + logging.info("Starting test_snapshot_controller_ready_forbidden_no_crd...") + mock_v1 = self.client.core_v1_api + mock_v1.list_namespaced_pod.side_effect = ApiException(status=403) + + # Mock CustomObjectsApi.get_api_resources returning empty + self.client.custom_objects_api.get_api_resources.return_value = None + + self.client.controller_ready = False + result = self.client.snapshot_controller_ready() + + self.assertFalse(result) + self.assertFalse(self.client.controller_ready) + logging.info("Finished test_snapshot_controller_ready_forbidden_no_crd.") + + def test_snapshot_controller_ready_404(self): + 
"""Test snapshot_controller_ready returns False on 404.""" + logging.info("Starting test_snapshot_controller_ready_404...") + mock_v1 = self.client.core_v1_api + mock_v1.list_namespaced_pod.side_effect = ApiException(status=404) + + self.client.controller_ready = False + result = self.client.snapshot_controller_ready() + + self.assertFalse(result) + self.assertFalse(self.client.controller_ready) + logging.info("Finished test_snapshot_controller_ready_404.") + + @patch("k8s_agent_sandbox.gke_extensions.podsnapshot_client.watch.Watch") + def test_snapshot_success(self, mock_watch_class): + """Test successful snapshot creation.""" + logging.info("Starting test_snapshot_success...") + + # Mock the watch + mock_watch = MagicMock() + mock_watch_class.return_value = mock_watch + + self.client.pod_name = "test-pod" + self.client.controller_ready = True + self.client.namespace = "test-ns" + + # Mock the watch stream + mock_event = { + "type": "MODIFIED", + "object": { + "status": { + "conditions": [ + { + "type": "Triggered", + "status": "True", + "reason": "Complete", + "lastTransitionTime": "2023-01-01T00:00:00Z", + } + ], + "snapshotCreated": {"name": "snapshot-uid"}, + } + }, + } + mock_watch.stream.return_value = [mock_event] + + # Mock create to return an object with resourceVersion + mock_created_obj = {"metadata": {"resourceVersion": "123"}, "status": {}} + self.client.custom_objects_api.create_namespaced_custom_object.return_value = ( + mock_created_obj + ) + + # Mock _get_policy_info to return valid data + policy_metadata = PolicyMetadata( + policy_name="test-policy", policy_labels={"app": "foo"} + ) + with patch.object( + self.client, "_get_policy_info", return_value=policy_metadata + ): + result = self.client.snapshot("test-trigger") + + self.assertEqual(result.error_code, 0) + self.assertTrue(result.success) + self.assertIn("test-trigger", result.trigger_name) + + # Verify create call was made + 
self.client.custom_objects_api.create_namespaced_custom_object.assert_called_once() + # Verify watch was called with resource_version + mock_watch.stream.assert_called_once() + _, kwargs = mock_watch.stream.call_args + self.assertEqual(kwargs.get("resource_version"), "123") + + # Verify metadata saved + self.mock_persistence_manager.save_snapshot_metadata.assert_called_once() + args, _ = self.mock_persistence_manager.save_snapshot_metadata.call_args + record = args[0] + self.assertEqual(record["snapshot_uid"], "snapshot-uid") + self.assertEqual(record["policy_name"], "test-policy") + + logging.info("Finished test_snapshot_success.") + + def test_snapshot_controller_not_ready(self): + """Test snapshot when controller is not ready.""" + logging.info("Starting test_snapshot_controller_not_ready...") + self.client.controller_ready = False + result = self.client.snapshot("test-trigger") + + self.assertEqual(result.error_code, 1) + self.assertFalse(result.success) + self.assertIn("test-trigger", result.trigger_name) + self.assertIn("Snapshot controller is not ready", result.error_reason) + logging.info("Finished test_snapshot_controller_not_ready.") + + def test_snapshot_no_pod_name(self): + """Test snapshot when pod name is not set.""" + logging.info("Starting test_snapshot_no_pod_name...") + self.client.controller_ready = True + self.client.pod_name = None + result = self.client.snapshot("test-trigger") + + self.assertEqual(result.error_code, 1) + self.assertFalse(result.success) + self.assertIn("test-trigger", result.trigger_name) + self.assertIn("Sandbox pod name not found", result.error_reason) + logging.info("Finished test_snapshot_no_pod_name.") + + def test_snapshot_creation_api_exception(self): + """Test snapshot handling of API exception during creation.""" + logging.info("Starting test_snapshot_creation_api_exception...") + self.client.pod_name = "test-pod" + self.client.controller_ready = True + + 
self.client.custom_objects_api.create_namespaced_custom_object.side_effect = ( + ApiException("Create failed") + ) + + result = self.client.snapshot("test-trigger") + + self.assertFalse(result.success) + self.assertEqual(result.error_code, 1) + self.assertIn("Failed to create PodSnapshotManualTrigger", result.error_reason) + logging.info("Finished test_snapshot_creation_api_exception.") + + @patch("k8s_agent_sandbox.gke_extensions.podsnapshot_client.watch.Watch") + @patch( + "k8s_agent_sandbox.gke_extensions.podsnapshot_client.client.CustomObjectsApi" + ) + def test_snapshot_timeout(self, mock_custom_class, mock_watch_class): + """Test snapshot timeout scenario.""" + logging.info("Starting test_snapshot_timeout...") + mock_custom = MagicMock() + mock_custom_class.return_value = mock_custom + + mock_watch = MagicMock() + mock_watch_class.return_value = mock_watch + + self.client.pod_name = "test-pod" + self.client.controller_ready = True + self.client.podsnapshot_timeout = 1 + + # Mock empty stream (timeout) + mock_watch.stream.return_value = [] + + result = self.client.snapshot("test-trigger") + + self.assertEqual(result.error_code, 1) + self.assertFalse(result.success) + self.assertIn("timed out", result.error_reason) + logging.info("Finished test_snapshot_timeout.") + + @patch("k8s_agent_sandbox.gke_extensions.podsnapshot_client.SandboxClient.__exit__") + def test_exit_cleanup(self, mock_super_exit): + """Test __exit__ cleans up created triggers.""" + logging.info("Starting test_exit_cleanup...") + self.client.created_manual_triggers = ["trigger-1", "trigger-2"] + + self.client.__exit__(None, None, None) + + # Check deletion calls + self.assertEqual( + self.client.custom_objects_api.delete_namespaced_custom_object.call_count, 2 + ) + + calls = [ + call( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.client.namespace, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + name="trigger-1", + ), + call( + group=PODSNAPSHOT_API_GROUP, + 
version=PODSNAPSHOT_API_VERSION, + namespace=self.client.namespace, + plural=PODSNAPSHOTMANUALTRIGGER_PLURAL, + name="trigger-2", + ), + ] + self.client.custom_objects_api.delete_namespaced_custom_object.assert_has_calls( + calls, any_order=True + ) + + mock_super_exit.assert_called_once_with(None, None, None) + logging.info("Finished test_exit_cleanup.") + + def test_is_restored_from_snapshot_success(self): + """Test is_restored_from_snapshot success case.""" + logging.info("Starting test_is_restored_from_snapshot_success...") + self.client.pod_name = "test-pod" + + mock_pod = MagicMock() + mock_condition = MagicMock() + mock_condition.type = "PodRestored" + mock_condition.status = "True" + mock_condition.message = "Restored from snapshot-uid-123" + mock_pod.status.conditions = [mock_condition] + + self.client.core_v1_api.read_namespaced_pod.return_value = mock_pod + + result = self.client.is_restored_from_snapshot("snapshot-uid-123") + + self.assertTrue(result.success) + self.assertEqual(result.error_code, 0) + logging.info("Finished test_is_restored_from_snapshot_success.") + + def test_is_restored_from_snapshot_mismatch(self): + """Test is_restored_from_snapshot when UID matches another snapshot.""" + logging.info("Starting test_is_restored_from_snapshot_mismatch...") + self.client.pod_name = "test-pod" + + mock_pod = MagicMock() + mock_condition = MagicMock() + mock_condition.type = "PodRestored" + mock_condition.status = "True" + mock_condition.message = "Restored from snapshot-uid-456" + mock_pod.status.conditions = [mock_condition] + + self.client.core_v1_api.read_namespaced_pod.return_value = mock_pod + + result = self.client.is_restored_from_snapshot("snapshot-uid-123") + + self.assertFalse(result.success) + self.assertEqual(result.error_code, 1) + self.assertIn("not restored from the given snapshot", result.error_reason) + logging.info("Finished test_is_restored_from_snapshot_mismatch.") + + def test_is_restored_from_snapshot_no_condition(self): + 
"""Test is_restored_from_snapshot when PodRestored condition is missing.""" + logging.info("Starting test_is_restored_from_snapshot_no_condition...") + self.client.pod_name = "test-pod" + + mock_pod = MagicMock() + mock_pod.status.conditions = [] + + self.client.core_v1_api.read_namespaced_pod.return_value = mock_pod + + result = self.client.is_restored_from_snapshot("snapshot-uid-123") + + self.assertFalse(result.success) + self.assertEqual(result.error_code, 1) + self.assertIn("Pod status or conditions not found", result.error_reason) + logging.info("Finished test_is_restored_from_snapshot_no_condition.") + + def test_is_restored_from_snapshot_api_error(self): + """Test is_restored_from_snapshot API exception handling.""" + logging.info("Starting test_is_restored_from_snapshot_api_error...") + self.client.pod_name = "test-pod" + self.client.core_v1_api.read_namespaced_pod.side_effect = ApiException( + "API Error" + ) + + result = self.client.is_restored_from_snapshot("snapshot-uid-123") + + self.assertFalse(result.success) + self.assertEqual(result.error_code, 1) + self.assertIn("Failed to check pod restore status", result.error_reason) + logging.info("Finished test_is_restored_from_snapshot_api_error.") + + def test_is_restored_from_snapshot_no_pod_name(self): + """Test is_restored_from_snapshot when pod_name is missing.""" + logging.info("Starting test_is_restored_from_snapshot_no_pod_name...") + self.client.pod_name = None + result = self.client.is_restored_from_snapshot("snapshot-uid-123") + + self.assertFalse(result.success) + self.assertEqual(result.error_code, 1) + self.assertIn("Pod name not found", result.error_reason) + logging.info("Finished test_is_restored_from_snapshot_no_pod_name.") + + def test_is_restored_from_snapshot_empty_uid(self): + """Test is_restored_from_snapshot with empty UID.""" + logging.info("Starting test_is_restored_from_snapshot_empty_uid...") + result = self.client.is_restored_from_snapshot("") + + 
self.assertFalse(result.success) + self.assertEqual(result.error_code, 1) + self.assertIn("Snapshot UID cannot be empty", result.error_reason) + logging.info("Finished test_is_restored_from_snapshot_empty_uid.") + + def test_delete_snapshots_by_uid(self): + """Test delete_snapshots by specific UID.""" + logging.info("Starting test_delete_snapshots_by_uid...") + self.mock_persistence_manager._load_metadata.return_value = { + "uid-1": {"uid": "uid-1", "policy_name": "p1"}, + "uid-2": {"uid": "uid-2", "policy_name": "p2"}, + } + + count = self.client.delete_snapshots(snapshot_uid="uid-1") + + self.assertEqual(count, 1) + # Verify deletions + self.mock_persistence_manager.delete_snapshot_metadata.assert_called_with( + "uid-1" + ) + self.client.custom_objects_api.delete_namespaced_custom_object.assert_any_call( + group=PODSNAPSHOT_API_GROUP, + version=PODSNAPSHOT_API_VERSION, + namespace=self.client.namespace, + plural=PODSNAPSHOT_PLURAL, + name="uid-1", + ) + logging.info("Finished test_delete_snapshots_by_uid.") + + def test_delete_snapshots_by_policy(self): + """Test delete_snapshots by policy name.""" + logging.info("Starting test_delete_snapshots_by_policy...") + self.mock_persistence_manager._load_metadata.return_value = { + "uid-1": {"uid": "uid-1", "policy_name": "p1"}, + "uid-2": {"uid": "uid-2", "policy_name": "p1"}, # Same policy + "uid-3": {"uid": "uid-3", "policy_name": "p2"}, + } + + count = self.client.delete_snapshots(policy_name="p1") + + self.assertEqual(count, 2) + # Verify deletions for both + self.mock_persistence_manager.delete_snapshot_metadata.assert_any_call("uid-1") + self.mock_persistence_manager.delete_snapshot_metadata.assert_any_call("uid-2") + logging.info("Finished test_delete_snapshots_by_policy.") + + +if __name__ == "__main__": + unittest.main() diff --git a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/sandbox_client.py b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/sandbox_client.py index ea42b0739..7c4a9d713 
100644 --- a/clients/python/agentic-sandbox-client/k8s_agent_sandbox/sandbox_client.py +++ b/clients/python/agentic-sandbox-client/k8s_agent_sandbox/sandbox_client.py @@ -38,20 +38,7 @@ initialize_tracer, TracerManager, trace_span, trace, OPENTELEMETRY_AVAILABLE ) -# Constants for API Groups and Resources -GATEWAY_API_GROUP = "gateway.networking.k8s.io" -GATEWAY_API_VERSION = "v1" -GATEWAY_PLURAL = "gateways" - -CLAIM_API_GROUP = "extensions.agents.x-k8s.io" -CLAIM_API_VERSION = "v1alpha1" -CLAIM_PLURAL_NAME = "sandboxclaims" - -SANDBOX_API_GROUP = "agents.x-k8s.io" -SANDBOX_API_VERSION = "v1alpha1" -SANDBOX_PLURAL_NAME = "sandboxes" - -POD_NAME_ANNOTATION = "agents.x-k8s.io/pod-name" +from .constants import * logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', diff --git a/clients/python/agentic-sandbox-client/python-counter-template.yaml b/clients/python/agentic-sandbox-client/python-counter-template.yaml new file mode 100644 index 000000000..19407c1b2 --- /dev/null +++ b/clients/python/agentic-sandbox-client/python-counter-template.yaml @@ -0,0 +1,29 @@ +apiVersion: extensions.agents.x-k8s.io/v1alpha1 +kind: SandboxTemplate +metadata: + name: python-counter-template + namespace: sandbox-test + labels: + language: python +spec: + #enableDisruptionControl: true + #shutdownTime: "2025-12-31T23:59:59Z" + podTemplate: + metadata: + labels: + app: agent-sandbox-workload + spec: + serviceAccountName: sandbox-test + runtimeClassName: gvisor + containers: + - name: my-container1 + image: python:3.10-slim + command: ["python3", "-c"] + args: + - | + import time + i = 0 + while True: + print(f"Count: {i}", flush=True) + i += 1 + time.sleep(1) diff --git a/clients/python/agentic-sandbox-client/test_podsnapshot_extension.py b/clients/python/agentic-sandbox-client/test_podsnapshot_extension.py new file mode 100644 index 000000000..38a73a139 --- /dev/null +++ b/clients/python/agentic-sandbox-client/test_podsnapshot_extension.py @@ -0,0 
+1,200 @@ +# Copyright 2026 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import asyncio +import time +import sys +from kubernetes import client, config +import re +from k8s_agent_sandbox.gke_extensions import PodSnapshotSandboxClient + + +def test_snapshot_response(snapshot_response, snapshot_name): + assert hasattr( + snapshot_response, "trigger_name" + ), "snapshot response missing 'trigger_name' attribute" + + print(f"Trigger Name: {snapshot_response.trigger_name}") + print(f"Snapshot UID: {snapshot_response.snapshot_uid}") + print(f"Success: {snapshot_response.success}") + print(f"Error Code: {snapshot_response.error_code}") + print(f"Error Reason: {snapshot_response.error_reason}") + + assert snapshot_response.trigger_name.startswith( + snapshot_name + ), f"Expected trigger name prefix '{snapshot_name}', but got '{snapshot_response.trigger_name}'" + assert ( + snapshot_response.success + ), f"Expected success=True, but got False. Reason: {snapshot_response.error_reason}" + assert snapshot_response.error_code == 0 + + +async def main( + template_name: str, + api_url: str | None, + namespace: str, + server_port: int, + labels: dict[str, str], +): + """ + Tests the pod snapshot extension by creating a sandbox, taking snapshots, + restoring from the most recent snapshot, and then cleaning up.
+ """ + + print( + f"--- Starting Sandbox Client Test (Namespace: {namespace}, Port: {server_port}) ---" + ) + + # Load kube config + try: + config.load_incluster_config() + except config.ConfigException: + config.load_kube_config() + + wait_time = 10 + first_snapshot_name = "test-snapshot-10" + second_snapshot_name = "test-snapshot-20" + policy_name = "example-psp-workload" # PodSnapshotPolicy defined in the cluster. + + try: + print("\n***** Phase 1: Starting Counter *****") + + with PodSnapshotSandboxClient( + template_name=template_name, + namespace=namespace, + api_url=api_url, + server_port=server_port, + ) as sandbox: + print("\n======= Testing Pod Snapshot Extension =======") + assert sandbox.controller_ready == True, "Sandbox controller is not ready." + + time.sleep(wait_time) + print( + f"Creating first pod snapshot '{first_snapshot_name}' after {wait_time} seconds..." + ) + snapshot_response = sandbox.snapshot(first_snapshot_name) + test_snapshot_response(snapshot_response, first_snapshot_name) + first_snapshot_uid = snapshot_response.snapshot_uid + + time.sleep(wait_time) + + print( + f"\nCreating second pod snapshot '{second_snapshot_name}' after {wait_time} seconds..." + ) + snapshot_response = sandbox.snapshot(second_snapshot_name) + test_snapshot_response(snapshot_response, second_snapshot_name) + recent_snapshot_uid = snapshot_response.snapshot_uid + print(f"Recent snapshot UID: {recent_snapshot_uid}") + + print( + "\n***** List all existing ready snapshots with the policy name. 
*****" + ) + snapshots = sandbox.list_snapshots(policy_name=policy_name) + for snap in snapshots: + print( + f"Snapshot ID: {snap['snapshot_id']}, Source Pod: {snap['source_pod']}, Creation Time: {snap['creationTimestamp']}, Policy Name: {snap['policy_name']}" + ) + + print("\n***** Phase 2: Restoring from most recent snapshot & Verifying *****") + with PodSnapshotSandboxClient( + template_name=template_name, + namespace=namespace, + api_url=api_url, + server_port=server_port, + ) as sandbox_restored: # restores from second_snapshot_name by default + + print("\nWaiting 5 seconds for restored pod to resume printing...") + time.sleep(5) + + restore_result = sandbox_restored.is_restored_from_snapshot( + recent_snapshot_uid + ) + assert restore_result.success, "Pod was not restored from a snapshot." + print("Pod was restored from the most recent snapshot.") + + print("\n**** Deleting snapshots *****") + deleted_snapshots = sandbox_restored.delete_snapshots() + print(f"Deleted Snapshots: {deleted_snapshots}") + + print("--- Pod Snapshot Test Passed! ---") + + except Exception as e: + print(f"\n--- An error occurred during the test: {e} ---") + # The __exit__ method of the Sandbox class will handle cleanup. + finally: + print("\n--- Sandbox Client Test Finished ---") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Test the Sandbox client.") + parser.add_argument( + "--template-name", + default="python-sandbox-template", + help="The name of the sandbox template to use for the test.", + ) + + # Default is None to allow testing the Port-Forward fallback + parser.add_argument( + "--gateway-name", + default=None, + help="The name of the Gateway resource. If omitted, defaults to local port-forward mode.", + ) + + parser.add_argument( + "--gateway-namespace", + default=None, + help="The namespace of the Gateway resource. 
If omitted, defaults to local port-forward mode.", + ) + + parser.add_argument( + "--api-url", + help="Direct URL to router (e.g. http://localhost:8080)", + default=None, + ) + parser.add_argument( + "--namespace", default="default", help="Namespace to create sandbox in" + ) + parser.add_argument( + "--server-port", + type=int, + default=8888, + help="Port the sandbox container listens on", + ) + parser.add_argument( + "--labels", + nargs="+", + default=["app=sandbox-test"], + help="Labels for the sandbox pod/claim in key=value format (e.g. app=sandbox-test env=dev)", + ) + + args = parser.parse_args() + + labels_dict = {} + for l in args.labels: + if "=" in l: + k, v = l.split("=", 1) + labels_dict[k] = v + else: + print(f"Warning: Ignoring invalid label format '{l}'. Use key=value.") + + asyncio.run( + main( + template_name=args.template_name, + api_url=args.api_url, + namespace=args.namespace, + server_port=args.server_port, + labels=labels_dict, + ) + )