Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions manifests/charts/base/templates/rbac/workloadmanager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ rules:
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["networking.k8s.io"]
resources: ["networkpolicies"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]

---
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/runtime/v1alpha1/agent_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type AgentRuntimeSpec struct {
// +kubebuilder:validation:Required
// +kubebuilder:default="8h"
MaxSessionDuration *metav1.Duration `json:"maxSessionDuration,omitempty" protobuf:"bytes,3,opt,name=maxSessionDuration"`

// NetworkPolicy defines the network access rules for the sandbox.
// If not specified, a default deny-all policy is applied to enforce isolation.
// +optional
NetworkPolicy *SandboxNetworkPolicy `json:"networkPolicy,omitempty"`
Comment on lines 56 to +63
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The agent_type.go file references SandboxNetworkPolicy (which contains networkingv1.NetworkPolicyIngressRule and networkingv1.NetworkPolicyEgressRule fields) but does not import the networkingv1 package. While this may compile since SandboxNetworkPolicy is defined in the same package, this creates an inconsistency since codeinterpreter_types.go has the import. For consistency and to support code generation tools, add: networkingv1 "k8s.io/api/networking/v1" to the imports.

Copilot uses AI. Check for mistakes.
}

// AgentRuntimeStatus represents the observed state of an AgentRuntime.
Expand Down
20 changes: 20 additions & 0 deletions pkg/apis/runtime/v1alpha1/codeinterpreter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -81,6 +82,11 @@ type CodeInterpreterSpec struct {
// +kubebuilder:validation:Enum=picod;none
// +optional
AuthMode AuthModeType `json:"authMode,omitempty"`

// NetworkPolicy defines the network access rules for the sandbox.
// If not specified, a default deny-all policy is applied to enforce isolation.
// +optional
NetworkPolicy *SandboxNetworkPolicy `json:"networkPolicy,omitempty"`
}

// CodeInterpreterStatus represents the observed state of a CodeInterpreter.
Expand Down Expand Up @@ -190,6 +196,20 @@ const (
ProtocolTypeHTTPS ProtocolType = "HTTPS"
)

// SandboxNetworkPolicy defines the network access rules for a sandbox.
// It maps directly to a Kubernetes NetworkPolicy applied to the sandbox pod:
// the Ingress and Egress rule lists are the upstream networking.k8s.io/v1
// rule types, so they accept the same peers and ports as a native policy.
type SandboxNetworkPolicy struct {
// Ingress is the list of ingress rules applied to the sandbox pod.
// Each entry allows the traffic it describes; anything not matched is denied.
// An empty list means all ingress traffic is denied.
// +optional
Ingress []networkingv1.NetworkPolicyIngressRule `json:"ingress,omitempty"`

// Egress is the list of egress rules applied to the sandbox pod.
// Each entry allows the traffic it describes; anything not matched is denied.
// An empty list means all egress traffic is denied.
// +optional
Egress []networkingv1.NetworkPolicyEgressRule `json:"egress,omitempty"`
}

// CodeInterpreterList contains a list of CodeInterpreter
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true
Expand Down
40 changes: 40 additions & 0 deletions pkg/apis/runtime/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/workloadmanager/garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func (gc *garbageCollector) once() {
errs = append(errs, err)
continue
}
// Delete the NetworkPolicy associated with the sandbox.
if npErr := deleteNetworkPolicy(ctx, gc.k8sClient.clientset, gcSandbox.SandboxNamespace, gcSandbox.Name); npErr != nil {
klog.Warningf("garbage collector failed to delete network policy for %s/%s: %v", gcSandbox.SandboxNamespace, gcSandbox.Name, npErr)
}
klog.Infof("garbage collector %s %s/%s session %s deleted", gcSandbox.Kind, gcSandbox.SandboxNamespace, gcSandbox.Name, gcSandbox.SessionID)
err = gc.storeClient.DeleteSandboxBySessionID(ctx, gcSandbox.SessionID)
if err != nil {
Expand Down
97 changes: 56 additions & 41 deletions pkg/workloadmanager/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,63 +140,73 @@ func (s *Server) handleSandboxCreate(c *gin.Context, kind string) {
respondJSON(c, http.StatusOK, response)
}

// createK8sResource creates the Kubernetes object that backs a sandbox.
//
// When sandboxClaim is non-nil a SandboxClaim is created and sandbox is not
// used; otherwise the Sandbox itself is created. A failure from either path is
// returned wrapped by api.NewInternalError. Both paths wrap the underlying
// error with %w so the original cause stays in the fmt error chain instead of
// being flattened to text.
func createK8sResource(ctx context.Context, dynamicClient dynamic.Interface, sandbox *sandboxv1alpha1.Sandbox, sandboxClaim *extensionsv1alpha1.SandboxClaim) error {
	if sandboxClaim != nil {
		if err := createSandboxClaim(ctx, dynamicClient, sandboxClaim); err != nil {
			return api.NewInternalError(fmt.Errorf("create sandbox claim %s/%s failed: %w", sandboxClaim.Namespace, sandboxClaim.Name, err))
		}
		return nil
	}
	// No claim supplied: create the Sandbox directly. The created object is
	// not needed here, only whether creation succeeded.
	if _, err := createSandbox(ctx, dynamicClient, sandbox); err != nil {
		return api.NewInternalError(fmt.Errorf("failed to create sandbox: %w", err))
	}
	return nil
}

// rollbackSandboxCreation undoes a sandbox creation attempt.
//
// It removes, in order: the SandboxClaim (when sandboxClaim is non-nil) or the
// Sandbox, the NetworkPolicy named after the sandbox, and the store record for
// sessionID. Cleanup runs on a fresh background context with a 30-second
// timeout. Every step is best-effort: a failure is logged and the remaining
// steps still run; nothing is returned to the caller.
func (s *Server) rollbackSandboxCreation(dynamicClient dynamic.Interface, sandbox *sandboxv1alpha1.Sandbox, sandboxClaim *extensionsv1alpha1.SandboxClaim, sessionID string) {
	cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	switch {
	case sandboxClaim != nil:
		claimNS, claimName := sandboxClaim.Namespace, sandboxClaim.Name
		if err := deleteSandboxClaim(cleanupCtx, dynamicClient, claimNS, claimName); err == nil {
			klog.Infof("sandbox claim %s/%s rollback succeeded", claimNS, claimName)
		} else {
			klog.Infof("sandbox claim %s/%s rollback failed: %v", claimNS, claimName, err)
		}
	default:
		if err := deleteSandbox(cleanupCtx, dynamicClient, sandbox.Namespace, sandbox.Name); err == nil {
			klog.Infof("sandbox %s/%s rollback succeeded", sandbox.Namespace, sandbox.Name)
		} else {
			klog.Infof("sandbox %s/%s rollback failed: %v", sandbox.Namespace, sandbox.Name, err)
		}
	}

	// The NetworkPolicy and the store record are keyed off the sandbox object
	// in both the claim and the direct-sandbox case.
	ns, name := sandbox.Namespace, sandbox.Name

	npErr := deleteNetworkPolicy(cleanupCtx, s.k8sClient.clientset, ns, name)
	if npErr != nil {
		klog.Warningf("network policy %s/%s rollback failed: %v", ns, name, npErr)
	}

	storeErr := s.storeClient.DeleteSandboxBySessionID(cleanupCtx, sessionID)
	if storeErr != nil {
		klog.Infof("sandbox %s/%s store placeholder cleanup failed: %v", ns, name, storeErr)
	}
}

// createSandbox performs sandbox creation and returns the response payload or an error with an HTTP status code.
func (s *Server) createSandbox(ctx context.Context, dynamicClient dynamic.Interface, sandbox *sandboxv1alpha1.Sandbox, sandboxClaim *extensionsv1alpha1.SandboxClaim, sandboxEntry *sandboxEntry, resultChan <-chan SandboxStatusUpdate) (*types.CreateSandboxResponse, error) {
// Store placeholder before creating, make sandbox/sandboxClaim GarbageCollection possible
sandboxStorePlaceHolder := buildSandboxPlaceHolder(sandbox, sandboxEntry)
if err := s.storeClient.StoreSandbox(ctx, sandboxStorePlaceHolder); err != nil {
err = api.NewInternalError(fmt.Errorf("store sandbox placeholder failed: %v", err))
return nil, err
return nil, api.NewInternalError(fmt.Errorf("store sandbox placeholder failed: %v", err))
}

if sandboxClaim != nil {
if err := createSandboxClaim(ctx, dynamicClient, sandboxClaim); err != nil {
err = api.NewInternalError(fmt.Errorf("create sandbox claim %s/%s failed: %v", sandboxClaim.Namespace, sandboxClaim.Name, err))
return nil, err
}
} else {
if _, err := createSandbox(ctx, dynamicClient, sandbox); err != nil {
return nil, api.NewInternalError(fmt.Errorf("failed to create sandbox: %w", err))
}
if err := createK8sResource(ctx, dynamicClient, sandbox, sandboxClaim); err != nil {
return nil, err
}

// Register rollback BEFORE waiting for the sandbox to become ready.
// This ensures the K8s resource and store placeholder are cleaned up on
// timeout, pod-IP failure, or store-update failure — not just on post-creation errors.
// Register rollback BEFORE creating the NetworkPolicy and waiting for ready.
// This ensures all resources are cleaned up if anything fails from this point on.
needRollbackSandbox := true
sandboxRollbackFunc := func() {
ctxTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
var err error
if sandboxClaim != nil {
// Rollback SandboxClaim
err = deleteSandboxClaim(ctxTimeout, dynamicClient, sandboxClaim.Namespace, sandboxClaim.Name)
if err != nil {
klog.Infof("sandbox claim %s/%s rollback failed: %v", sandboxClaim.Namespace, sandboxClaim.Name, err)
} else {
klog.Infof("sandbox claim %s/%s rollback succeeded", sandboxClaim.Namespace, sandboxClaim.Name)
}
} else {
// Rollback Sandbox
err = deleteSandbox(ctxTimeout, dynamicClient, sandbox.Namespace, sandbox.Name)
if err != nil {
klog.Infof("sandbox %s/%s rollback failed: %v", sandbox.Namespace, sandbox.Name, err)
} else {
klog.Infof("sandbox %s/%s rollback succeeded", sandbox.Namespace, sandbox.Name)
}
}
// Clean up the store placeholder so it does not pollute GC queries
if delErr := s.storeClient.DeleteSandboxBySessionID(ctxTimeout, sandboxEntry.SessionID); delErr != nil {
klog.Infof("sandbox %s/%s store placeholder cleanup failed: %v", sandbox.Namespace, sandbox.Name, delErr)
}
}
defer func() {
if !needRollbackSandbox {
return
}
sandboxRollbackFunc()
s.rollbackSandboxCreation(dynamicClient, sandbox, sandboxClaim, sandboxEntry.SessionID)
}()

// Create NetworkPolicy after rollback is registered so it is included in cleanup on failure.
np := buildNetworkPolicy(sandbox.Namespace, sandbox.Name, sandboxEntry.NetworkPolicy)
if err := createNetworkPolicy(ctx, s.k8sClient.clientset, np); err != nil {
return nil, api.NewInternalError(fmt.Errorf("failed to create network policy for sandbox %s/%s: %w", sandbox.Namespace, sandbox.Name, err))
}

var createdSandbox *sandboxv1alpha1.Sandbox
select {
case result := <-resultChan:
Expand Down Expand Up @@ -291,6 +301,11 @@ func (s *Server) handleDeleteSandbox(c *gin.Context) {
}
}

// Delete the NetworkPolicy associated with the sandbox.
if npErr := deleteNetworkPolicy(c.Request.Context(), s.k8sClient.clientset, sandbox.SandboxNamespace, sandbox.Name); npErr != nil {
klog.Errorf("failed to delete network policy for sandbox %s/%s: %v", sandbox.SandboxNamespace, sandbox.Name, npErr)
}

// Delete sandbox from store
err = s.storeClient.DeleteSandboxBySessionID(c.Request.Context(), sessionID)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions pkg/workloadmanager/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import (
runtimev1alpha1 "github.com/volcano-sh/agentcube/pkg/apis/runtime/v1alpha1"
"github.com/volcano-sh/agentcube/pkg/common/types"
"github.com/volcano-sh/agentcube/pkg/store"
networkingv1 "k8s.io/api/networking/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
sandboxv1alpha1 "sigs.k8s.io/agent-sandbox/api/v1alpha1"
"sigs.k8s.io/agent-sandbox/controllers"
extensionsv1alpha1 "sigs.k8s.io/agent-sandbox/extensions/api/v1alpha1"
Expand Down Expand Up @@ -218,6 +220,13 @@ func TestServerCreateSandbox(t *testing.T) {
return "10.0.0.9", nil
})

patches.ApplyFunc(createNetworkPolicy, func(_ context.Context, _ kubernetes.Interface, _ *networkingv1.NetworkPolicy) error {
return nil
})
Comment on lines +223 to +225
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for network policy creation failure. The test mocks createNetworkPolicy to always succeed (lines 223-225), but there's no test case that verifies the behavior when network policy creation fails. This scenario should trigger rollback since rollback is registered before network policy creation. Add a test case to verify that network policy creation failure properly rolls back the sandbox and sandbox claim creation.

Copilot uses AI. Check for mistakes.
patches.ApplyFunc(deleteNetworkPolicy, func(_ context.Context, _ kubernetes.Interface, _, _ string) error {
return nil
})

resp, err := server.createSandbox(context.Background(), nil, sb, claim, makeEntry(), resultChan)

require.Equal(t, tt.expectCreateCalls, createCalls, "createSandbox call count")
Expand Down
7 changes: 4 additions & 3 deletions pkg/workloadmanager/k8s_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ type K8sClient struct {
}

type sandboxEntry struct {
Kind string
SessionID string
Ports []runtimev1alpha1.TargetPort
Kind string
SessionID string
Ports []runtimev1alpha1.TargetPort
NetworkPolicy *runtimev1alpha1.SandboxNetworkPolicy
}

// NewK8sClient creates a new Kubernetes client
Expand Down
84 changes: 84 additions & 0 deletions pkg/workloadmanager/network_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workloadmanager

import (
"context"
"fmt"

runtimev1alpha1 "github.com/volcano-sh/agentcube/pkg/apis/runtime/v1alpha1"
networkingv1 "k8s.io/api/networking/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// buildNetworkPolicy assembles the NetworkPolicy object for one sandbox.
//
// The policy is named after the sandbox, lives in namespace, and selects pods
// carrying the SandboxNameLabelKey=sandboxName label. Both the Ingress and
// Egress policy types are always declared, so any direction without rules is
// denied. With a nil spec both rule lists are empty (deny-all); otherwise the
// rule lists from spec are used as given.
func buildNetworkPolicy(namespace, sandboxName string, spec *runtimev1alpha1.SandboxNetworkPolicy) *networkingv1.NetworkPolicy {
	policy := &networkingv1.NetworkPolicy{
		ObjectMeta: metav1.ObjectMeta{
			Name:      sandboxName,
			Namespace: namespace,
			Labels: map[string]string{
				SandboxNameLabelKey: sandboxName,
				"managed-by":        "agentcube-workload-manager",
			},
		},
		Spec: networkingv1.NetworkPolicySpec{
			PodSelector: metav1.LabelSelector{
				MatchLabels: map[string]string{SandboxNameLabelKey: sandboxName},
			},
			PolicyTypes: []networkingv1.PolicyType{
				networkingv1.PolicyTypeIngress,
				networkingv1.PolicyTypeEgress,
			},
			// Start from the deny-all default: no rules in either direction.
			Ingress: []networkingv1.NetworkPolicyIngressRule{},
			Egress:  []networkingv1.NetworkPolicyEgressRule{},
		},
	}

	// A caller-supplied spec replaces the default rule lists wholesale.
	if spec != nil {
		policy.Spec.Ingress = spec.Ingress
		policy.Spec.Egress = spec.Egress
	}

	return policy
}

// createNetworkPolicy submits np to the cluster through clientset, in the
// namespace recorded on np itself. The created object is discarded; only the
// outcome is reported. Any API error is returned wrapped with the policy's
// namespace and name.
func createNetworkPolicy(ctx context.Context, clientset kubernetes.Interface, np *networkingv1.NetworkPolicy) error {
	policies := clientset.NetworkingV1().NetworkPolicies(np.Namespace)
	if _, err := policies.Create(ctx, np, metav1.CreateOptions{}); err != nil {
		return fmt.Errorf("failed to create network policy %s/%s: %w", np.Namespace, np.Name, err)
	}
	return nil
}

// deleteNetworkPolicy removes the NetworkPolicy called name from namespace.
// A policy that is already gone counts as success: not-found responses are
// treated the same as a clean delete. Any other API error is returned wrapped
// with the namespace and name.
func deleteNetworkPolicy(ctx context.Context, clientset kubernetes.Interface, namespace, name string) error {
	policies := clientset.NetworkingV1().NetworkPolicies(namespace)
	err := policies.Delete(ctx, name, metav1.DeleteOptions{})
	if err == nil || apierrors.IsNotFound(err) {
		return nil
	}
	return fmt.Errorf("failed to delete network policy %s/%s: %w", namespace, name, err)
}
Loading
Loading