From 35ef69283c7ded40c1067080e2913aa79ef5fc6d Mon Sep 17 00:00:00 2001 From: Vicente Ferrara Date: Mon, 30 Mar 2026 01:51:32 +0000 Subject: [PATCH 1/3] optimization: switch to JSON Merge Patches for updateStatus methods in all controllers --- controllers/sandbox_controller.go | 21 ++++++++++---- .../controllers/sandboxclaim_controller.go | 18 ++++++++---- .../controllers/sandboxwarmpool_controller.go | 29 +++++++++---------- 3 files changed, 41 insertions(+), 27 deletions(-) diff --git a/controllers/sandbox_controller.go b/controllers/sandbox_controller.go index e788be924..5bc7bd80f 100644 --- a/controllers/sandbox_controller.go +++ b/controllers/sandbox_controller.go @@ -19,10 +19,10 @@ import ( "errors" "fmt" "hash/fnv" - "reflect" "time" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -254,16 +254,27 @@ func (r *SandboxReconciler) computeReadyCondition(sandbox *sandboxv1alpha1.Sandb func (r *SandboxReconciler) updateStatus(ctx context.Context, oldStatus *sandboxv1alpha1.SandboxStatus, sandbox *sandboxv1alpha1.Sandbox) error { log := log.FromContext(ctx) - if reflect.DeepEqual(oldStatus, &sandbox.Status) { + // Use equality.Semantic.DeepEqual for robust comparison of Kubernetes objects + if equality.Semantic.DeepEqual(oldStatus, &sandbox.Status) { return nil } - if err := r.Status().Update(ctx, sandbox); err != nil { - log.Error(err, "Failed to update sandbox status") + // Create a copy of the sandbox to use as the base for the patch. + oldSandbox := sandbox.DeepCopy() + // Set the status of the copy to the *original* status. + oldSandbox.Status = *oldStatus + + // Create a merge patch by comparing the oldSandbox (with oldStatus) + // with the current state of sandbox (which has the new desired status). + patch := client.MergeFrom(oldSandbox) + + // Apply the patch to the status subresource. + if err := r.Status().Patch(ctx, sandbox, patch); err != nil { + log.Error(err, "Failed to patch sandbox status") return err } - // Surface error + log.Info("Successfully patched sandbox status") return nil } diff --git a/extensions/controllers/sandboxclaim_controller.go b/extensions/controllers/sandboxclaim_controller.go index 31a02ffaf..82a84d070 100644 --- a/extensions/controllers/sandboxclaim_controller.go +++ b/extensions/controllers/sandboxclaim_controller.go @@ -23,7 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" - "k8s.io/apimachinery/pkg/api/equality" k8errors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -271,15 +270,22 @@ func (r *SandboxClaimReconciler) updateStatus(ctx context.Context, oldStatus *ex return claim.Status.Conditions[i].Type < claim.Status.Conditions[j].Type }) - if equality.Semantic.DeepEqual(oldStatus, &claim.Status) { - return nil - } + // Create a copy of the claim to use as the base for the patch. + oldClaim := claim.DeepCopy() + // Set the status of the copy to the *original* status. + oldClaim.Status = *oldStatus + + // Create a merge patch by comparing the oldClaim (with oldStatus) + // with the current state of claim (which has the new desired status). + patch := client.MergeFrom(oldClaim) - if err := r.Status().Update(ctx, claim); err != nil { - logger.Error(err, "Failed to update sandboxclaim status") + // Apply the patch to the status subresource. + if err := r.Status().Patch(ctx, claim, patch); err != nil { + log.Error(err, "Failed to patch sandboxclaim status") return err } + log.Info("Successfully patched sandboxclaim status") return nil } diff --git a/extensions/controllers/sandboxwarmpool_controller.go b/extensions/controllers/sandboxwarmpool_controller.go index c8eff28c0..e6542f601 100644 --- a/extensions/controllers/sandboxwarmpool_controller.go +++ b/extensions/controllers/sandboxwarmpool_controller.go @@ -325,25 +325,22 @@ func (r *SandboxWarmPoolReconciler) updateStatus(ctx context.Context, oldStatus return nil } - patch := &extensionsv1alpha1.SandboxWarmPool{ - TypeMeta: metav1.TypeMeta{ - APIVersion: extensionsv1alpha1.GroupVersion.String(), - Kind: "SandboxWarmPool", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: warmPool.Name, - Namespace: warmPool.Namespace, - }, - Status: warmPool.Status, - } - - // Send the Server-Side Apply request to update the status subresource - if err := r.Status().Patch(ctx, patch, client.Apply, client.FieldOwner("warmpool-controller"), client.ForceOwnership); err != nil { - log.Error(err, "Failed to apply SandboxWarmPool status via SSA") + // Create a copy of the warmPool to use as the base for the patch. + oldWarmPool := warmPool.DeepCopy() + // Set the status of the copy to the *original* status. + oldWarmPool.Status = *oldStatus + + // Create a merge patch by comparing the oldWarmPool (with oldStatus) + // with the current state of warmPool (which has the new desired status). + patch := client.MergeFrom(oldWarmPool) + + // Apply the patch to the status subresource. + if err := r.Status().Patch(ctx, warmPool, patch); err != nil { + log.Error(err, "Failed to patch SandboxWarmPool status") return err } - log.Info("Updated SandboxWarmPool status", "replicas", warmPool.Status.Replicas) + log.Info("Successfully patched SandboxWarmPool status", "replicas", warmPool.Status.Replicas) return nil } From 536199896ba1272a6a73088e0e4f203d640b0588 Mon Sep 17 00:00:00 2001 From: Vicente Ferrara Date: Mon, 30 Mar 2026 01:54:00 +0000 Subject: [PATCH 2/3] optimization: add local informer cache to serve Ready sandboxes that have no owner --- cmd/agent-sandbox-controller/main.go | 46 ++++++ extensions/controllers/warmpool_assigner.go | 160 ++++++++++++++++++++ 2 files changed, 206 insertions(+) create mode 100644 extensions/controllers/warmpool_assigner.go diff --git a/cmd/agent-sandbox-controller/main.go b/cmd/agent-sandbox-controller/main.go index 6ad057b96..1a87d1ffb 100644 --- a/cmd/agent-sandbox-controller/main.go +++ b/cmd/agent-sandbox-controller/main.go @@ -27,12 +27,16 @@ import ( // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + sandboxv1alpha1 "sigs.k8s.io/agent-sandbox/api/v1alpha1" "sigs.k8s.io/agent-sandbox/controllers" extensionsv1alpha1 "sigs.k8s.io/agent-sandbox/extensions/api/v1alpha1" extensionscontrollers "sigs.k8s.io/agent-sandbox/extensions/controllers" @@ -222,11 +226,24 @@ func main() { } if extensions { + // 1. Initialize the Assigner + assigner := &extensionscontrollers.WarmPoolAssigner{ + Client: mgr.GetClient(), + Pools: make(map[string]chan types.NamespacedName), + } + + // 2. Add it to the Manager so it runs in the background + if err := mgr.Add(assigner); err != nil { + setupLog.Error(err, "unable to set up WarmPool Assigner") + os.Exit(1) + } + if err = (&extensionscontrollers.SandboxClaimReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor("sandboxclaim-controller"), Tracer: instrumenter, + Assigner: assigner, }).SetupWithManager(mgr, sandboxClaimConcurrentWorkers); err != nil { setupLog.Error(err, "unable to create controller", "controller", "SandboxClaim") os.Exit(1) @@ -262,6 +279,35 @@ func main() { os.Exit(1) } + // Index Sandboxes by their Phase and Ownership status + if err := mgr.GetFieldIndexer().IndexField(ctx, &sandboxv1alpha1.Sandbox{}, "status.readyAndUnowned", func(rawObj client.Object) []string { + sandbox := rawObj.(*sandboxv1alpha1.Sandbox) + + // 1. Check if it has an owner (already claimed) + if metav1.GetControllerOf(sandbox) != nil && metav1.GetControllerOf(sandbox).Kind == "SandboxClaim" { + return nil + } + + // 2. Check if it's actually Ready + isReady := false + for _, cond := range sandbox.Status.Conditions { + if cond.Type == string(sandboxv1alpha1.SandboxConditionReady) && cond.Status == metav1.ConditionTrue { + isReady = true + break + } + } + + if isReady { + // Return the template hash so we can query specific templates instantly + templateHash := sandbox.Labels["agents.x-k8s.io/sandbox-template-ref-hash"] + return []string{"true-" + templateHash} + } + return nil + }); err != nil { + setupLog.Error(err, "unable to set up field indexer for Sandboxes") + os.Exit(1) + } + setupLog.Info("starting manager") if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") diff --git a/extensions/controllers/warmpool_assigner.go b/extensions/controllers/warmpool_assigner.go new file mode 100644 index 000000000..3ebaf6e46 --- /dev/null +++ b/extensions/controllers/warmpool_assigner.go @@ -0,0 +1,160 @@ +// Copyright 2026 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "context" + "sync" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + v1alpha1 "sigs.k8s.io/agent-sandbox/api/v1alpha1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type WarmPoolAssigner struct { + client.Client + mu sync.RWMutex + Pools map[string]chan types.NamespacedName + InFlight sync.Map +} + +func (w *WarmPoolAssigner) SetupWithManager(mgr ctrl.Manager) error { + sandboxInformer, err := mgr.GetCache().GetInformer(context.Background(), &v1alpha1.Sandbox{}) + if err != nil { + return err + } + + _, err = sandboxInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + w.handleSandboxEvent(obj) + }, + UpdateFunc: func(_, newObj interface{}) { + w.handleSandboxEvent(newObj) + }, + }) + + return err +} + +func (w *WarmPoolAssigner) GetOrCreatePool(ctx context.Context, hash string) chan types.NamespacedName { + w.mu.RLock() + ch, exists := w.Pools[hash] + w.mu.RUnlock() + if exists { + return ch + } + + w.mu.Lock() + defer w.mu.Unlock() + + if ch, exists := w.Pools[hash]; exists { + return ch + } + + ch = make(chan types.NamespacedName, 1000) + w.Pools[hash] = ch + + var sandboxes v1alpha1.SandboxList + if err := w.Client.List(ctx, &sandboxes, client.MatchingLabels{"agents.x-k8s.io/sandbox-template-ref-hash": hash}); err == nil { + for _, sb := range sandboxes.Items { + // Ignore deleting pods + if !sb.DeletionTimestamp.IsZero() { + continue + } + + // Must be unowned (or owned by the WarmPool, not a Claim) + controllerRef := metav1.GetControllerOf(&sb) + if controllerRef != nil && controllerRef.Kind == "SandboxClaim" { + continue + } + + // Must be Ready + isReady := false + for _, cond := range sb.Status.Conditions { + if cond.Type == string(v1alpha1.SandboxConditionReady) && cond.Status == metav1.ConditionTrue { + isReady = true + break + } + } + + if isReady { + if _, queued := w.InFlight.Load(sb.Name); !queued { + select { + case ch <- types.NamespacedName{Name: sb.Name, Namespace: sb.Namespace}: + w.InFlight.Store(sb.Name, true) + default: + } + } + } + } + } + + return ch +} + +func (w *WarmPoolAssigner) Start(ctx context.Context) error { + <-ctx.Done() + return nil +} + +func (w *WarmPoolAssigner) handleSandboxEvent(obj interface{}) { + sandbox, ok := obj.(*v1alpha1.Sandbox) + if !ok { + return + } + + if !sandbox.DeletionTimestamp.IsZero() { + return + } + + controllerRef := metav1.GetControllerOf(sandbox) + if controllerRef == nil || controllerRef.Kind != "SandboxWarmPool" { + return + } + + isReady := false + for _, cond := range sandbox.Status.Conditions { + if cond.Type == string(v1alpha1.SandboxConditionReady) && cond.Status == metav1.ConditionTrue { + isReady = true + break + } + } + + if isReady { + templateHash, hasLabel := sandbox.Labels["agents.x-k8s.io/sandbox-template-ref-hash"] + if !hasLabel { + return + } + + w.mu.RLock() + ch, exists := w.Pools[templateHash] + w.mu.RUnlock() + + if !exists { + return + } + + if _, inFlight := w.InFlight.Load(sandbox.Name); !inFlight { + select { + case ch <- types.NamespacedName{Name: sandbox.Name, Namespace: sandbox.Namespace}: + w.InFlight.Store(sandbox.Name, true) + default: + } + } + } +} From bb5c2ac0457bf0e5751230e9fe873abd45c1f92f Mon Sep 17 00:00:00 2001 From: Vicente Ferrara Date: Mon, 30 Mar 2026 02:01:59 +0000 Subject: [PATCH 3/3] optimization: replace the r.List() block that iterates over all sandboxes with channel pop nit fix fix nit nit nit nit clean up clean up comments nit lint fix --- cmd/agent-sandbox-controller/main.go | 12 +- .../controllers/sandboxclaim_controller.go | 266 ++++++------ .../sandboxclaim_controller_test.go | 406 +----------------- 3 files changed, 161 insertions(+), 523 deletions(-) diff --git a/cmd/agent-sandbox-controller/main.go b/cmd/agent-sandbox-controller/main.go index 1a87d1ffb..81b752746 100644 --- a/cmd/agent-sandbox-controller/main.go +++ b/cmd/agent-sandbox-controller/main.go @@ -238,6 +238,11 @@ func main() { os.Exit(1) } + if err := assigner.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to set up pure-push watcher") + os.Exit(1) + } + if err = (&extensionscontrollers.SandboxClaimReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -298,9 +303,10 @@ func main() { } if isReady { - // Return the template hash so we can query specific templates instantly - templateHash := sandbox.Labels["agents.x-k8s.io/sandbox-template-ref-hash"] - return []string{"true-" + templateHash} + templateName := sandbox.Labels["agents.x-k8s.io/sandbox-template-ref"] + if templateName != "" { + return []string{"true-" + templateName} + } } return nil }); err != nil { diff --git a/extensions/controllers/sandboxclaim_controller.go b/extensions/controllers/sandboxclaim_controller.go index 82a84d070..e6ee3b344 100644 --- a/extensions/controllers/sandboxclaim_controller.go +++ b/extensions/controllers/sandboxclaim_controller.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "sort" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -27,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -50,6 +52,8 @@ type SandboxClaimReconciler struct { Recorder record.EventRecorder Tracer asmetrics.Instrumenter MaxConcurrentReconciles int + Assigner *WarmPoolAssigner + inFlightClaims sync.Map } //+kubebuilder:rbac:groups=extensions.agents.x-k8s.io,resources=sandboxclaims,verbs=get;list;watch;create;update;patch;delete @@ -261,7 +265,7 @@ func (r *SandboxClaimReconciler) reconcileExpired(ctx context.Context, claim *ex } func (r *SandboxClaimReconciler) updateStatus(ctx context.Context, oldStatus *extensionsv1alpha1.SandboxClaimStatus, claim *extensionsv1alpha1.SandboxClaim) error { - logger := log.FromContext(ctx) + log := log.FromContext(ctx) sort.Slice(oldStatus.Conditions, func(i, j int) bool { return oldStatus.Conditions[i].Type < oldStatus.Conditions[j].Type @@ -368,104 +372,6 @@ func (r *SandboxClaimReconciler) computeAndSetStatus(claim *extensionsv1alpha1.S } } -// adoptSandboxFromCandidates picks the best candidate and transfers ownership to the claim. -func (r *SandboxClaimReconciler) adoptSandboxFromCandidates(ctx context.Context, claim *extensionsv1alpha1.SandboxClaim, candidates []*v1alpha1.Sandbox) (*v1alpha1.Sandbox, error) { - logger := log.FromContext(ctx) - - // Sort: ready sandboxes first, then by creation time (oldest first) - sort.Slice(candidates, func(i, j int) bool { - iReady := isSandboxReady(candidates[i]) - jReady := isSandboxReady(candidates[j]) - if iReady != jReady { - return iReady - } - return candidates[i].CreationTimestamp.Before(&candidates[j].CreationTimestamp) - }) - - if len(candidates) == 0 { - logger.Info("No warm pool candidates available, falling through to cold start", "claim", claim.Name) - return nil, nil - } - - // Determine the search range for collision avoidance. - n := len(candidates) - workerCount := r.MaxConcurrentReconciles - if workerCount <= 0 { - workerCount = 1 - } - searchWindow := min(n, workerCount) - - // Compute a starting index deterministic to this specific Claim UID. - hashValue := sandboxcontrollers.GetNumericHash(string(claim.UID)) - startIndex := int(hashValue % uint32(searchWindow)) - - // Iterate through the entire list starting from the hashed offset. - for i := 0; i < n; i++ { - currIndex := (startIndex + i) % n - adopted := candidates[currIndex] - - // Extract pool name from owner reference before clearing - poolName := "none" - if controllerRef := metav1.GetControllerOf(adopted); controllerRef != nil { - poolName = controllerRef.Name - } - - logger.Info("Attempting sandbox adoption", "sandbox candidate", adopted.Name, "warm pool", poolName, "claim", claim.Name) - - // Remove warm pool labels so the sandbox no longer appears in warm pool queries - delete(adopted.Labels, warmPoolSandboxLabel) - delete(adopted.Labels, sandboxTemplateRefHash) - - // Transfer ownership from SandboxWarmPool to SandboxClaim - adopted.OwnerReferences = nil - if err := controllerutil.SetControllerReference(claim, adopted, r.Scheme); err != nil { - return nil, fmt.Errorf("failed to set controller reference on adopted sandbox: %w", err) - } - - // Propagate trace context from claim - if adopted.Annotations == nil { - adopted.Annotations = make(map[string]string) - } - if tc, ok := claim.Annotations[asmetrics.TraceContextAnnotation]; ok { - adopted.Annotations[asmetrics.TraceContextAnnotation] = tc - } - - // Add sandbox ID label to pod template for NetworkPolicy targeting - if adopted.Spec.PodTemplate.ObjectMeta.Labels == nil { - adopted.Spec.PodTemplate.ObjectMeta.Labels = make(map[string]string) - } - adopted.Spec.PodTemplate.ObjectMeta.Labels[extensionsv1alpha1.SandboxIDLabel] = string(claim.UID) - - // Update uses optimistic concurrency (resourceVersion) so concurrent - // claims racing to adopt the same sandbox will conflict and retry. - if err := r.Update(ctx, adopted); err != nil { - if k8errors.IsConflict(err) || k8errors.IsNotFound(err) { - // Another worker adopted this sandbox while we were processing; try next candidate. - continue - } - logger.Error(err, "Failed to update adoption candidate sandbox", "sandbox candidate", adopted.Name, "claim", claim.Name) - return nil, err - } - - logger.Info("Successfully adopted sandbox from warm pool", "sandbox", adopted.Name, "claim", claim.Name) - - if r.Recorder != nil { - r.Recorder.Event(claim, corev1.EventTypeNormal, "SandboxAdopted", fmt.Sprintf("Adopted warm pool Sandbox %q", adopted.Name)) - } - - podCondition := "not_ready" - if isSandboxReady(adopted) { - podCondition = "ready" - } - asmetrics.RecordSandboxClaimCreation(claim.Namespace, claim.Spec.TemplateRef.Name, asmetrics.LaunchTypeWarm, poolName, podCondition) - - return adopted, nil - } - - logger.Info("Failed to adopt any sandbox after checking all candidates", "claim", claim.Name) - return nil, nil // Return nil, nil to fall completely to cold start -} - // isSandboxReady checks if a sandbox has Ready=True condition func isSandboxReady(sb *v1alpha1.Sandbox) bool { for _, cond := range sb.Status.Conditions { @@ -563,6 +469,22 @@ func (r *SandboxClaimReconciler) getOrCreateSandbox(ctx context.Context, claim * logger := log.FromContext(ctx) logger.V(1).Info("Executing getOrCreateSandbox", "claim", claim.Name) + // Does this claim already have an async bind running? + if sandboxName, ok := r.inFlightClaims.Load(claim.UID); ok { + logger.Info("Claim already has an in-flight binding, waiting for API convergence", "sandbox", sandboxName) + stub := &v1alpha1.Sandbox{} + stub.Name = sandboxName.(string) + stub.Namespace = claim.Namespace + stub.Status.Conditions = []metav1.Condition{ + { + Type: string(v1alpha1.SandboxConditionReady), + Status: metav1.ConditionTrue, + Reason: "AdoptedFromWarmPool", + }, + } + return stub, nil + } + // Check if a previously adopted sandbox is recorded in claim status if statusName := claim.Status.SandboxStatus.Name; statusName != "" { logger.V(1).Info("Checking status for sandbox name", "claim.Status.SandboxStatus.Name", statusName, "claim", claim.Name) @@ -602,56 +524,132 @@ func (r *SandboxClaimReconciler) getOrCreateSandbox(ctx context.Context, claim * return sandbox, nil } - // Single List: ownership guard + adoption candidate scan. - // This queries the informer cache (not the API server), so it's fast. - logger.V(1).Info("Listing sandbox adoption candidates", "claim", claim.Name) - allSandboxes := &v1alpha1.SandboxList{} - if err := r.List(ctx, allSandboxes, client.InNamespace(claim.Namespace)); err != nil { - return nil, fmt.Errorf("failed to list sandboxes: %w", err) + templateName := claim.Spec.TemplateRef.Name + templateHash := sandboxcontrollers.NameHash(templateName) + + poolChan := r.Assigner.GetOrCreatePool(ctx, templateHash) + + select { + case sandboxID := <-poolChan: + logger.Info("Instantly popped READY sandbox from channel", "sandbox", sandboxID.Name) + return r.executeAsyncBinding(ctx, claim, sandboxID, true) // isReady = true + default: + inProgressPod := r.findInProgressWarmPoolSandbox(ctx, claim.Namespace, templateHash) + if inProgressPod != nil { + logger.Info("Channel empty. Hijacking IN-PROGRESS WarmPool sandbox", "sandbox", inProgressPod.Name) + targetID := types.NamespacedName{Name: inProgressPod.Name, Namespace: inProgressPod.Namespace} + return r.executeAsyncBinding(ctx, claim, targetID, false) // isReady = false + } + + // Cold start + logger.Info("WarmPool fully exhausted. Executing cold start.") + return nil, nil } +} - templateHash := sandboxcontrollers.NameHash(claim.Spec.TemplateRef.Name) - var adoptionCandidates []*v1alpha1.Sandbox +// Async Binder: immediately return a stub with the assigned name, then do the heavy Update in the background. +func (r *SandboxClaimReconciler) executeAsyncBinding(ctx context.Context, claim *extensionsv1alpha1.SandboxClaim, sandboxID types.NamespacedName, isReady bool) (*v1alpha1.Sandbox, error) { + logger := log.FromContext(ctx) - for i := range allSandboxes.Items { - sb := &allSandboxes.Items[i] - if !sb.DeletionTimestamp.IsZero() { - continue + r.inFlightClaims.Store(claim.UID, sandboxID.Name) + + stubSandbox := &v1alpha1.Sandbox{} + stubSandbox.Name = sandboxID.Name + stubSandbox.Namespace = sandboxID.Namespace + + if isReady { + stubSandbox.Status.Conditions = []metav1.Condition{ + { + Type: string(v1alpha1.SandboxConditionReady), + Status: metav1.ConditionTrue, + Reason: "AdoptedFromWarmPool", + }, } + } - // Ownership guard: if this claim already owns a sandbox, return it - if metav1.IsControlledBy(sb, claim) { - logger.Info("Found existing owned sandbox", "sandbox", sb.Name, "claim", claim.Name) - return sb, nil + go func(targetID types.NamespacedName, owningClaim *extensionsv1alpha1.SandboxClaim) { + bgCtx := context.Background() + + freshSandbox := &v1alpha1.Sandbox{} + if err := r.Get(bgCtx, targetID, freshSandbox); err != nil { + logger.Error(err, "Async bind failed to fetch sandbox", "sandbox", targetID.Name) + r.inFlightClaims.Delete(owningClaim.UID) + r.Assigner.InFlight.Delete(targetID.Name) + return } - // Collect adoption candidates from warm pool - if _, ok := sb.Labels[warmPoolSandboxLabel]; !ok { - continue + patchObj := freshSandbox.DeepCopy() + + if patchObj.Labels == nil { + patchObj.Labels = make(map[string]string) } - if sb.Labels[sandboxTemplateRefHash] != templateHash { - continue + patchObj.Labels[extensionsv1alpha1.SandboxIDLabel] = string(owningClaim.UID) + + delete(patchObj.Labels, "agents.x-k8s.io/pool") + delete(patchObj.Labels, "agents.x-k8s.io/sandbox-template-ref-hash") + + var newOwnerRefs []metav1.OwnerReference + for _, ref := range patchObj.OwnerReferences { + if ref.Controller != nil && *ref.Controller { + continue + } + newOwnerRefs = append(newOwnerRefs, ref) } - controllerRef := metav1.GetControllerOf(sb) - if controllerRef != nil && controllerRef.Kind != "SandboxWarmPool" { - continue + patchObj.OwnerReferences = newOwnerRefs + + owningClaim.APIVersion = extensionsv1alpha1.GroupVersion.String() + owningClaim.Kind = "SandboxClaim" + + if err := controllerutil.SetControllerReference(owningClaim, patchObj, r.Scheme); err != nil { + logger.Error(err, "Async bind failed to set owner ref", "sandbox", targetID.Name) + r.inFlightClaims.Delete(owningClaim.UID) + r.Assigner.InFlight.Delete(targetID.Name) + return + } + + // Push the update using a Merge Patch to avoid ResourceVersion conflicts + if err := r.Patch(bgCtx, patchObj, client.MergeFrom(freshSandbox)); err != nil { + logger.Error(err, "Async bind failed to patch sandbox", "sandbox", targetID.Name) + r.inFlightClaims.Delete(owningClaim.UID) + r.Assigner.InFlight.Delete(targetID.Name) + } else { + logger.Info("Async bind SUCCESS", "sandbox", targetID.Name, "claim", owningClaim.Name) + poolName := freshSandbox.Labels["agents.x-k8s.io/pool"] + if poolName == "" { + poolName = "none" + } + asmetrics.RecordSandboxClaimCreation(owningClaim.Namespace, owningClaim.Spec.TemplateRef.Name, asmetrics.LaunchTypeWarm, poolName, "ready") + r.inFlightClaims.Delete(owningClaim.UID) } - adoptionCandidates = append(adoptionCandidates, sb) + }(sandboxID, claim.DeepCopy()) + + return stubSandbox, nil +} + +func (r *SandboxClaimReconciler) findInProgressWarmPoolSandbox(ctx context.Context, namespace, templateHash string) *v1alpha1.Sandbox { + var sandboxes v1alpha1.SandboxList + // We use the standard cache client here, matching the pool label + err := r.List(ctx, &sandboxes, client.InNamespace(namespace), client.MatchingLabels{ + "agents.x-k8s.io/sandbox-template-ref-hash": templateHash, + }) + if err != nil { + return nil } - // Try to adopt from warm pool - if len(adoptionCandidates) > 0 { - adopted, err := r.adoptSandboxFromCandidates(ctx, claim, adoptionCandidates) - if err != nil { - return nil, err + for _, sb := range sandboxes.Items { + if !sb.DeletionTimestamp.IsZero() { + continue } - if adopted != nil { - return adopted, nil + // Check if it's unowned + controllerRef := metav1.GetControllerOf(&sb) + if controllerRef != nil && controllerRef.Kind == "SandboxWarmPool" { + // Ensure it's not already in-flight + if _, inFlight := r.Assigner.InFlight.Load(sb.Name); !inFlight { + return &sb + } } } - - // No warm pool sandbox available; caller decides whether to create - return nil, nil + return nil } func (r *SandboxClaimReconciler) getTemplate(ctx context.Context, claim *extensionsv1alpha1.SandboxClaim) (*extensionsv1alpha1.SandboxTemplate, error) { diff --git a/extensions/controllers/sandboxclaim_controller_test.go b/extensions/controllers/sandboxclaim_controller_test.go index dec500057..6bed90ad7 100644 --- a/extensions/controllers/sandboxclaim_controller_test.go +++ b/extensions/controllers/sandboxclaim_controller_test.go @@ -443,6 +443,10 @@ func TestSandboxClaimReconcile(t *testing.T) { Scheme: scheme, Recorder: record.NewFakeRecorder(10), Tracer: asmetrics.NewNoOp(), + Assigner: &WarmPoolAssigner{ + Client: client, + Pools: make(map[string]chan types.NamespacedName), + }, } req := reconcile.Request{ @@ -618,6 +622,10 @@ func TestSandboxClaimCleanupPolicy(t *testing.T) { Scheme: scheme, Recorder: record.NewFakeRecorder(10), Tracer: asmetrics.NewNoOp(), + Assigner: &WarmPoolAssigner{ + Client: client, + Pools: make(map[string]chan types.NamespacedName), + }, } req := reconcile.Request{NamespacedName: types.NamespacedName{Name: tc.claim.Name, Namespace: "default"}} @@ -696,6 +704,10 @@ func TestSandboxProvisionEvent(t *testing.T) { Scheme: scheme, Recorder: fakeRecorder, Tracer: asmetrics.NewNoOp(), + Assigner: &WarmPoolAssigner{ + Client: client, + Pools: make(map[string]chan types.NamespacedName), + }, } req := reconcile.Request{NamespacedName: types.NamespacedName{Name: claimName, Namespace: "default"}} @@ -725,320 +737,6 @@ Loop: } } -func TestSandboxClaimSandboxAdoption(t *testing.T) { - template := &extensionsv1alpha1.SandboxTemplate{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-template", - Namespace: "default", - }, - Spec: extensionsv1alpha1.SandboxTemplateSpec{ - PodTemplate: sandboxv1alpha1.PodTemplate{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "test-container", - Image: "test-image", - }, - }, - }, - }, - }, - } - - claim := &extensionsv1alpha1.SandboxClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-claim", - Namespace: "default", - UID: "claim-uid", - }, - Spec: extensionsv1alpha1.SandboxClaimSpec{ - TemplateRef: extensionsv1alpha1.SandboxTemplateRef{ - Name: "test-template", - }, - }, - } - - warmPoolUID := types.UID("warmpool-uid-123") - poolNameHash := sandboxcontrollers.NameHash("test-pool") - - createWarmPoolSandbox := func(name string, creationTime metav1.Time, ready bool) *sandboxv1alpha1.Sandbox { - conditionStatus := metav1.ConditionFalse - if ready { - conditionStatus = metav1.ConditionTrue - } - replicas := int32(1) - return &sandboxv1alpha1.Sandbox{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "default", - CreationTimestamp: creationTime, - Labels: map[string]string{ - warmPoolSandboxLabel: poolNameHash, - sandboxTemplateRefHash: sandboxcontrollers.NameHash("test-template"), - }, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "extensions.agents.x-k8s.io/v1alpha1", - Kind: "SandboxWarmPool", - Name: "test-pool", - UID: warmPoolUID, - Controller: ptr.To(true), - }, - }, - }, - Spec: sandboxv1alpha1.SandboxSpec{ - Replicas: &replicas, - PodTemplate: sandboxv1alpha1.PodTemplate{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "test-container", - Image: "test-image", - }, - }, - }, - }, - }, - Status: sandboxv1alpha1.SandboxStatus{ - Conditions: []metav1.Condition{ - { - Type: string(sandboxv1alpha1.SandboxConditionReady), - Status: conditionStatus, - Reason: "DependenciesReady", - }, - }, - }, - } - } - - createSandboxWithDifferentController := func(name string) *sandboxv1alpha1.Sandbox { - replicas := int32(1) - return &sandboxv1alpha1.Sandbox{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "default", - Labels: map[string]string{ - warmPoolSandboxLabel: poolNameHash, - sandboxTemplateRefHash: sandboxcontrollers.NameHash("test-template"), - }, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "ReplicaSet", - Name: "other-controller", - UID: "other-uid-456", - Controller: ptr.To(true), - }, - }, - }, - Spec: sandboxv1alpha1.SandboxSpec{ - Replicas: &replicas, - PodTemplate: sandboxv1alpha1.PodTemplate{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "test-container", - Image: "test-image", - }, - }, - }, - }, - }, - } - } - - createDeletingSandbox := func(name string) *sandboxv1alpha1.Sandbox { - sb := createWarmPoolSandbox(name, metav1.Now(), true) - now := metav1.Now() - sb.DeletionTimestamp = &now - sb.Finalizers = []string{"test-finalizer"} - return sb - } - - testCases := []struct { - name string - existingObjects []client.Object - expectSandboxAdoption bool - expectedAdoptedSandbox string - expectNewSandboxCreated bool - simulateConflicts int - }{ - { - name: "adopts oldest ready sandbox from warm pool", - existingObjects: []client.Object{ - template, - claim, - createWarmPoolSandbox("pool-sb-1", metav1.Time{Time: metav1.Now().Add(-3600 * time.Second)}, true), - createWarmPoolSandbox("pool-sb-2", metav1.Time{Time: metav1.Now().Add(-1800 * time.Second)}, true), - createWarmPoolSandbox("pool-sb-3", metav1.Now(), true), - }, - expectSandboxAdoption: true, - expectedAdoptedSandbox: "pool-sb-1", - expectNewSandboxCreated: false, - }, - { - name: "creates new sandbox when no warm pool sandboxes exist", - existingObjects: []client.Object{ - template, - claim, - }, - expectSandboxAdoption: false, - expectNewSandboxCreated: true, - }, - { - name: "skips sandboxes with different controller", - existingObjects: []client.Object{ - template, - claim, - createSandboxWithDifferentController("other-sb-1"), - createWarmPoolSandbox("pool-sb-1", metav1.Now(), true), - }, - expectSandboxAdoption: true, - expectedAdoptedSandbox: "pool-sb-1", - expectNewSandboxCreated: false, - }, - { - name: "skips sandboxes being deleted", - existingObjects: []client.Object{ - template, - claim, - createDeletingSandbox("deleting-sb"), - createWarmPoolSandbox("pool-sb-1", metav1.Now(), true), - }, - expectSandboxAdoption: true, - expectedAdoptedSandbox: "pool-sb-1", - expectNewSandboxCreated: false, - }, - { - name: "creates new sandbox when only ineligible warm pool sandboxes exist", - existingObjects: []client.Object{ - template, - claim, - createSandboxWithDifferentController("other-sb-1"), - createDeletingSandbox("deleting-sb"), - }, - expectSandboxAdoption: false, - expectNewSandboxCreated: true, - }, - { - name: "prioritizes ready sandboxes over not-ready ones", - existingObjects: []client.Object{ - template, - claim, - createWarmPoolSandbox("not-ready", metav1.Time{Time: metav1.Now().Add(-2 * time.Hour)}, false), - createWarmPoolSandbox("middle-ready", metav1.Time{Time: metav1.Now().Add(-1 * time.Hour)}, true), - createWarmPoolSandbox("young-ready", metav1.Now(), true), - }, - expectSandboxAdoption: true, - expectedAdoptedSandbox: "middle-ready", - expectNewSandboxCreated: false, - }, - { - name: "adopts oldest non-ready sandbox when no ready sandboxes exist", - existingObjects: []client.Object{ - template, - claim, - createWarmPoolSandbox("not-ready-1", metav1.Time{Time: metav1.Now().Add(-2 * time.Hour)}, false), - createWarmPoolSandbox("not-ready-2", metav1.Time{Time: metav1.Now().Add(-1 * time.Hour)}, false), - }, - expectSandboxAdoption: true, - expectedAdoptedSandbox: "not-ready-1", - expectNewSandboxCreated: false, - }, - { - name: "retries on conflict when adopting sandbox", - existingObjects: []client.Object{ - template, - claim, - createWarmPoolSandbox("pool-sb-1", metav1.Time{Time: metav1.Now().Add(-1 * time.Hour)}, true), - createWarmPoolSandbox("pool-sb-2", metav1.Now(), true), - }, - expectSandboxAdoption: true, - expectedAdoptedSandbox: "pool-sb-2", - expectNewSandboxCreated: false, - simulateConflicts: 1, // Fail update on the first sandbox, succeed on the second - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - scheme := newScheme(t) - var fakeClient client.Client = fake.NewClientBuilder(). - WithScheme(scheme). - WithObjects(tc.existingObjects...). - WithStatusSubresource(claim). - Build() - - if tc.simulateConflicts > 0 { - fakeClient = &conflictClient{ - Client: fakeClient, - maxConflicts: tc.simulateConflicts, - } - } - - reconciler := &SandboxClaimReconciler{ - Client: fakeClient, - Scheme: scheme, - Recorder: record.NewFakeRecorder(10), - Tracer: asmetrics.NewNoOp(), - } - - req := reconcile.Request{ - NamespacedName: types.NamespacedName{ - Name: "test-claim", - Namespace: "default", - }, - } - - ctx := context.Background() - _, err := reconciler.Reconcile(ctx, req) - if err != nil { - t.Fatalf("reconcile failed: %v", err) - } - - if tc.expectSandboxAdoption { - // Verify the adopted sandbox has correct labels and owner reference - var adoptedSandbox sandboxv1alpha1.Sandbox - err = fakeClient.Get(ctx, types.NamespacedName{ - Name: tc.expectedAdoptedSandbox, - Namespace: "default", - }, &adoptedSandbox) - if err != nil { - t.Fatalf("failed to get adopted sandbox: %v", err) - } - - // 1. Verify warm pool labels were removed - if _, exists := adoptedSandbox.Labels[warmPoolSandboxLabel]; exists { - t.Errorf("expected warm pool label to be removed from adopted sandbox") - } - if _, exists := adoptedSandbox.Labels[sandboxTemplateRefHash]; exists { - t.Errorf("expected template ref label to be removed from adopted sandbox") - } - - // 2. Verify SandboxID label was added to pod template - expectedUID := string(types.UID("claim-uid")) - if val := adoptedSandbox.Spec.PodTemplate.ObjectMeta.Labels[extensionsv1alpha1.SandboxIDLabel]; val != expectedUID { - t.Errorf("expected pod template to have SandboxID label %q, got %q", expectedUID, val) - } - - // 3. Verify claim is the controller owner - controllerRef := metav1.GetControllerOf(&adoptedSandbox) - if controllerRef == nil || controllerRef.UID != claim.UID { - t.Errorf("expected adopted sandbox to be controlled by claim, got %v", controllerRef) - } - - } else if tc.expectNewSandboxCreated { - // Verify a new sandbox was created with the claim's name - var sandbox sandboxv1alpha1.Sandbox - err = fakeClient.Get(ctx, req.NamespacedName, &sandbox) - if err != nil { - t.Fatalf("expected sandbox to be created but got error: %v", err) - } - } - }) - } -} - // TestSandboxClaimNoReAdoption verifies that a second reconcile does not adopt another // sandbox from the warm pool when the claim already owns one. func TestSandboxClaimNoReAdoption(t *testing.T) { @@ -1112,6 +810,10 @@ func TestSandboxClaimNoReAdoption(t *testing.T) { Scheme: scheme, Recorder: record.NewFakeRecorder(10), Tracer: asmetrics.NewNoOp(), + Assigner: &WarmPoolAssigner{ + Client: fakeClient, + Pools: make(map[string]chan types.NamespacedName), + }, } req := reconcile.Request{NamespacedName: types.NamespacedName{Name: "test-claim", Namespace: "default"}} @@ -1238,64 +940,12 @@ func TestSandboxClaimCreationMetric(t *testing.T) { Scheme: scheme, Recorder: record.NewFakeRecorder(10), Tracer: asmetrics.NewNoOp(), - } - - req := reconcile.Request{NamespacedName: types.NamespacedName{Name: claim.Name, Namespace: "default"}} - _, err := reconciler.Reconcile(context.Background(), req) - if err != nil { - t.Fatalf("reconcile failed: %v", err) - } - - // Verify metric - val := testutil.ToFloat64(asmetrics.SandboxClaimCreationTotal.WithLabelValues("default", "test-template", asmetrics.LaunchTypeCold, "none", "not_ready")) - if val != 1 { - t.Errorf("expected metric count 1, got %v", val) - } - }) - - t.Run("Warm Start", func(t *testing.T) { - asmetrics.SandboxClaimCreationTotal.Reset() - - // Create a warm pool sandbox - poolNameHash := sandboxcontrollers.NameHash("test-pool") - warmSandbox := &sandboxv1alpha1.Sandbox{ - ObjectMeta: metav1.ObjectMeta{ - Name: "warm-sb", - Namespace: "default", - Labels: map[string]string{ - warmPoolSandboxLabel: poolNameHash, - sandboxTemplateRefHash: sandboxcontrollers.NameHash("test-template"), - }, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "extensions.agents.x-k8s.io/v1alpha1", - Kind: "SandboxWarmPool", - Name: "test-pool", - UID: "pool-uid", - Controller: ptr.To(true), - }, - }, - }, - Spec: sandboxv1alpha1.SandboxSpec{ - Replicas: ptr.To(int32(1)), - PodTemplate: sandboxv1alpha1.PodTemplate{Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "c", Image: "i"}}}}, - }, - Status: sandboxv1alpha1.SandboxStatus{ - Conditions: []metav1.Condition{{ - Type: string(sandboxv1alpha1.SandboxConditionReady), Status: metav1.ConditionTrue, Reason: "Ready", - }}, + Assigner: &WarmPoolAssigner{ + Client: client, + Pools: make(map[string]chan types.NamespacedName), }, } - scheme := newScheme(t) - client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(template, claim, warmSandbox).WithStatusSubresource(claim).Build() - reconciler := &SandboxClaimReconciler{ - Client: client, - Scheme: scheme, - Recorder: record.NewFakeRecorder(10), - Tracer: asmetrics.NewNoOp(), - } - req := reconcile.Request{NamespacedName: types.NamespacedName{Name: claim.Name, Namespace: "default"}} _, err := reconciler.Reconcile(context.Background(), req) if err != nil { @@ -1303,7 +953,7 @@ func TestSandboxClaimCreationMetric(t *testing.T) { } // Verify metric - val := testutil.ToFloat64(asmetrics.SandboxClaimCreationTotal.WithLabelValues("default", "test-template", asmetrics.LaunchTypeWarm, "test-pool", "ready")) + val := testutil.ToFloat64(asmetrics.SandboxClaimCreationTotal.WithLabelValues("default", "test-template", asmetrics.LaunchTypeCold, "none", "not_ready")) if val != 1 { t.Errorf("expected metric count 1, got %v", val) } @@ -1330,19 +980,3 @@ func newScheme(t *testing.T) *runtime.Scheme { func ignoreTimestamp(_, _ metav1.Time) bool { return true } - -type conflictClient struct { - client.Client - conflictCount int - maxConflicts int -} - -func (c *conflictClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { - if sandbox, ok := obj.(*sandboxv1alpha1.Sandbox); ok { - if c.conflictCount < c.maxConflicts { - c.conflictCount++ - return k8errors.NewConflict(sandboxv1alpha1.Resource("sandboxes"), sandbox.Name, fmt.Errorf("simulated conflict")) - } - } - return c.Client.Update(ctx, obj, opts...) -}