Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Agentic Sandbox Pod Snapshot Extension

This directory contains the Python client extension for interacting with the Agentic Sandbox to manage Pod Snapshots. This extension allows you to trigger snapshots of a running sandbox and restore a new sandbox from the recently created snapshot.
This directory contains the Python client extension for interacting with the Agentic Sandbox to manage Pod Snapshots. This extension allows you to trigger snapshots of a running sandbox and restore a new sandbox from a recently created snapshot.

## Components

Expand All @@ -14,10 +14,10 @@ This class wraps the base `Sandbox` to seamlessly provide snapshot capabilities.

### `SnapshotEngine`
The core engine responsible for interacting with the GKE Pod Snapshot Controller.
* Creates `PodSnapshotManualTrigger` custom resources.
* Watches for the snapshot controller to process the trigger and create a `PodSnapshot` resource.
* Returns a structured `SnapshotResponse` containing the success status, error details, and `snapshot_uid`.
* Ensures that manual trigger resources are cleanly deleted when the sandbox context exits.
* **Create**: Creates `PodSnapshotManualTrigger` custom resources and waits for the snapshot to be completed.
* **List**: Lists existing snapshots for a sandbox, with optional filtering by grouping labels and ready state.
* **Delete**: Deletes snapshots associated with the sandbox, either by specific UID or by grouping labels.
* **Cleanup**: Ensures that manual trigger resources are cleanly deleted when the sandbox context exits.

## Usage Example

Expand Down Expand Up @@ -59,7 +59,7 @@ This file, located in the parent directory (`clients/python/agentic-sandbox-clie
* Takes a snapshot (`test-snapshot-20`) after ~20 seconds.
2. **Phase 2: Restoring from Recent Snapshot**:
* Restores a sandbox from the second snapshot.
* Verifies that sandbox has been restored from the recent snapshot.
* Verifies that the sandbox has been restored from the recent snapshot.

### Prerequisites

Expand All @@ -79,7 +79,7 @@ This file, located in the parent directory (`clients/python/agentic-sandbox-clie
* For detailed setup instructions, refer to the [GKE Pod Snapshots public documentation](https://docs.cloud.google.com/kubernetes-engine/docs/how-to/pod-snapshots).
* Ensure a GCS bucket is configured to store the pod snapshot states and that the necessary IAM permissions are applied.

4. **CRDs**: `PodSnapshotStorageConfig`, `PodSnapshotPolicy` CRDs must be applied. `PodSnapshotPolicy` should specify the selector match labels.
4. **CRDs**: The `PodSnapshotStorageConfig` and `PodSnapshotPolicy` CRDs must be applied. `PodSnapshotPolicy` should specify the selector match labels. (Note: For the test file to work, `maxSnapshotCountPerGroup` in `PodSnapshotPolicy` must be set to 2 or more, and the grouping labels must include `tenant-id` and `user-id`.)

5. **Sandbox Template**: A `SandboxTemplate` (e.g., `python-counter-template`) with the gVisor runtime, an appropriate KSA, and a label that matches the selector label in `PodSnapshotPolicy` must be available in the cluster.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
from pydantic import BaseModel

from k8s_agent_sandbox.constants import (
PODSNAPSHOT_PLURAL,
PODSNAPSHOT_API_GROUP,
PODSNAPSHOT_API_VERSION,
PODSNAPSHOTMANUALTRIGGER_API_KIND,
PODSNAPSHOTMANUALTRIGGER_PLURAL,
)
from .utils import wait_for_snapshot_to_be_completed
from .utils import wait_for_snapshot_to_be_completed, wait_for_snapshot_deletion

SNAPSHOT_SUCCESS_CODE = 0
SNAPSHOT_ERROR_CODE = 1
Expand All @@ -36,6 +37,7 @@

class SnapshotResponse(BaseModel):
"""Structured response for snapshot operations."""

success: bool
trigger_name: str
snapshot_uid: str | None
Expand All @@ -44,6 +46,33 @@ class SnapshotResponse(BaseModel):
error_code: int


class SnapshotDetail(BaseModel):
    """One snapshot entry, as reported by a list-snapshots operation."""

    # Identifier of the snapshot; SnapshotEngine.list() fills this from the
    # PodSnapshot resource's `metadata.name`.
    snapshot_uid: str
    # Pod the snapshot was taken from; SnapshotEngine.list() reads it from the
    # `podsnapshot.gke.io/pod-name` label and falls back to "Unknown".
    source_pod: str
    # `metadata.creationTimestamp` of the resource, or None when absent.
    creation_timestamp: str | None
    # SnapshotEngine.list() sets this to "Ready" or "NotReady".
    status: str


class ListSnapshotResult(BaseModel):
    """Outcome of a list-snapshots call, including any error information."""

    # True when the listing completed (even if nothing matched).
    success: bool
    # Matching snapshots; empty on failure or when nothing matched.
    snapshots: list[SnapshotDetail]
    # Human-readable failure description; empty string on success.
    error_reason: str
    # SNAPSHOT_SUCCESS_CODE on success, SNAPSHOT_ERROR_CODE on failure.
    error_code: int


class DeleteSnapshotResult(BaseModel):
    """Outcome of a delete-snapshots call, including any error information."""

    # True when no deletion error was recorded.
    success: bool
    # UIDs of the snapshots that were deleted by this call.
    deleted_snapshots: list[str]
    # Human-readable failure description; empty string on success.
    error_reason: str
    # SNAPSHOT_SUCCESS_CODE on success, SNAPSHOT_ERROR_CODE on failure.
    error_code: int


class SnapshotEngine:
"""Engine for managing Sandbox snapshots."""

Expand All @@ -58,7 +87,9 @@ def __init__(
self.get_pod_name_func = get_pod_name_func
self.created_manual_triggers = []

def create(self, trigger_name: str, podsnapshot_timeout: int = 180) -> SnapshotResponse:
def create(
self, trigger_name: str, podsnapshot_timeout: int = 180
) -> SnapshotResponse:
"""
Creates a snapshot of the Sandbox.
"""
Expand All @@ -83,12 +114,14 @@ def create(self, trigger_name: str, podsnapshot_timeout: int = 180) -> SnapshotR
}

try:
pod_snapshot_manual_trigger_cr = self.k8s_helper.custom_objects_api.create_namespaced_custom_object(
group=PODSNAPSHOT_API_GROUP,
version=PODSNAPSHOT_API_VERSION,
namespace=self.namespace,
plural=PODSNAPSHOTMANUALTRIGGER_PLURAL,
body=manifest,
pod_snapshot_manual_trigger_cr = (
self.k8s_helper.custom_objects_api.create_namespaced_custom_object(
group=PODSNAPSHOT_API_GROUP,
version=PODSNAPSHOT_API_VERSION,
namespace=self.namespace,
plural=PODSNAPSHOTMANUALTRIGGER_PLURAL,
body=manifest,
)
)
self.created_manual_triggers.append(trigger_name)
except ApiException as e:
Expand All @@ -108,10 +141,12 @@ def create(self, trigger_name: str, podsnapshot_timeout: int = 180) -> SnapshotR
error_reason=error_message,
error_code=SNAPSHOT_ERROR_CODE,
)

try:
# Start watching from the version we just created to avoid missing updates
resource_version = pod_snapshot_manual_trigger_cr.get("metadata", {}).get("resourceVersion")
resource_version = pod_snapshot_manual_trigger_cr.get("metadata", {}).get(
"resourceVersion"
)
snapshot_result = wait_for_snapshot_to_be_completed(
k8s_helper=self.k8s_helper,
namespace=self.namespace,
Expand Down Expand Up @@ -211,3 +246,230 @@ def delete_manual_triggers(self, max_retries: int = 3):
f"after {max_retries} attempts: {', '.join(self.created_manual_triggers)}. "
"These resources may be leaked in Kubernetes and require manual cleanup."
)

def list(
    self, grouping_labels: dict[str, str] | None = None, ready_only: bool = True
) -> ListSnapshotResult:
    """
    Lists the PodSnapshots that belong to this sandbox's pod.

    The lookup is done with a label selector built from the pod name
    (`podsnapshot.gke.io/pod-name`) plus any extra grouping labels.

    grouping_labels: Additional `key=value` label filters appended to the selector.
    ready_only: If True, snapshots without a `Ready` condition whose status is
        "True" are left out of the result.

    Returns a ListSnapshotResult. On success the snapshots are ordered by their
    creation timestamp string in descending order (entries without a timestamp
    sort last). Failures are reported through `success=False` and
    `error_reason` rather than raised.
    """

    def _failure(reason: str) -> ListSnapshotResult:
        # Every failure path shares the same shape; only the reason differs.
        return ListSnapshotResult(
            success=False,
            snapshots=[],
            error_reason=reason,
            error_code=SNAPSHOT_ERROR_CODE,
        )

    pod_name = self.get_pod_name_func()
    if not pod_name:
        logger.warning("Pod name not found. Ensure sandbox is created.")
        return _failure("Pod name not found. Ensure sandbox is created.")

    # The pod-name label always comes first, followed by the optional filters.
    label_selector = ",".join(
        [f"podsnapshot.gke.io/pod-name={pod_name}"]
        + [f"{key}={value}" for key, value in (grouping_labels or {}).items()]
    )

    logger.info(f"Listing snapshots with label selector: {label_selector}")
    try:
        # Let the API server do the filtering through the label selector.
        response = self.k8s_helper.custom_objects_api.list_namespaced_custom_object(
            group=PODSNAPSHOT_API_GROUP,
            version=PODSNAPSHOT_API_VERSION,
            namespace=self.namespace,
            plural=PODSNAPSHOT_PLURAL,
            label_selector=label_selector,
        )
    except ApiException as e:
        logger.error(f"Failed to list PodSnapshots: {e}")
        return _failure(f"Failed to list PodSnapshots: {e}")
    except Exception as e:
        logger.exception(
            f"Unexpected error during list snapshots for grouping labels '{grouping_labels}': {e}"
        )
        return _failure(f"Unexpected error: {e}")

    matched = []
    for item in response.get("items") or []:
        item_status = item.get("status", {})
        item_conditions = item_status.get("conditions") or []
        item_metadata = item.get("metadata", {})

        # A snapshot counts as ready once any condition reports Ready=True.
        ready = any(
            condition.get("type") == "Ready" and condition.get("status") == "True"
            for condition in item_conditions
        )
        if ready_only and not ready:
            continue

        matched.append(
            SnapshotDetail(
                snapshot_uid=item_metadata.get("name"),
                source_pod=item_metadata.get("labels", {}).get(
                    "podsnapshot.gke.io/pod-name", "Unknown"
                ),
                creation_timestamp=item_metadata.get("creationTimestamp"),
                status="Ready" if ready else "NotReady",
            )
        )

    if matched:
        # Descending order on the timestamp string; missing timestamps sort last.
        matched = sorted(
            matched, key=lambda detail: detail.creation_timestamp or "", reverse=True
        )
        logger.info(f"Found {len(matched)} snapshots.")
    else:
        logger.info("No snapshots found matching criteria.")

    return ListSnapshotResult(
        success=True,
        snapshots=matched,
        error_reason="",
        error_code=SNAPSHOT_SUCCESS_CODE,
    )

def delete(
    self,
    grouping_labels: dict[str, str] | None = None,
    snapshot_uid: str | None = None,
) -> DeleteSnapshotResult:
    """
    Deletes snapshots.
    - If snapshot_uid is provided, deletes that specific snapshot.
    - If grouping_labels is provided, deletes all snapshots matching the grouping labels.
    - If not provided, deletes ALL snapshots for this pod.

    Note: snapshot_uid and grouping_labels are mutually exclusive.

    Returns a DeleteSnapshotResult containing the list of successfully deleted snapshots.
    A snapshot is only reported as deleted once its removal has been confirmed; a
    deletion whose confirmation timed out is reported as an error instead.

    Raises ValueError if both snapshot_uid and grouping_labels are provided.
    """
    if snapshot_uid and grouping_labels:
        raise ValueError(
            "snapshot_uid and grouping_labels are mutually exclusive. "
            "Provide only one of them."
        )

    snapshots_to_delete = []

    if snapshot_uid:
        snapshots_to_delete.append(snapshot_uid)
    else:
        if grouping_labels:
            logger.info(
                f"No snapshot_uid provided. Deleting snapshots based on pod name and grouping_labels: {grouping_labels}"
            )
        else:
            logger.info("No filters provided. Deleting ALL snapshots for this pod.")

        # Fetch all snapshots using list without filtering by ready status
        snapshots_result = self.list(
            grouping_labels=grouping_labels, ready_only=False
        )
        if not snapshots_result.success:
            return DeleteSnapshotResult(
                success=False,
                deleted_snapshots=[],
                error_reason=f"Failed to list snapshots before deletion: {snapshots_result.error_reason}",
                error_code=SNAPSHOT_ERROR_CODE,
            )
        if snapshots_result.snapshots:
            snapshots_to_delete = [
                s.snapshot_uid for s in snapshots_result.snapshots
            ]
            logger.info(f"Snapshots to delete: {snapshots_to_delete}")

    if not snapshots_to_delete:
        logger.info("No snapshots found matching criteria to delete.")
        return DeleteSnapshotResult(
            success=True,
            deleted_snapshots=[],
            error_reason="",
            error_code=SNAPSHOT_SUCCESS_CODE,
        )

    deleted_snapshots = []
    errors = []
    for uid in snapshots_to_delete:
        # Delete PodSnapshot
        try:
            logger.info(f"Deleting PodSnapshot '{uid}'...")
            self.k8s_helper.custom_objects_api.delete_namespaced_custom_object(
                group=PODSNAPSHOT_API_GROUP,
                version=PODSNAPSHOT_API_VERSION,
                namespace=self.namespace,
                plural=PODSNAPSHOT_PLURAL,
                name=uid,
            )
            logger.info(
                f"PodSnapshot '{uid}' deletion requested. Waiting for confirmation..."
            )

            # Wait for completion of deletion. The helper is expected to return
            # a truthy value when the deletion was confirmed and a falsy one
            # when the wait timed out.
            deletion_confirmed = wait_for_snapshot_deletion(
                k8s_helper=self.k8s_helper,
                namespace=self.namespace,
                snapshot_uid=uid,
            )
        except ApiException as e:
            if e.status == 404:
                # Nothing to delete; not counted as deleted by this call and
                # not treated as an error either.
                logger.info(
                    f"PodSnapshot '{uid}' not found in K8s (already deleted?)."
                )
            else:
                msg = f"Failed to delete PodSnapshot '{uid}': {e}"
                logger.error(msg)
                errors.append(msg)
        except Exception as e:
            msg = f"Unexpected error deleting PodSnapshot '{uid}': {e}"
            logger.exception(msg)
            errors.append(msg)
        else:
            if deletion_confirmed:
                deleted_snapshots.append(uid)
            else:
                # The delete request was accepted but never confirmed, so the
                # snapshot may still exist; do not report it as deleted.
                msg = f"Timed out waiting for PodSnapshot '{uid}' deletion to be confirmed."
                logger.error(msg)
                errors.append(msg)

    logger.info(
        f"Snapshot deletion process completed. Deleted {len(deleted_snapshots)} snapshots."
    )

    if errors:
        error_msg = "; ".join(errors)
        if deleted_snapshots:
            error_msg = f"Partial failure: deleted {len(deleted_snapshots)}/{len(snapshots_to_delete)} snapshots. Errors: {error_msg}"
        return DeleteSnapshotResult(
            success=False,
            deleted_snapshots=deleted_snapshots,
            error_reason=error_msg,
            error_code=SNAPSHOT_ERROR_CODE,
        )

    return DeleteSnapshotResult(
        success=True,
        deleted_snapshots=deleted_snapshots,
        error_reason="",
        error_code=SNAPSHOT_SUCCESS_CODE,
    )
Loading