diff --git a/pkg/workloadmanager/handlers.go b/pkg/workloadmanager/handlers.go index 46ffac27..af08723a 100644 --- a/pkg/workloadmanager/handlers.go +++ b/pkg/workloadmanager/handlers.go @@ -132,8 +132,19 @@ func (s *Server) handleSandboxCreate(c *gin.Context, kind string) { response, err := s.createSandbox(c.Request.Context(), dynamicClient, sandbox, sandboxClaim, sandboxEntry, resultChan) if err != nil { + // Client disconnected — nothing to write back, avoid a misleading 500 in metrics. + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + klog.Warningf("create sandbox aborted %s/%s: client disconnected", sandbox.Namespace, sandbox.Name) + return + } klog.Errorf("create sandbox failed %s/%s: %v", sandbox.Namespace, sandbox.Name, err) - respondError(c, http.StatusInternalServerError, "internal server error") + // Internal errors (store, K8s API) must not leak system details to callers; + // sandbox-level failures (terminal pod state, timeout) are safe to surface. + msg := err.Error() + if apierrors.IsInternalError(err) { + msg = "internal server error" + } + respondError(c, http.StatusInternalServerError, msg) return } @@ -164,45 +175,32 @@ func (s *Server) createSandbox(ctx context.Context, dynamicClient dynamic.Interf // This ensures the K8s resource and store placeholder are cleaned up on // timeout, pod-IP failure, or store-update failure — not just on post-creation errors. needRollbackSandbox := true - sandboxRollbackFunc := func() { - ctxTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - var err error - if sandboxClaim != nil { - // Rollback SandboxClaim - err = deleteSandboxClaim(ctxTimeout, dynamicClient, sandboxClaim.Namespace, sandboxClaim.Name) - if err != nil { - klog.Infof("sandbox claim %s/%s rollback failed: %v", sandboxClaim.Namespace, sandboxClaim.Name, err) - } else { - klog.Infof("sandbox claim %s/%s rollback succeeded", sandboxClaim.Namespace, sandboxClaim.Name) - } - } else { - // Rollback Sandbox - err = deleteSandbox(ctxTimeout, dynamicClient, sandbox.Namespace, sandbox.Name) - if err != nil { - klog.Infof("sandbox %s/%s rollback failed: %v", sandbox.Namespace, sandbox.Name, err) - } else { - klog.Infof("sandbox %s/%s rollback succeeded", sandbox.Namespace, sandbox.Name) - } - } - // Clean up the store placeholder so it does not pollute GC queries - if delErr := s.storeClient.DeleteSandboxBySessionID(ctxTimeout, sandboxEntry.SessionID); delErr != nil { - klog.Infof("sandbox %s/%s store placeholder cleanup failed: %v", sandbox.Namespace, sandbox.Name, delErr) - } - } defer func() { if !needRollbackSandbox { return } - sandboxRollbackFunc() + s.rollbackSandboxCreation(dynamicClient, sandbox, sandboxClaim, sandboxEntry.SessionID) }() + // Use NewTimer so we can stop it explicitly when another branch wins, + // preventing the runtime from retaining the timer until it fires. + timer := time.NewTimer(2 * time.Minute) // consistent with router settings + var createdSandbox *sandboxv1alpha1.Sandbox select { case result := <-resultChan: + timer.Stop() + if result.Err != nil { + klog.Warningf("sandbox %s/%s failed: %v", sandbox.Namespace, sandbox.Name, result.Err) + return nil, result.Err + } createdSandbox = result.Sandbox klog.V(2).Infof("sandbox %s/%s running", createdSandbox.Namespace, createdSandbox.Name) - case <-time.After(2 * time.Minute): // consistent with router settings + case <-ctx.Done(): + timer.Stop() + klog.Warningf("sandbox %s/%s wait canceled: %v", sandbox.Namespace, sandbox.Name, ctx.Err()) + return nil, ctx.Err() + case <-timer.C: klog.Warningf("sandbox %s/%s create timed out", sandbox.Namespace, sandbox.Name) return nil, fmt.Errorf("sandbox creation timed out") } @@ -240,6 +238,30 @@ func (s *Server) createSandbox(ctx context.Context, dynamicClient dynamic.Interf return response, nil } +// rollbackSandboxCreation deletes the sandbox (or sandbox claim) and its store +// placeholder when creation fails. It runs in a fresh context so that a +// canceled request context does not prevent cleanup. +func (s *Server) rollbackSandboxCreation(dynamicClient dynamic.Interface, sandbox *sandboxv1alpha1.Sandbox, sandboxClaim *extensionsv1alpha1.SandboxClaim, sessionID string) { + ctxTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if sandboxClaim != nil { + if err := deleteSandboxClaim(ctxTimeout, dynamicClient, sandboxClaim.Namespace, sandboxClaim.Name); err != nil { + klog.Infof("sandbox claim %s/%s rollback failed: %v", sandboxClaim.Namespace, sandboxClaim.Name, err) + } else { + klog.Infof("sandbox claim %s/%s rollback succeeded", sandboxClaim.Namespace, sandboxClaim.Name) + } + } else { + if err := deleteSandbox(ctxTimeout, dynamicClient, sandbox.Namespace, sandbox.Name); err != nil { + klog.Infof("sandbox %s/%s rollback failed: %v", sandbox.Namespace, sandbox.Name, err) + } else { + klog.Infof("sandbox %s/%s rollback succeeded", sandbox.Namespace, sandbox.Name) + } + } + if delErr := s.storeClient.DeleteSandboxBySessionID(ctxTimeout, sessionID); delErr != nil { + klog.Infof("sandbox %s/%s store placeholder cleanup failed: %v", sandbox.Namespace, sandbox.Name, delErr) + } +} + // handleDeleteSandbox handles sandbox deletion requests func (s *Server) handleDeleteSandbox(c *gin.Context) { sessionID := c.Param("sessionId") diff --git a/pkg/workloadmanager/handlers_test.go b/pkg/workloadmanager/handlers_test.go index a5f78712..99b59e25 100644 --- a/pkg/workloadmanager/handlers_test.go +++ b/pkg/workloadmanager/handlers_test.go @@ -319,10 +319,19 @@ func TestHandleSandboxCreate(t *testing.T) { expectMessage: "internal server error", }, { - name: "create sandbox error", + name: "create sandbox error exposes message for non-internal errors", kind: types.AgentRuntimeKind, body: `{"name":"workload","namespace":"ns"}`, - createErr: errors.New("create failed"), + createErr: errors.New("sandbox ns/name failed: ErrImagePull"), + expectStatus: http.StatusInternalServerError, + expectMessage: "sandbox ns/name failed: ErrImagePull", + expectCreateCalls: 1, + }, + { + name: "create sandbox internal error is sanitized", + kind: types.AgentRuntimeKind, + body: `{"name":"workload","namespace":"ns"}`, + createErr: api.NewInternalError(errors.New("store connection refused")), expectStatus: http.StatusInternalServerError, expectMessage: "internal server error", expectCreateCalls: 1, diff --git a/pkg/workloadmanager/sandbox_controller.go b/pkg/workloadmanager/sandbox_controller.go index 0b445254..6d00f014 100644 --- a/pkg/workloadmanager/sandbox_controller.go +++ b/pkg/workloadmanager/sandbox_controller.go @@ -18,6 +18,7 @@ package workloadmanager import ( "context" + "fmt" "sync" "k8s.io/apimachinery/pkg/runtime" @@ -40,6 +41,7 @@ type SandboxReconciler struct { type SandboxStatusUpdate struct { Sandbox *sandboxv1alpha1.Sandbox + Err error } func (r *SandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -48,31 +50,44 @@ func (r *SandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, client.IgnoreNotFound(err) } - status := getSandboxStatus(sandbox) - - // Check for pending requests with proper locking - if status == "running" { - klog.V(2).Infof("Sandbox %s/%s is running, sending notification", sandbox.Namespace, sandbox.Name) - r.mu.Lock() - resultChan, exists := r.watchers[req.NamespacedName] - if exists { - klog.V(2).Infof("Found %d pending requests for sandbox %s/%s", len(r.watchers), sandbox.Namespace, sandbox.Name) - // Remove from map before sending to avoid memory leak - delete(r.watchers, req.NamespacedName) - } else { - klog.V(2).Infof("No pending requests found for sandbox %s/%s", sandbox.Namespace, sandbox.Name) - } - r.mu.Unlock() - - if exists { - // Send notification outside the lock to avoid deadlock - select { - case resultChan <- SandboxStatusUpdate{Sandbox: sandbox}: - klog.V(2).Infof("Notified waiter about sandbox %s/%s reaching Running state", sandbox.Namespace, sandbox.Name) - default: - klog.Warningf("Failed to notify watcher for sandbox %s/%s: channel buffer full or not receiving", sandbox.Namespace, sandbox.Name) - } + status, failMsg := getSandboxStatus(sandbox) + + // Only notify the waiter on a terminal state (running or failed). + // "unknown" means the sandbox is still being scheduled/started; stay quiet. + var ( + update SandboxStatusUpdate + hasWork bool + ) + switch status { + case "running": + klog.V(2).Infof("Sandbox %s/%s is running, notifying waiter", sandbox.Namespace, sandbox.Name) + update = SandboxStatusUpdate{Sandbox: sandbox} + hasWork = true + case "failed": + klog.Warningf("Sandbox %s/%s entered a terminal failure state: %s", sandbox.Namespace, sandbox.Name, failMsg) + update = SandboxStatusUpdate{ + Sandbox: sandbox, + Err: fmt.Errorf("sandbox %s/%s failed: %s", sandbox.Namespace, sandbox.Name, failMsg), } + hasWork = true + default: + return ctrl.Result{}, nil + } + + r.mu.Lock() + resultChan, exists := r.watchers[req.NamespacedName] + if exists { + delete(r.watchers, req.NamespacedName) + } + r.mu.Unlock() + + if exists && hasWork { + // WatchSandboxOnce always creates a buffered channel of size 1, and the + // map entry is deleted before this point so only one sender can ever + // reach here for a given key. The buffer is therefore always empty and + // this send never blocks. + resultChan <- update + klog.V(2).Infof("Notified waiter about sandbox %s/%s (status: %s)", sandbox.Namespace, sandbox.Name, status) } return ctrl.Result{}, nil diff --git a/pkg/workloadmanager/sandbox_helper.go b/pkg/workloadmanager/sandbox_helper.go index 17d565bc..945d930a 100644 --- a/pkg/workloadmanager/sandbox_helper.go +++ b/pkg/workloadmanager/sandbox_helper.go @@ -19,6 +19,7 @@ package workloadmanager import ( "net" "strconv" + "strings" "time" "github.com/volcano-sh/agentcube/pkg/common/types" @@ -60,17 +61,41 @@ func buildSandboxInfo(sandbox *sandboxv1alpha1.Sandbox, podIP string, entry *san SessionID: entry.SessionID, CreatedAt: createdAt, ExpiresAt: expiresAt, - Status: getSandboxStatus(sandbox), + Status: sandboxStatusString(sandbox), } } -// getSandboxStatus extracts status from Sandbox CRD conditions -func getSandboxStatus(sandbox *sandboxv1alpha1.Sandbox) string { - // Check conditions for Ready status +// sandboxStatusString returns only the status string (discards the failure message). +// Use this where only the status label is needed (e.g. store metadata). +func sandboxStatusString(sandbox *sandboxv1alpha1.Sandbox) string { + s, _ := getSandboxStatus(sandbox) + return s +} + +// getSandboxStatus extracts status from Sandbox CRD conditions. +// Returns "running", "failed", or "unknown". +func getSandboxStatus(sandbox *sandboxv1alpha1.Sandbox) (string, string) { for _, condition := range sandbox.Status.Conditions { - if condition.Type == string(sandboxv1alpha1.SandboxConditionReady) && condition.Status == metav1.ConditionTrue { - return "running" + if condition.Type != string(sandboxv1alpha1.SandboxConditionReady) { + continue + } + if condition.Status == metav1.ConditionTrue { + return "running", "" + } + // ConditionFalse with a reason indicates a terminal failure, not transient pending. + if condition.Status == metav1.ConditionFalse && condition.Reason != "" { + msg := condition.Message + if msg == "" { + msg = condition.Reason + } + // "Operation cannot be fulfilled" is a Kubernetes conflict error (HTTP 409). + // The sandbox controller retries these automatically, so they are transient — + // do not surface them as terminal failures. + if strings.Contains(msg, "Operation cannot be fulfilled") { + return "unknown", "" + } + return "failed", msg } } - return "unknown" + return "unknown", "" } diff --git a/pkg/workloadmanager/sandbox_helper_test.go b/pkg/workloadmanager/sandbox_helper_test.go index c58ade10..5a8498dc 100644 --- a/pkg/workloadmanager/sandbox_helper_test.go +++ b/pkg/workloadmanager/sandbox_helper_test.go @@ -207,9 +207,10 @@ func TestBuildSandboxInfo_TableDriven(t *testing.T) { func TestGetSandboxStatus_TableDriven(t *testing.T) { tests := []struct { - name string - sandbox *sandboxv1alpha1.Sandbox - expected string + name string + sandbox *sandboxv1alpha1.Sandbox + expected string + expectedMsg string }{ { name: "ready condition true", @@ -223,10 +224,11 @@ func TestGetSandboxStatus_TableDriven(t *testing.T) { }, }, }, - expected: "running", + expected: "running", + expectedMsg: "", }, { - name: "ready condition false", + name: "ready condition false without reason", sandbox: &sandboxv1alpha1.Sandbox{ Status: sandboxv1alpha1.SandboxStatus{ Conditions: []metav1.Condition{ @@ -237,7 +239,58 @@ func TestGetSandboxStatus_TableDriven(t *testing.T) { }, }, }, - expected: "unknown", + expected: "unknown", + expectedMsg: "", + }, + { + name: "ready condition false with reason indicates terminal failure", + sandbox: &sandboxv1alpha1.Sandbox{ + Status: sandboxv1alpha1.SandboxStatus{ + Conditions: []metav1.Condition{ + { + Type: string(sandboxv1alpha1.SandboxConditionReady), + Status: metav1.ConditionFalse, + Reason: "ErrImagePull", + Message: "Back-off pulling image", + }, + }, + }, + }, + expected: "failed", + expectedMsg: "Back-off pulling image", + }, + { + name: "ready condition false with reason but no message falls back to reason", + sandbox: &sandboxv1alpha1.Sandbox{ + Status: sandboxv1alpha1.SandboxStatus{ + Conditions: []metav1.Condition{ + { + Type: string(sandboxv1alpha1.SandboxConditionReady), + Status: metav1.ConditionFalse, + Reason: "OOMKilled", + }, + }, + }, + }, + expected: "failed", + expectedMsg: "OOMKilled", + }, + { + name: "ready condition false with conflict error is treated as transient", + sandbox: &sandboxv1alpha1.Sandbox{ + Status: sandboxv1alpha1.SandboxStatus{ + Conditions: []metav1.Condition{ + { + Type: string(sandboxv1alpha1.SandboxConditionReady), + Status: metav1.ConditionFalse, + Reason: "Error", + Message: "Error seen: failed to update pod: Operation cannot be fulfilled on pods \"my-pod\": the object has been modified; please apply your changes to the latest version and try again", + }, + }, + }, + }, + expected: "unknown", + expectedMsg: "", }, { name: "ready condition unknown", @@ -251,7 +304,8 @@ func TestGetSandboxStatus_TableDriven(t *testing.T) { }, }, }, - expected: "unknown", + expected: "unknown", + expectedMsg: "", }, { name: "no conditions", @@ -260,7 +314,8 @@ func TestGetSandboxStatus_TableDriven(t *testing.T) { Conditions: []metav1.Condition{}, }, }, - expected: "unknown", + expected: "unknown", + expectedMsg: "", }, { name: "nil conditions", @@ -269,7 +324,8 @@ func TestGetSandboxStatus_TableDriven(t *testing.T) { Conditions: nil, }, }, - expected: "unknown", + expected: "unknown", + expectedMsg: "", }, { name: "other condition type", @@ -283,7 +339,8 @@ func TestGetSandboxStatus_TableDriven(t *testing.T) { }, }, }, - expected: "unknown", + expected: "unknown", + expectedMsg: "", }, { name: "multiple conditions with ready true", @@ -301,14 +358,16 @@ func TestGetSandboxStatus_TableDriven(t *testing.T) { }, }, }, - expected: "running", + expected: "running", + expectedMsg: "", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := getSandboxStatus(tt.sandbox) + result, msg := getSandboxStatus(tt.sandbox) assert.Equal(t, tt.expected, result) + assert.Equal(t, tt.expectedMsg, msg) }) } }