Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@

PODSNAPSHOT_API_GROUP = "podsnapshot.gke.io"
PODSNAPSHOT_API_VERSION = "v1alpha1"
PODSNAPSHOT_PLURAL = "podsnapshots"
PODSNAPSHOTMANUALTRIGGER_PLURAL = "podsnapshotmanualtriggers"
PODSNAPSHOTMANUALTRIGGER_API_KIND = "PodSnapshotManualTrigger"
PODSNAPSHOT_API_KIND = "PodSnapshot"
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@ This directory contains the Python client extension for interacting with the Age

## `podsnapshot_client.py`

This file defines the `PodSnapshotSandboxClient` class, which extend the base `SandboxClient` to provide snapshot capabilities.
This file defines the `PodSnapshotSandboxClient` class, which extends the base `SandboxClient` to provide snapshot capabilities.

### `PodSnapshotSandboxClient`

A specialized Sandbox client for interacting with the gke pod snapshot controller.
A specialized Sandbox client for interacting with the GKE Pod Snapshot Controller.

### Key Features:

* **`PodSnapshotSandboxClient(template_name: str, ...)`**:
* Initializes the client with optional server port.

* **`PodSnapshotSandboxClient(template_name: str, podsnapshot_timeout: int = 180, ...)`**:
* Initializes the client with optional podsnapshot timeout.
* **`snapshot(self, trigger_name: str) -> SnapshotResponse`**:
* Triggers a manual snapshot of the current sandbox pod by creating a `PodSnapshotManualTrigger` resource.
* The `trigger_name` is suffixed with a timestamp and unique hash.
* Waits for the snapshot to be processed.
* The Pod Snapshot Controller creates a `PodSnapshot` resource automatically.
* Returns the SnapshotResponse object(success, error_code, error_reason, trigger_name, snapshot_uid).
* **`__exit__(self)`**:
* Cleans up the `PodSnapshotManualTrigger` resources.
* Cleans up the `SandboxClaim` resources.

## `test_podsnapshot_extension.py`
Expand All @@ -24,8 +30,10 @@ This file, located in the parent directory (`clients/python/agentic-sandbox-clie

### Test Phases:

1. **Phase 1: Starting Counter Sandbox**:
1. **Phase 1: Starting Counter Sandbox & Snapshotting**:
* Starts a sandbox with a counter application.
* Takes a snapshot (`test-snapshot-10`) after ~10 seconds.
* Takes a snapshot (`test-snapshot-20`) after ~20 seconds.

### Prerequisites

Expand Down Expand Up @@ -59,4 +67,4 @@ python3 clients/python/agentic-sandbox-client/test_podsnapshot_extension.py \
--namespace sandbox-test
```

Adjust the `--namespace`, `--template-name` as needed for your environment.
Adjust the `--namespace`, `--template-name` as needed for your environment.
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,66 @@
# limitations under the License.

import logging
from kubernetes import client
import uuid
from datetime import datetime, timezone
from typing import Any
from dataclasses import dataclass
from kubernetes import client, watch
from kubernetes.client import ApiException
from ..sandbox_client import SandboxClient
from ..constants import (
PODSNAPSHOT_API_GROUP,
PODSNAPSHOT_API_VERSION,
PODSNAPSHOT_API_KIND,
PODSNAPSHOTMANUALTRIGGER_API_KIND,
PODSNAPSHOTMANUALTRIGGER_PLURAL,
)

SNAPSHOT_SUCCESS_CODE = 0
SNAPSHOT_ERROR_CODE = 1

logger = logging.getLogger(__name__)


@dataclass
class SnapshotResult:
"""Result of a snapshot processing operation."""

snapshot_uid: str
snapshot_timestamp: str
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snapshot_timestamp is extracted and stored in SnapshotResult, but it is never utilized or returned in SnapshotResponse. Should we add it to the final SnapshotResponse so the user knows when the snapshot occurred



@dataclass
class SnapshotResponse:
"""Structured response for snapshot operations."""

success: bool
trigger_name: str
snapshot_uid: str
snapshot_timestamp: str
error_reason: str
error_code: int


class PodSnapshotSandboxClient(SandboxClient):
"""
A specialized Sandbox client for interacting with the GKE Pod Snapshot Controller.

TODO: This class enables users to take a snapshot of their sandbox and restore from the taken snapshot.
This class enables users to take a manual trigger snapshot of their sandbox and restore from the taken snapshot.
"""

def __init__(
self,
template_name: str,
podsnapshot_timeout: int = 180,
**kwargs,
):
super().__init__(template_name, **kwargs)

self.snapshot_crd_installed = False
self.core_v1_api = client.CoreV1Api()
self.podsnapshot_timeout = podsnapshot_timeout

self.created_manual_triggers = []

def __enter__(self) -> "PodSnapshotSandboxClient":
try:
Expand All @@ -61,7 +93,9 @@ def __enter__(self) -> "PodSnapshotSandboxClient":

def _check_snapshot_crd_installed(self) -> bool:
"""
Checks if the PodSnapshot CRD is installed in the cluster.
Checks if the PodSnapshot CRD is installed and available in the cluster.
Returns:
bool: True if the CRD is installed, False otherwise.
"""

if self.snapshot_crd_installed:
Expand All @@ -87,8 +121,221 @@ def _check_snapshot_crd_installed(self) -> bool:
return False
raise

def _parse_created_snapshot_info(self, obj: dict[str, Any]) -> SnapshotResult:
"""Parses the object to extract snapshot details."""
status = obj.get("status", {})
conditions = status.get("conditions") or []
for condition in conditions:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the the API responds with an explicit null for conditions ({"conditions": null}), status.get("conditions", []) will return None. Iterating over None on the next line will raise a TypeError. Please use conditions = status.get("conditions") or [] to ensure it always defaults to a list.

if (
condition.get("type") == "Triggered"
and condition.get("status") == "True"
and condition.get("reason") == "Complete"
):
snapshot_created = status.get("snapshotCreated") or {}
snapshot_uid = snapshot_created.get("name")
snapshot_timestamp = condition.get("lastTransitionTime")
return SnapshotResult(
snapshot_uid=snapshot_uid,
snapshot_timestamp=snapshot_timestamp,
)
elif condition.get("status") == "False" and condition.get("reason") in [
"Failed",
"Error",
]:
raise RuntimeError(
f"Snapshot failed. Condition: {condition.get('message', 'Unknown error')}"
)
raise ValueError("Snapshot is not yet complete.")

def _wait_for_snapshot_to_be_completed(
self, trigger_name: str, resource_version: str | None = None
) -> SnapshotResult:
"""
Waits for the PodSnapshotManualTrigger to be processed and returns SnapshotResult.
"""
w = watch.Watch()
logger.info(
f"Waiting for snapshot manual trigger '{trigger_name}' to be processed..."
)

kwargs = {}
if resource_version:
kwargs["resource_version"] = resource_version

try:
for event in w.stream(
func=self.custom_objects_api.list_namespaced_custom_object,
namespace=self.namespace,
group=PODSNAPSHOT_API_GROUP,
version=PODSNAPSHOT_API_VERSION,
plural=PODSNAPSHOTMANUALTRIGGER_PLURAL,
field_selector=f"metadata.name={trigger_name}",
timeout_seconds=self.podsnapshot_timeout,
**kwargs,
):
if event["type"] in ["ADDED", "MODIFIED"]:
obj = event["object"]
try:
result = self._parse_created_snapshot_info(obj)
logger.info(
f"Snapshot manual trigger '{trigger_name}' processed successfully. Created Snapshot UID: {result.snapshot_uid}"
)
return result
except ValueError:
# Continue watching if snapshot is not yet complete
continue
elif event["type"] == "ERROR":
logger.error(
f"Snapshot watch received error event: {event['object']}"
)
raise RuntimeError(f"Snapshot watch error: {event['object']}")
elif event["type"] == "DELETED":
logger.error(
f"Snapshot manual trigger '{trigger_name}' was deleted before completion."
)
raise RuntimeError(
f"Snapshot manual trigger '{trigger_name}' was deleted."
)
except Exception as e:
logger.error(f"Error watching snapshot: {e}")
raise
finally:
w.stop()

raise TimeoutError(
f"Snapshot manual trigger '{trigger_name}' was not processed within {self.podsnapshot_timeout} seconds."
)

def snapshot(self, trigger_name: str) -> SnapshotResponse:
"""
Triggers a snapshot of the specified pod by creating a PodSnapshotManualTrigger resource.
The trigger_name will be suffixed with a timestamp and random hex string.
Returns:
SnapshotResponse: The result of the operation.
"""
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
suffix = uuid.uuid4().hex[:8]
trigger_name = f"{trigger_name}-{timestamp}-{suffix}"

if not self.snapshot_crd_installed:
return SnapshotResponse(
success=False,
trigger_name=trigger_name,
snapshot_uid=None,
snapshot_timestamp=None,
error_reason="Snapshot CRD is not installed. Ensure it is installed and running.",
error_code=SNAPSHOT_ERROR_CODE,
)
if not self.pod_name:
return SnapshotResponse(
success=False,
trigger_name=trigger_name,
snapshot_uid=None,
snapshot_timestamp=None,
error_reason="Sandbox pod name not found. Ensure sandbox is created.",
error_code=SNAPSHOT_ERROR_CODE,
)

manifest = {
"apiVersion": f"{PODSNAPSHOT_API_GROUP}/{PODSNAPSHOT_API_VERSION}",
"kind": f"{PODSNAPSHOTMANUALTRIGGER_API_KIND}",
"metadata": {"name": trigger_name, "namespace": self.namespace},
"spec": {"targetPod": self.pod_name},
}

try:
created_obj = self.custom_objects_api.create_namespaced_custom_object(
group=PODSNAPSHOT_API_GROUP,
version=PODSNAPSHOT_API_VERSION,
namespace=self.namespace,
plural=PODSNAPSHOTMANUALTRIGGER_PLURAL,
body=manifest,
)
self.created_manual_triggers.append(trigger_name)

# Start watching from the version we just created to avoid missing updates
resource_version = created_obj.get("metadata", {}).get("resourceVersion")
snapshot_result = self._wait_for_snapshot_to_be_completed(
trigger_name, resource_version
)

return SnapshotResponse(
success=True,
trigger_name=trigger_name,
snapshot_uid=snapshot_result.snapshot_uid,
snapshot_timestamp=snapshot_result.snapshot_timestamp,
error_reason="",
error_code=SNAPSHOT_SUCCESS_CODE,
)
except ApiException as e:
logger.exception(
f"Failed to create PodSnapshotManualTrigger '{trigger_name}': {e}"
)
return SnapshotResponse(
success=False,
trigger_name=trigger_name,
snapshot_uid=None,
snapshot_timestamp=None,
error_reason=f"Failed to create PodSnapshotManualTrigger: {e}",
error_code=SNAPSHOT_ERROR_CODE,
)
except TimeoutError as e:
logger.exception(
f"Snapshot creation timed out for trigger '{trigger_name}': {e}"
)
return SnapshotResponse(
success=False,
trigger_name=trigger_name,
snapshot_uid=None,
snapshot_timestamp=None,
error_reason=f"Snapshot creation timed out: {e}",
error_code=SNAPSHOT_ERROR_CODE,
)
except RuntimeError as e:
logger.exception(
f"Snapshot creation failed for trigger '{trigger_name}': {e}"
)
return SnapshotResponse(
success=False,
trigger_name=trigger_name,
snapshot_uid=None,
snapshot_timestamp=None,
error_reason=f"Snapshot creation failed: {e}",
error_code=SNAPSHOT_ERROR_CODE,
)
except Exception as e:
logger.exception(
f"Unexpected error during snapshot creation for trigger '{trigger_name}': {e}"
)
return SnapshotResponse(
success=False,
trigger_name=trigger_name,
snapshot_uid=None,
snapshot_timestamp=None,
error_reason=f"Unexpected error: {e}",
error_code=SNAPSHOT_ERROR_CODE,
)

def __exit__(self, exc_type, exc_val, exc_tb):
"""
Cleans up the PodSnapshotManualTrigger Resources.
Automatically cleans up the Sandbox.
"""
for trigger_name in self.created_manual_triggers:
try:
self.custom_objects_api.delete_namespaced_custom_object(
group=PODSNAPSHOT_API_GROUP,
version=PODSNAPSHOT_API_VERSION,
namespace=self.namespace,
plural=PODSNAPSHOTMANUALTRIGGER_PLURAL,
name=trigger_name,
)
logger.info(f"Deleted PodSnapshotManualTrigger '{trigger_name}'")
except ApiException as e:
if e.status == 404:
# Ignore if the resource is already deleted
continue
logger.error(
f"Failed to delete PodSnapshotManualTrigger '{trigger_name}': {e}"
)
super().__exit__(exc_type, exc_val, exc_tb)
Loading