Skip to content
Closed
38 changes: 38 additions & 0 deletions clients/python/agentic-sandbox-client/agentic_sandbox/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2025 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Constants for API Groups and Resources

# Gateway API: group, version and resource plural for Gateway objects.
GATEWAY_API_GROUP = "gateway.networking.k8s.io"
GATEWAY_API_VERSION = "v1"
GATEWAY_PLURAL = "gateways"

# SandboxClaim custom resource: group, version and resource plural.
CLAIM_API_GROUP = "extensions.agents.x-k8s.io"
CLAIM_API_VERSION = "v1alpha1"
CLAIM_PLURAL_NAME = "sandboxclaims"

# Sandbox custom resource: group, version and resource plural.
SANDBOX_API_GROUP = "agents.x-k8s.io"
SANDBOX_API_VERSION = "v1alpha1"
SANDBOX_PLURAL_NAME = "sandboxes"

# Annotation key; presumably carries the name of the pod backing a Sandbox
# (usage is not visible in this module -- confirm against the caller).
POD_NAME_ANNOTATION = "agents.x-k8s.io/pod-name"

# Pod snapshot custom resources: shared group/version plus the plurals for
# PodSnapshot and PodSnapshotManualTrigger objects.
PODSNAPSHOT_API_GROUP = "podsnapshot.gke.io"
PODSNAPSHOT_API_VERSION = "v1alpha1"
PODSNAPSHOT_PLURAL = "podsnapshots"
PODSNAPSHOTMANUALTRIGGER_PLURAL = "podsnapshotmanualtriggers"

# Namespaces probed for snapshot components (self-installed vs. GKE-managed)
# and the name fragments that identify the controller and agent pods; the
# snapshot client matches these fragments as substrings of pod names.
SNAPSHOT_NAMESPACE_SELF_INSTALLED = "gps-system"
SNAPSHOT_NAMESPACE_MANAGED = "gke-managed-pod-snapshots"
SNAPSHOT_CONTROLLER_NAME = "pod-snapshot-controller"
SNAPSHOT_AGENT = "pod-snapshot-agent"
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2026 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .podsnapshot_client import PodSnapshotSandboxClient
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Agentic Sandbox Pod Snapshot Extension

This directory contains the Python client extension for interacting with the Agentic Sandbox to manage Pod Snapshots. This extension allows you to trigger checkpoints (snapshots) of a running sandbox and restore a new sandbox from the recently created snapshot.

## `podsnapshot_client.py`

This file defines two classes: `SnapshotPersistenceManager`, a standalone helper, and `PodSnapshotSandboxClient`, which extends the base `SandboxClient` to provide snapshot capabilities.

### `SnapshotPersistenceManager`

A utility class for managing local persistence of snapshot metadata in a secure directory. Stores metadata as a dictionary keyed by `trigger_name`.

### `PodSnapshotSandboxClient`

A specialized Sandbox client for interacting with the GKE pod snapshot controller.

### Key Features:

* **`PodSnapshotSandboxClient(template_name: str, podsnapshot_timeout: int = 180, server_port: int = 8080, ...)`**:
* Initializes the client with optional podsnapshot timeout and server port.
* If a snapshot exists, the pod snapshot controller restores from the most recent snapshot matching the label of the `SandboxTemplate`; otherwise, a new `Sandbox` is created.
* **`snapshot_controller_ready(self) -> bool`**:
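* Checks whether the snapshot components are running: the controller and agent pods for a self-installed setup (`gps-system` namespace), or the agent pod for a GKE-managed setup (`gke-managed-pod-snapshots` namespace).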
* **`checkpoint(self, trigger_name: str) -> tuple[ExecutionResult, str]`**:
* Triggers a manual snapshot of the current sandbox pod by creating a `PodSnapshotManualTrigger` resource.
* The trigger_name is suffixed with the current datetime.
* Waits for the snapshot to be processed.
* The pod snapshot controller creates a `PodSnapshot` resource automatically.
* Returns a tuple of ExecutionResult and the final trigger name.
* **`list_snapshots(self, policy_name: str, ready_only: bool = True) -> list | None`**:
* TBD
* **`delete_snapshots(self, trigger_name: str) -> int`**:
* TBD
* **Automatic Cleanup**:
* The `__exit__` method cleans up the `SandboxClaim` resources.

## `test_podsnapshot_extension.py`

This file, located in the parent directory (`clients/python/agentic-sandbox-client/`), contains an integration test script for the `PodSnapshotSandboxClient` extension. It verifies the checkpoint and restore functionality.

### Test Phases:

1. **Phase 1: Starting Counter & Checkpointing**:
* Starts a sandbox with a counter application.
* Takes a snapshot (`test-snapshot-10`) after ~10 seconds.
* Takes a second snapshot (`test-snapshot-20`) after another ~10 seconds.
2. **Phase 2: Restoring from Recent Snapshot**:
* Restores a sandbox from the second snapshot.
* Verifies that the counter continues from where it left off (>= 20), proving the state was preserved.

### Prerequisites

1. **Python Virtual Environment**:
```bash
python3 -m venv .venv
source .venv/bin/activate
```

2. **Install Dependencies**:
```bash
pip install kubernetes
pip install -e clients/python/agentic-sandbox-client/
```

3. **Pod Snapshot Controller**: The Pod Snapshot controller must be installed in a Standard cluster whose sandbox workloads run inside gVisor (see the user guide). A GCS bucket for storing the pod snapshot state must be configured, and the required permissions must be applied.
4. **CRDs**: `PodSnapshotStorageConfig`, `PodSnapshotPolicy` CRDs must be applied. `PodSnapshotPolicy` should specify the selector match labels.
5. **Sandbox Template**: A `SandboxTemplate` (e.g., `python-counter-template`) that uses the gVisor runtime and carries a label matching the selector label in `PodSnapshotPolicy` must be available in the cluster.

### Running Tests:

To run the integration test, execute the script with the appropriate arguments:

```bash
python3 clients/python/agentic-sandbox-client/test_podsnapshot_extension.py \
--labels app=agent-sandbox-workload \
--template-name python-counter-template \
--namespace sandbox-test
```

Adjust the `--namespace`, `--template-name`, and `--labels` as needed for your environment.
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
# Copyright 2026 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import sys
from datetime import datetime
from typing import Any
from kubernetes import client, watch
from kubernetes.client import ApiException
from ..sandbox_client import SandboxClient, ExecutionResult
from ..constants import *

# Configure the root logger when this module is imported: INFO level,
# "<time> - <level> - <message>" lines, written to stdout.
# NOTE(review): calling basicConfig() from a library module changes logging
# for the importing application; consider moving this to the entry point.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
stream=sys.stdout,
)


class SnapshotPersistenceManager:
    """
    Placeholder for local persistence of snapshot metadata.

    The intended design is a registry stored in a secure directory and keyed
    by trigger_name. None of the methods are implemented yet: each one is a
    no-op that returns None.
    """

    def __init__(self):
        """Create the manager. Currently performs no setup at all."""
        return None

    def _ensure_secure_dir(self):
        """Intended to create the storage directory with 700 permissions.

        Not implemented yet; does nothing.
        """
        return None

    def _load_metadata(self) -> dict[str, Any]:
        """Intended to load the stored metadata dictionary.

        Not implemented yet; currently returns None rather than a dict.
        """
        return None

    def save_snapshot_metadata(self, record: dict[str, Any]):
        """Intended to store a snapshot record in the local registry.

        Not implemented yet; ``record`` is ignored.
        """
        return None

    def delete_snapshot_metadata(self, trigger_name: str):
        """Intended to remove a snapshot record from the local registry.

        Not implemented yet; ``trigger_name`` is ignored.
        """
        return None


class PodSnapshotSandboxClient(SandboxClient):
"""
A specialized Sandbox client for interacting with the gke pod snapshot controller.
Handles the case only when triggerConfig is type manual.
"""

def __init__(
    self,
    template_name: str,
    podsnapshot_timeout: int = 180,
    server_port: int = 8080,
    **kwargs,
):
    """
    Set up the base sandbox client, then probe for the snapshot components.

    Args:
        template_name: Forwarded positionally to the parent constructor.
        podsnapshot_timeout: Seconds to wait for a manual snapshot trigger
            to be processed.
        server_port: Forwarded to the parent constructor as ``server_port``.
        **kwargs: Any further keyword arguments for the parent constructor.
    """
    super().__init__(template_name, server_port=server_port, **kwargs)

    self.podsnapshot_timeout = podsnapshot_timeout

    # snapshot_controller_ready() returns early on a truthy cached flag, so
    # the attribute has to exist (and be False) before the first probe.
    self.controller_ready = False
    self.controller_ready = self.snapshot_controller_ready()


def _wait_for_snapshot_processed(self, trigger_name: str) -> tuple[str, str]:
    """
    Wait until the named PodSnapshotManualTrigger reports completion.

    Watches the trigger object in ``self.namespace`` and returns as soon as
    one of its status conditions has type "Triggered", status "True" and
    reason "Complete".

    Args:
        trigger_name: Name of the PodSnapshotManualTrigger to watch.

    Returns:
        A ``(snapshot_uid, timestamp)`` pair: ``status.snapshotCreated.name``
        and the matching condition's ``lastTransitionTime``. Either value is
        None when the object does not carry that field.

    Raises:
        TimeoutError: If the watch ends (after ``self.podsnapshot_timeout``
            seconds) without a completed condition being observed.
        Exception: Any error raised while watching is logged and re-raised.
    """
    w = watch.Watch()
    logging.info(f"Waiting for snapshot manual trigger '{trigger_name}' to be processed...")

    try:
        for event in w.stream(
            func=self.custom_objects_api.list_namespaced_custom_object,
            namespace=self.namespace,
            group=PODSNAPSHOT_API_GROUP,
            version=PODSNAPSHOT_API_VERSION,
            plural=PODSNAPSHOTMANUALTRIGGER_PLURAL,
            field_selector=f"metadata.name={trigger_name}",
            timeout_seconds=self.podsnapshot_timeout,
        ):
            if event["type"] not in ("ADDED", "MODIFIED"):
                continue

            # "or" guards against fields that are present but explicitly null.
            status = event["object"].get("status") or {}
            for condition in status.get("conditions") or []:
                if (
                    condition.get("type") == "Triggered"
                    and condition.get("status") == "True"
                    and condition.get("reason") == "Complete"
                ):
                    uid = (status.get("snapshotCreated") or {}).get("name")
                    timestamp = condition.get("lastTransitionTime")
                    logging.info(f"Snapshot manual trigger '{trigger_name}' processed successfully. Created Snapshot UID: {uid}")
                    return uid, timestamp
    except Exception as e:
        logging.error(f"Error watching snapshot: {e}")
        raise
    finally:
        # Stop the watch on every exit path (success, error and timeout),
        # not only when the snapshot completed.
        w.stop()

    raise TimeoutError(f"Snapshot manual trigger '{trigger_name}' was not processed within {self.podsnapshot_timeout} seconds.")


def snapshot_controller_ready(self) -> bool:
    """
    Check whether the pod snapshot components are running in the cluster.

    Two installations are recognised, probed in this order:

    * self-installed: both the controller and the agent must have a pod in
      phase "Running" in SNAPSHOT_NAMESPACE_SELF_INSTALLED;
    * GKE-managed: the agent must have a pod in phase "Running" in
      SNAPSHOT_NAMESPACE_MANAGED.

    A component counts as present when its name is a substring of a running
    pod's name. The result is cached in ``self.controller_ready``; once it
    is True the cluster is not queried again.

    Returns:
        True if either installation is detected, False otherwise.

    Raises:
        ApiException: For any API failure other than 403 Forbidden or
            404 Not Found (for example 401 or 5xx), so that a broken or
            misconfigured cluster is not reported as "controller not ready".
    """
    if self.controller_ready:
        return True

    v1 = client.CoreV1Api()

    def check_namespace(namespace: str, required_components: list[str]) -> bool:
        """Return True if every required component has a running pod in ``namespace``."""
        try:
            pods = v1.list_namespaced_pod(namespace)
        except ApiException as e:
            # Only "cannot see it" answers mean "not installed here";
            # anything else is a real failure the caller must hear about.
            if e.status in (403, 404):
                logging.info(
                    f"Cannot list pods in namespace '{namespace}' (HTTP {e.status}); treating snapshot components as absent."
                )
                return False
            raise

        running_names = [
            pod.metadata.name
            for pod in pods.items
            if pod.status.phase == "Running"
        ]
        return all(
            any(component in name for name in running_names)
            for component in required_components
        )

    # Short-circuit: the managed namespace is only probed when the
    # self-installed check did not succeed.
    self.controller_ready = (
        # Self-installed: requires both controller and agent in gps-system.
        check_namespace(
            SNAPSHOT_NAMESPACE_SELF_INSTALLED,
            [SNAPSHOT_CONTROLLER_NAME, SNAPSHOT_AGENT],
        )
        # Managed: requires only the agent in gke-managed-pod-snapshots.
        or check_namespace(SNAPSHOT_NAMESPACE_MANAGED, [SNAPSHOT_AGENT])
    )
    return self.controller_ready


def checkpoint(self, trigger_name: str) -> tuple[ExecutionResult, str]:
    """
    Snapshot the sandbox pod by creating a PodSnapshotManualTrigger.

    The given name is suffixed with the current local time formatted as
    ``YYYYmmddHHMMSS``; that final name is used for the resource and is
    returned in every case, including failures.

    Args:
        trigger_name: Base name for the trigger resource.

    Returns:
        tuple[ExecutionResult, str]: The outcome (exit_code 0 on success,
        1 with a message in stderr otherwise) and the final trigger name.
    """
    suffix = datetime.now().strftime('%Y%m%d%H%M%S')
    trigger_name = f"{trigger_name}-{suffix}"

    def failure(message: str) -> tuple[ExecutionResult, str]:
        # Every error path has the same shape: empty stdout, exit code 1.
        return ExecutionResult(stdout="", stderr=message, exit_code=1), trigger_name

    if not self.controller_ready:
        return failure("Snapshot controller is not ready. Ensure it is installed and running.")
    if not self.pod_name:
        return failure("Sandbox pod name not found. Ensure sandbox is created.")

    manifest = {
        "apiVersion": f"{PODSNAPSHOT_API_GROUP}/{PODSNAPSHOT_API_VERSION}",
        "kind": "PodSnapshotManualTrigger",
        "metadata": {"name": trigger_name, "namespace": self.namespace},
        "spec": {"targetPod": self.pod_name},
    }

    try:
        self.custom_objects_api.create_namespaced_custom_object(
            group=PODSNAPSHOT_API_GROUP,
            version=PODSNAPSHOT_API_VERSION,
            namespace=self.namespace,
            plural=PODSNAPSHOTMANUALTRIGGER_PLURAL,
            body=manifest,
        )
        # The returned values are not used yet; see the TODO below.
        snapshot_uid, timestamp = self._wait_for_snapshot_processed(trigger_name)

        # TODO: Add snapshot metadata persistence logic here using SnapshotPersistenceManager
    except ApiException as e:
        return failure(f"Failed to create PodSnapshotManualTrigger: {e}")
    except TimeoutError as e:
        return failure(f"Snapshot creation timed out: {e}")

    success = ExecutionResult(
        stdout=f"PodSnapshotManualTrigger '{trigger_name}' created successfully.",
        stderr="",
        exit_code=0,
    )
    return success, trigger_name


def list_snapshots(self, policy_name: str, ready_only: bool = True) -> list | None:
"""
Checks for existing snapshots matching the label selector and optional policy name.
Returns a list of valid snapshots sorted by creation timestamp (newest first).
policy_name: Filters snapshots by their spec.policyName.
ready_only: If True, filters out snapshots that are only in 'Ready' state.
"""
pass


def delete_snapshots(self, trigger_name: str) -> int:
    """
    Delete snapshots for a trigger (not implemented yet; returns None).

    Planned behavior: delete the snapshots matching ``trigger_name`` together
    with the PodSnapshotManualTrigger resources, and return the number of
    snapshots that were deleted successfully.

    Args:
        trigger_name: Intended to select which snapshots to delete.
    """
    return None


def __exit__(self, exc_type, exc_val, exc_tb):
    """
    Leave the context manager by delegating to the parent class.

    The three exception arguments are forwarded unchanged to the parent's
    ``__exit__``, which presumably cleans up the Sandbox (its implementation
    is not visible here). The parent's return value is discarded, so this
    method always returns None.
    """
    super().__exit__(exc_type, exc_val, exc_tb)
Loading