diff --git a/apis/kueue/v1beta1/workload_types.go b/apis/kueue/v1beta1/workload_types.go index 79db6f41cc1..76adcd8edf4 100644 --- a/apis/kueue/v1beta1/workload_types.go +++ b/apis/kueue/v1beta1/workload_types.go @@ -424,7 +424,12 @@ type WorkloadStatus struct { // +optional NominatedClusterNames []string `json:"nominatedClusterNames,omitempty"` - // clusterName is the name of the cluster where the workload is actually assigned. + // clusterName is the name of the cluster where the workload is currently assigned. + // + // With ElasticJobs, this field may also indicate the cluster where the original (old) workload + // was assigned, providing placement context for new scaled-up workloads. This supports + // affinity or propagation policies across workload slices. + // // This field is reset after the Workload is evicted. // +optional ClusterName *string `json:"clusterName,omitempty"` diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml index 955cdd95c35..bd1c6ca6cbf 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml @@ -8612,7 +8612,12 @@ spec: x-kubernetes-list-type: map clusterName: description: |- - clusterName is the name of the cluster where the workload is actually assigned. + clusterName is the name of the cluster where the workload is currently assigned. + + With ElasticJobs, this field may also indicate the cluster where the original (old) workload + was assigned, providing placement context for new scaled-up workloads. This supports + affinity or propagation policies across workload slices. + This field is reset after the Workload is evicted. type: string conditions: diff --git a/config/components/crd/bases/kueue.x-k8s.io_workloads.yaml b/config/components/crd/bases/kueue.x-k8s.io_workloads.yaml index 13affa0cecf..fd3cb8b277a 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_workloads.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_workloads.yaml @@ -9096,7 +9096,12 @@ spec: x-kubernetes-list-type: map clusterName: description: |- - clusterName is the name of the cluster where the workload is actually assigned. + clusterName is the name of the cluster where the workload is currently assigned. + + With ElasticJobs, this field may also indicate the cluster where the original (old) workload + was assigned, providing placement context for new scaled-up workloads. This supports + affinity or propagation policies across workload slices. + This field is reset after the Workload is evicted. type: string conditions: diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index 4933128f344..db7a23e0f99 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -51,6 +51,7 @@ import ( clientutil "sigs.k8s.io/kueue/pkg/util/client" utilmaps "sigs.k8s.io/kueue/pkg/util/maps" "sigs.k8s.io/kueue/pkg/workload" + "sigs.k8s.io/kueue/pkg/workloadslicing" ) var ( @@ -95,6 +96,13 @@ func (g *wlGroup) IsFinished() bool { return apimeta.IsStatusConditionTrue(g.local.Status.Conditions, kueue.WorkloadFinished) } +// IsElasticWorkload returns true if the workload is considered elastic, +// meaning the ElasticJobsViaWorkloadSlices feature is enabled and the +// workload has the corresponding annotation set. 
+func (g *wlGroup) IsElasticWorkload() bool { + return workloadslicing.IsElasticWorkload(g.local) +} + // FirstReserving returns true if there is a workload reserving quota, // the string identifies the remote cluster. func (g *wlGroup) FirstReserving() (bool, string) { @@ -315,7 +323,16 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco acs := admissioncheck.FindAdmissionCheck(group.local.Status.AdmissionChecks, group.acName) - // 1. delete all remote workloads when finished or the local wl has no reservation + // 0. Skip reconciliation for elastic workloads when: + // - Workload is "Finished" as a result of workload slice replacement, OR + // - Workload doesn't have a quota reservation as a result of scale-up, i.e., scaling up is still in progress. + if group.IsElasticWorkload() && + ((group.IsFinished() && workloadslicing.IsReplaced(group.local.Status)) || + (!workload.HasQuotaReservation(group.local) && workloadslicing.ScaledUp(group.local))) { + return reconcile.Result{}, nil + } + + // 1. delete all remote workloads when local workload is finished or has no quota reservation. if group.IsFinished() || !workload.HasQuotaReservation(group.local) { var errs []error for rem := range group.remotes { @@ -361,9 +378,19 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco return reconcile.Result{}, w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(kueue.MultiKueueControllerName+"-finish"), client.ForceOwnership) } - // 2. delete all workloads that are out of sync or are not in the chosen worker + // 2. delete all workloads that are out of sync (other than scaled-down elastic workloads) + // or are not in the chosen worker. for rem, remWl := range group.remotes { if remWl != nil && !equality.Semantic.DeepEqual(group.local.Spec, remWl.Spec) { + // For elastic workloads, detect a scale-down event and propagate the change to the remote. + if group.IsElasticWorkload() && workloadslicing.ScaledDown(workload.ExtractPodSetCountsFromWorkload(remWl), workload.ExtractPodSetCountsFromWorkload(group.local)) { + remWl.Spec = group.local.Spec + if err := group.remoteClients[rem].client.Update(ctx, remWl); err != nil { + return reconcile.Result{}, fmt.Errorf("failed to update remote workload slice: %w", err) + } + continue + } + if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, rem)); err != nil { log.V(2).Error(err, "Deleting out of sync remote objects", "remote", rem) return reconcile.Result{}, err @@ -442,7 +469,13 @@ func (w *wlReconciler) nominateAndSynchronizeWorkers(ctx context.Context, group log.V(3).Info("Nominate and Synchronize Worker Clusters") var nominatedWorkers []string - if w.dispatcherName == config.MultiKueueDispatcherModeAllAtOnce { + // For elastic workloads, retrieve the remote cluster where the original workload was scheduled. + // For now, new workload slices will continue to be assigned to the same cluster. + // In the future, we may introduce more nuanced remote workload propagation policies, + // supporting preferred or required placement constraints.
+ if clusterName := workload.ClusterName(group.local); group.IsElasticWorkload() && clusterName != "" { + nominatedWorkers = []string{clusterName} + } else if w.dispatcherName == config.MultiKueueDispatcherModeAllAtOnce { for workerName := range group.remotes { nominatedWorkers = append(nominatedWorkers, workerName) } @@ -459,6 +492,7 @@ func (w *wlReconciler) nominateAndSynchronizeWorkers(ctx context.Context, group // Incremental dispatcher and External dispatcher path nominatedWorkers = group.local.Status.NominatedClusterNames } + log.V(4).Info("Synchronize nominated worker clusters", "dispatcherName", w.dispatcherName, "nominatedWorkerClusterNames", nominatedWorkers) var errs []error diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go index 695a349bb08..282610220e7 100644 --- a/pkg/controller/admissionchecks/multikueue/workload_test.go +++ b/pkg/controller/admissionchecks/multikueue/workload_test.go @@ -31,6 +31,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/component-base/featuregate" testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -48,8 +49,8 @@ import ( "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" + "sigs.k8s.io/kueue/pkg/workloadslicing" - // To ensure the integration manager gets populated _ "sigs.k8s.io/kueue/pkg/controller/jobs" ) @@ -74,13 +75,14 @@ func TestWlReconcile(t *testing.T) { baseJobManagedByKueueBuilder := baseJobBuilder.Clone().ManagedBy(kueue.MultiKueueControllerName) cases := map[string]struct { + features map[featuregate.Feature]bool + reconcileFor string managersWorkloads []kueue.Workload managersJobs []batchv1.Job managersDeletedWorkloads []*kueue.Workload worker1Workloads []kueue.Workload worker1Jobs []batchv1.Job - withoutJobManagedBy bool dispatcherName *string // second worker @@ -211,6 +213,7 @@ func TestWlReconcile(t *testing.T) { }, }, "unmanaged wl (job not managed by multikueue) is rejected": { + features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: true}, reconcileFor: "wl1", managersJobs: []batchv1.Job{ *baseJobBuilder.Clone().Obj(), @@ -297,9 +300,8 @@ func TestWlReconcile(t *testing.T) { }, }, "wl without reservation, clears the workload objects (withoutJobManagedBy)": { - reconcileFor: "wl1", - withoutJobManagedBy: true, - managersJobs: []batchv1.Job{*baseJobBuilder.Clone().Obj()}, + reconcileFor: "wl1", + managersJobs: []batchv1.Job{*baseJobBuilder.Clone().Obj()}, managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). @@ -435,6 +437,7 @@ func TestWlReconcile(t *testing.T) { wantError: errFake, }, "remote wl with reservation": { + features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: true}, reconcileFor: "wl1", managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). @@ -499,8 +502,7 @@ func TestWlReconcile(t *testing.T) { }, }, "remote wl with reservation (withoutJobManagedBy)": { - reconcileFor: "wl1", - withoutJobManagedBy: true, + reconcileFor: "wl1", managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). 
@@ -564,9 +566,8 @@ func TestWlReconcile(t *testing.T) { }, }, "remote wl with reservation (withoutJobManagedBy, MultiKueueDispatcherModeIncremental)": { - reconcileFor: "wl1", - withoutJobManagedBy: true, - dispatcherName: ptr.To(config.MultiKueueDispatcherModeIncremental), + reconcileFor: "wl1", + dispatcherName: ptr.To(config.MultiKueueDispatcherModeIncremental), managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). @@ -630,6 +631,7 @@ func TestWlReconcile(t *testing.T) { }, }, "remote job is changing status the local Job is updated ": { + features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: true}, reconcileFor: "wl1", managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). @@ -701,8 +703,7 @@ func TestWlReconcile(t *testing.T) { }, }, "remote job is changing status, the local job is not updated (withoutJobManagedBy)": { - reconcileFor: "wl1", - withoutJobManagedBy: true, + reconcileFor: "wl1", managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{ @@ -835,8 +836,7 @@ func TestWlReconcile(t *testing.T) { }, }, "remote wl is finished, the local workload and Job are marked completed (withoutJobManagedBy)": { - reconcileFor: "wl1", - withoutJobManagedBy: true, + reconcileFor: "wl1", managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{ @@ -1047,6 +1047,7 @@ func TestWlReconcile(t *testing.T) { }, }, "worker reconnects after the local workload is requeued and got reservation on a second worker": { + features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: true}, // the worker with the oldest reservation is kept reconcileFor: "wl1", managersWorkloads: []kueue.Workload{ @@ -1124,11 +1125,330 @@ func TestWlReconcile(t *testing.T) { }, }, }, + "elastic job finished local workload via replacement is ignored": { + features: map[featuregate.Feature]bool{features.ElasticJobsViaWorkloadSlices: true}, + reconcileFor: "wl1", + + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadSliceReplaced, + }). + ClusterName("worker1"). + Obj(), + }, + managersJobs: []batchv1.Job{ + *baseJobManagedByKueueBuilder.Clone().Obj(), + }, + worker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueue.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + QuotaReservedTime(now.Add(-time.Hour)). // one hour ago + Obj(), + }, + worker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + useSecondWorker: true, + + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }). 
+ ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + ClusterName("worker1"). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadSliceReplaced, + }). + Obj(), + }, + wantManagersJobs: []batchv1.Job{ + *baseJobManagedByKueueBuilder.Clone().Obj(), + }, + wantWorker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueue.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + wantWorker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + }, + "elastic job local workload without quota reservation": { + features: map[featuregate.Feature]bool{features.ElasticJobsViaWorkloadSlices: true}, + reconcileFor: "wl1", + + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ClusterName("worker1"). + Obj(), + }, + managersJobs: []batchv1.Job{ + *baseJobManagedByKueueBuilder.Clone().Obj(), + }, + worker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueue.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + QuotaReservedTime(now.Add(-time.Hour)). // one hour ago + Obj(), + }, + worker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + useSecondWorker: true, + + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ClusterName("worker1"). + Obj(), + }, + wantManagersJobs: []batchv1.Job{ + *baseJobManagedByKueueBuilder.Clone().Obj(), + }, + }, + "elastic job local scaled-up workload slice without quota reservation": { + features: map[featuregate.Feature]bool{ + features.ElasticJobsViaWorkloadSlices: true, + features.MultiKueueBatchJobWithManagedBy: true, + }, + reconcileFor: "wl1", + + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Annotation(workloadslicing.WorkloadSliceReplacementFor, "old-slice"). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ClusterName("worker1"). + Obj(), + }, + managersJobs: []batchv1.Job{ + *baseJobManagedByKueueBuilder.Clone().Obj(), + }, + worker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueue.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + QuotaReservedTime(now.Add(-time.Hour)). // one hour ago + Obj(), + }, + worker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). 
+ Obj(), + }, + useSecondWorker: true, + + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Annotation(workloadslicing.WorkloadSliceReplacementFor, "old-slice"). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ClusterName("worker1"). + Obj(), + }, + wantManagersJobs: []batchv1.Job{ + *baseJobManagedByKueueBuilder.Clone().Obj(), + }, + wantWorker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueue.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + wantWorker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + }, + "elastic job local workload out-of-sync other than scaled-down": { + features: map[featuregate.Feature]bool{ + features.ElasticJobsViaWorkloadSlices: true, + features.MultiKueueBatchJobWithManagedBy: true, + }, + reconcileFor: "wl1", + + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + PodSets(*utiltesting.MakePodSet("different-name", 1).Obj()). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + ClusterName("worker1"). + Obj(), + }, + managersJobs: []batchv1.Job{ + *baseJobManagedByKueueBuilder.Clone().Obj(), + }, + worker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueue.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + QuotaReservedTime(now.Add(-time.Hour)). // one hour ago + Obj(), + }, + worker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + useSecondWorker: true, + + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + PodSets(*utiltesting.MakePodSet("different-name", 1).Obj()). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateRetry, + Message: "Reserving remote lost", + }). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + ClusterName("worker1"). + Obj(), + }, + wantManagersJobs: []batchv1.Job{ + *baseJobManagedByKueueBuilder.Clone().Obj(), + }, + }, + "elastic job local workload out-of-sync scaled-down": { + features: map[featuregate.Feature]bool{ + features.ElasticJobsViaWorkloadSlices: true, + features.MultiKueueBatchJobWithManagedBy: true, + }, + reconcileFor: "wl1", + + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }). 
+ ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + ClusterName("worker1"). + Obj(), + }, + managersJobs: []batchv1.Job{ + *baseJobManagedByKueueBuilder.Clone().Obj(), + }, + worker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueue.MultiKueueOriginLabel, defaultOrigin). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 2).Obj()). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + QuotaReservedTime(now.Add(-time.Hour)). // one hour ago + Obj(), + }, + worker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + useSecondWorker: true, + + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Annotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + ClusterName("worker1"). + Obj(), + }, + wantManagersJobs: []batchv1.Job{ + *baseJobManagedByKueueBuilder.Clone().Obj(), + }, + wantWorker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueue.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + wantWorker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + wantEvents: []utiltesting.EventRecord{ + { + Key: client.ObjectKeyFromObject(baseWorkloadBuilder.Clone().Obj()), + EventType: "Normal", + Reason: "MultiKueue", + Message: `The workload got reservation on "worker1"`, + }, + }, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { - features.SetFeatureGateDuringTest(t, features.MultiKueueBatchJobWithManagedBy, !tc.withoutJobManagedBy) + for feature, enabled := range tc.features { + features.SetFeatureGateDuringTest(t, feature, enabled) + } + ctx, _ := utiltesting.ContextWithLog(t) managerBuilder := getClientBuilder(ctx) managerBuilder = managerBuilder.WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}) diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index d26c48b0b93..71c5f00821b 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -785,26 +785,6 @@ func FindAncestorJobManagedByKueue(ctx context.Context, c client.Client, jobObj func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, object client.Object) (*kueue.Workload, error) { log := ctrl.LoggerFrom(ctx) - // If workload slicing is enabled for this job, use the slice-based processing path. - if workloadSliceEnabled(job) { - podSets, err := job.PodSets() - if err != nil { - return nil, fmt.Errorf("failed to retrieve pod sets from job: %w", err) - } - - // Workload slices allow modifications only to PodSet.Count. - // Any other changes will result in the slice being marked as incompatible, - // and the workload will fall back to being processed by the original ensureOneWorkload function. - wl, compatible, err := workloadslicing.EnsureWorkloadSlices(ctx, r.client, podSets, object, job.GVK()) - if err != nil { - return nil, err - } - if compatible { - return wl, nil - } - // Fallback. 
- } - if prebuiltWorkloadName, usePrebuiltWorkload := PrebuiltWorkloadFor(job); usePrebuiltWorkload { wl := &kueue.Workload{} err := r.client.Get(ctx, types.NamespacedName{Name: prebuiltWorkloadName, Namespace: object.GetNamespace()}, wl) @@ -830,12 +810,40 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o return nil, err } + // Skip the in-sync check for ElasticJob workloads if the workload is a + // newly scaled-up replacement. This prevents premature removal of remote + // objects for a Job that has not yet been synced after scale-up. + if workloadslicing.Enabled(object) && workloadslicing.ScaledUp(wl) { + log.V(3).Info("WorkloadSlice: skip in-sync check in ensurePrebuiltWorkload") + return wl, nil + } + if inSync, err := r.ensurePrebuiltWorkloadInSync(ctx, wl, job); !inSync || err != nil { return nil, err } return wl, nil } + // If workload slicing is enabled for this job, use the slice-based processing path. + if workloadSliceEnabled(job) { + podSets, err := job.PodSets() + if err != nil { + return nil, fmt.Errorf("failed to retrieve pod sets from job: %w", err) + } + + // Workload slices allow modifications only to PodSet.Count. + // Any other changes will result in the slice being marked as incompatible, + // and the workload will fall back to being processed by the original ensureOneWorkload function. + wl, compatible, err := workloadslicing.EnsureWorkloadSlices(ctx, r.client, podSets, object, job.GVK()) + if err != nil { + return nil, err + } + if compatible { + return wl, nil + } + // Fallback. + } + // Find a matching workload first if there is one. var toDelete []*kueue.Workload var match *kueue.Workload @@ -1206,11 +1214,19 @@ func ConstructWorkload(ctx context.Context, c client.Client, job GenericJob, lab if err := ctrl.SetControllerReference(object, wl, c.Scheme()); err != nil { return nil, err } + return wl, nil } // prepareWorkloadSlice adds necessary workload slice annotations. func prepareWorkloadSlice(ctx context.Context, clnt client.Client, job GenericJob, wl *kueue.Workload) error { + // Annotate the workload to indicate that elastic-job support is enabled. + // This annotation makes it possible to distinguish workloads with elastic-job + // support directly at the workload level, without requiring access to the + // associated Job object. This is particularly useful in contexts such as the + // MultiKueue workload controller, where the Job may not be immediately available. + metav1.SetMetaDataAnnotation(&wl.ObjectMeta, workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue) + // Lookup existing slice for a given job. workloadSlices, err := workloadslicing.FindNotFinishedWorkloads(ctx, clnt, job.Object(), job.GVK()) if err != nil { @@ -1227,6 +1243,7 @@ func prepareWorkloadSlice(ctx context.Context, clnt client.Client, job GenericJo oldSlice := workloadSlices[0] // Annotate new workload slice with the preemptible (old) workload slice. metav1.SetMetaDataAnnotation(&wl.ObjectMeta, workloadslicing.WorkloadSliceReplacementFor, string(workload.Key(&oldSlice))) + return nil default: // Any other slices length is invalid. I.E, we expect to have at most 1 "current/old" workload slice. @@ -1485,7 +1502,7 @@ func clearMinCountsIfFeatureDisabled(in []kueue.PodSet) []kueue.PodSet { // - The job's underlying object is not nil. // - The job's object has opted in for WorkloadSlice processing. 
func workloadSliceEnabled(job GenericJob) bool { - if !features.Enabled(features.ElasticJobsViaWorkloadSlices) || job == nil { + if job == nil { return false } jobObject := job.Object() diff --git a/pkg/controller/jobframework/reconciler_test.go b/pkg/controller/jobframework/reconciler_test.go index 485f5c08f38..aa9bd8fd64c 100644 --- a/pkg/controller/jobframework/reconciler_test.go +++ b/pkg/controller/jobframework/reconciler_test.go @@ -123,7 +123,9 @@ func TestReconcileGenericJob(t *testing.T) { Obj(), podSets: basePodSets, wantWorkloads: []kueue.Workload{ - *baseWl.Clone().Name("job-test-job-3991b").Obj(), + *baseWl.Clone().Name("job-test-job-3991b"). + Annotations(map[string]string{workloadslicing.EnabledAnnotationKey: workloadslicing.EnabledAnnotationValue}). + Obj(), }, }, "update workload to match job (one existing workload)": { diff --git a/pkg/controller/jobframework/validation.go b/pkg/controller/jobframework/validation.go index 85673f74e98..bcd8d4a02e3 100644 --- a/pkg/controller/jobframework/validation.go +++ b/pkg/controller/jobframework/validation.go @@ -40,6 +40,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/features" utilpod "sigs.k8s.io/kueue/pkg/util/pod" + "sigs.k8s.io/kueue/pkg/workloadslicing" ) var ( @@ -78,6 +79,7 @@ func ValidateJobOnUpdate(oldJob, newJob GenericJob, defaultQueueExist func(strin allErrs = append(allErrs, validateUpdateForPrebuiltWorkload(oldJob, newJob)...) allErrs = append(allErrs, validateUpdateForMaxExecTime(oldJob, newJob)...) allErrs = append(allErrs, validateJobUpdateForWorkloadPriorityClassName(oldJob, newJob)...) + allErrs = append(allErrs, validatedUpdateForEnabledWorkloadSlice(oldJob, newJob)...) return allErrs } @@ -137,16 +139,13 @@ func validateUpdateForQueueName(oldJob, newJob GenericJob, defaultQueueExist fun } func validateUpdateForPrebuiltWorkload(oldJob, newJob GenericJob) field.ErrorList { - var allErrs field.ErrorList - if !newJob.IsSuspended() { - oldWlName, _ := PrebuiltWorkloadFor(oldJob) - newWlName, _ := PrebuiltWorkloadFor(newJob) - - allErrs = append(allErrs, apivalidation.ValidateImmutableField(newWlName, oldWlName, labelsPath.Key(constants.PrebuiltWorkloadLabel))...) - } else { - allErrs = append(allErrs, validateCreateForPrebuiltWorkload(newJob)...) + if newJob.IsSuspended() || workloadslicing.Enabled(oldJob.Object()) { + return validateCreateForPrebuiltWorkload(newJob) } - return allErrs + + oldWlName, _ := PrebuiltWorkloadFor(oldJob) + newWlName, _ := PrebuiltWorkloadFor(newJob) + return apivalidation.ValidateImmutableField(newWlName, oldWlName, labelsPath.Key(constants.PrebuiltWorkloadLabel)) } func validateJobUpdateForWorkloadPriorityClassName(oldJob, newJob GenericJob) field.ErrorList { @@ -157,6 +156,19 @@ func validateJobUpdateForWorkloadPriorityClassName(oldJob, newJob GenericJob) fi return allErrs } +// validatedUpdateForEnabledWorkloadSlice validates that the workload-slicing toggle remains immutable on update. +// +// It compares the boolean returned by workloadslicing.Enabled for the old and new Job objects. +// If the value changed, it returns a field.ErrorList with a single field.Invalid pointing at +// labels[workloadslicing.EnabledAnnotationKey] and using apivalidation.FieldImmutableErrorMsg. +// If the value did not change, // it returns nil. 
+func validatedUpdateForEnabledWorkloadSlice(oldJob, newJob GenericJob) field.ErrorList { + if oldEnabled, newEnabled := workloadslicing.Enabled(oldJob.Object()), workloadslicing.Enabled(newJob.Object()); oldEnabled != newEnabled { + return field.ErrorList{field.Invalid(labelsPath.Key(workloadslicing.EnabledAnnotationKey), newEnabled, apivalidation.FieldImmutableErrorMsg)} + } + return nil +} + func ValidateUpdateForWorkloadPriorityClassName(oldObj, newObj client.Object) field.ErrorList { allErrs := apivalidation.ValidateImmutableField(WorkloadPriorityClassName(newObj), WorkloadPriorityClassName(oldObj), workloadPriorityClassNamePath) return allErrs diff --git a/pkg/controller/jobframework/validation_test.go b/pkg/controller/jobframework/validation_test.go index 6635dc0e7e1..0b51dc5de5a 100644 --- a/pkg/controller/jobframework/validation_test.go +++ b/pkg/controller/jobframework/validation_test.go @@ -26,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/component-base/featuregate" "k8s.io/utils/ptr" mocks "sigs.k8s.io/kueue/internal/mocks/controller/jobframework" @@ -33,6 +34,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/features" utiltestingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" + "sigs.k8s.io/kueue/pkg/workloadslicing" ) var ( @@ -253,16 +255,16 @@ func TestValidateJobOnUpdate(t *testing.T) { t.Cleanup(jobframework.EnableIntegrationsForTest(t, "batch/job")) fieldString := field.NewPath("metadata").Child("labels").Key(constants.QueueLabel).String() testCases := map[string]struct { - oldJob *batchv1.Job - newJob *batchv1.Job - defaultLocalQueueEnabled bool - nsHasDefaultQueue bool - wantErr field.ErrorList + oldJob *batchv1.Job + newJob *batchv1.Job + nsHasDefaultQueue bool + featureGates map[featuregate.Feature]bool + wantErr field.ErrorList }{ "local queue cannot be changed if job is not suspended": { - oldJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("lq1").Suspend(false).Obj(), - newJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("lq2").Suspend(false).Obj(), - defaultLocalQueueEnabled: true, + oldJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("lq1").Suspend(false).Obj(), + newJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("lq2").Suspend(false).Obj(), + featureGates: map[featuregate.Feature]bool{features.LocalQueueDefaulting: true}, wantErr: field.ErrorList{ &field.Error{ Type: field.ErrorTypeInvalid, @@ -271,22 +273,19 @@ func TestValidateJobOnUpdate(t *testing.T) { }, }, "local queue can be changed": { - oldJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("lq1").Suspend(true).Obj(), - newJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("lq2").Suspend(true).Obj(), - nsHasDefaultQueue: true, - defaultLocalQueueEnabled: true, + oldJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("lq1").Suspend(true).Obj(), + newJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("lq2").Suspend(true).Obj(), + nsHasDefaultQueue: true, }, "local queue can be changed from default": { - oldJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("default").Suspend(true).Obj(), - newJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("lq2").Suspend(true).Obj(), - nsHasDefaultQueue: true, - defaultLocalQueueEnabled: true, + oldJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("default").Suspend(true).Obj(), + newJob: utiltestingjob.MakeJob("test-job", "ns1").Queue("lq2").Suspend(true).Obj(), + nsHasDefaultQueue: true, }, "local queue 
cannot be removed if default queue exists and feature is enabled": { - oldJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("lq1").Obj(), - newJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("").Obj(), - nsHasDefaultQueue: true, - defaultLocalQueueEnabled: true, + oldJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("lq1").Obj(), + newJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("").Obj(), + nsHasDefaultQueue: true, wantErr: field.ErrorList{ &field.Error{ Type: field.ErrorTypeInvalid, @@ -295,22 +294,43 @@ func TestValidateJobOnUpdate(t *testing.T) { }, }, "local queue can be removed if default queue does not exists and feature is enabled": { - oldJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("lq1").Obj(), - newJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("").Obj(), - nsHasDefaultQueue: false, - defaultLocalQueueEnabled: true, + oldJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("lq1").Obj(), + newJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("").Obj(), + nsHasDefaultQueue: false, }, "local queue can be removed if feature is not enabled": { - oldJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("lq1").Obj(), - newJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("").Obj(), - nsHasDefaultQueue: true, - defaultLocalQueueEnabled: false, + oldJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("lq1").Obj(), + newJob: utiltestingjob.MakeJob("test-job", "ns1").Suspend(true).Queue("").Obj(), + nsHasDefaultQueue: true, + featureGates: map[featuregate.Feature]bool{features.LocalQueueDefaulting: false}, + }, + "elastic job enabled annotation cannot be removed on update": { + oldJob: utiltestingjob.MakeJob("test-job", "ns1").SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue).Obj(), + newJob: utiltestingjob.MakeJob("test-job", "ns1").Obj(), + featureGates: map[featuregate.Feature]bool{ + features.ElasticJobsViaWorkloadSlices: true, + }, + wantErr: field.ErrorList{ + field.Invalid(field.NewPath("metadata.labels["+workloadslicing.EnabledAnnotationKey+"]"), "false", "field is immutable"), + }, + }, + "elastic job enabled annotation cannot be added on update": { + oldJob: utiltestingjob.MakeJob("test-job", "ns1").Obj(), + newJob: utiltestingjob.MakeJob("test-job", "ns1").SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue).Obj(), + featureGates: map[featuregate.Feature]bool{ + features.ElasticJobsViaWorkloadSlices: true, + }, + wantErr: field.ErrorList{ + field.Invalid(field.NewPath("metadata.labels["+workloadslicing.EnabledAnnotationKey+"]"), "false", "field is immutable"), + }, }, } for tcName, tc := range testCases { t.Run(tcName, func(t *testing.T) { - features.SetFeatureGateDuringTest(t, features.LocalQueueDefaulting, tc.defaultLocalQueueEnabled) + for feature, enabled := range tc.featureGates { + features.SetFeatureGateDuringTest(t, feature, enabled) + } mockctrl := gomock.NewController(t) diff --git a/pkg/controller/jobs/job/job_multikueue_adapter.go b/pkg/controller/jobs/job/job_multikueue_adapter.go index f903ef26c68..783557e664a 100644 --- a/pkg/controller/jobs/job/job_multikueue_adapter.go +++ b/pkg/controller/jobs/job/job_multikueue_adapter.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/util/api" clientutil "sigs.k8s.io/kueue/pkg/util/client" + 
"sigs.k8s.io/kueue/pkg/workloadslicing" ) type multiKueueAdapter struct{} @@ -50,6 +51,7 @@ func (b *multiKueueAdapter) SyncJob(ctx context.Context, localClient client.Clie localJob := batchv1.Job{} err := localClient.Get(ctx, key, &localJob) if err != nil { + fmt.Println("PROD: LOCAL JOB ERROR", err, key) return err } @@ -85,6 +87,32 @@ func (b *multiKueueAdapter) SyncJob(ctx context.Context, localClient client.Clie return &localJob, true, nil }) } + + if workloadslicing.Enabled(&localJob) { + // Update remote job's workload slice name and parallelism if needed. + if err := clientutil.Patch(ctx, remoteClient, &remoteJob, func() (client.Object, bool, error) { + // Update workload name label. + labelsChanged := false + if remoteJob.Labels == nil { + remoteJob.Labels = map[string]string{constants.PrebuiltWorkloadLabel: workloadName} + labelsChanged = true + } else { + if cur, ok := remoteJob.Labels[constants.PrebuiltWorkloadLabel]; !ok || cur != workloadName { + remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName + labelsChanged = true + } + } + + // Update parallelism. + oldParallelism := ptr.Deref(remoteJob.Spec.Parallelism, 0) + newParallelism := ptr.Deref(localJob.Spec.Parallelism, 0) + remoteJob.Spec.Parallelism = localJob.Spec.Parallelism + return &remoteJob, oldParallelism != newParallelism || labelsChanged, nil + }); err != nil { + return fmt.Errorf("failed to patch remote job: %w", err) + } + } + return nil } diff --git a/pkg/controller/jobs/job/job_multikueue_adapter_test.go b/pkg/controller/jobs/job/job_multikueue_adapter_test.go index 59bded04631..d17ad6ab812 100644 --- a/pkg/controller/jobs/job/job_multikueue_adapter_test.go +++ b/pkg/controller/jobs/job/job_multikueue_adapter_test.go @@ -26,8 +26,12 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/component-base/featuregate" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" @@ -36,6 +40,7 @@ import ( "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" utiltestingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" + "sigs.k8s.io/kueue/pkg/workloadslicing" ) const ( @@ -288,6 +293,7 @@ func TestMultiKueueAdapter(t *testing.T) { for name, tc := range cases { t.Run(name, func(t *testing.T) { features.SetFeatureGateDuringTest(t, features.MultiKueueBatchJobWithManagedBy, !tc.withoutJobManagedBy) + managerBuilder := utiltesting.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}) managerBuilder = managerBuilder.WithLists(&batchv1.JobList{Items: tc.managersJobs}) managerBuilder = managerBuilder.WithStatusSubresource(slices.Map(tc.managersJobs, func(w *batchv1.Job) client.Object { return w })...) 
@@ -327,3 +333,406 @@ func TestMultiKueueAdapter(t *testing.T) { }) } } + +func Test_multiKueueAdapter_SyncJob(t *testing.T) { + type fields struct { + features map[featuregate.Feature]bool + } + type args struct { + ctx context.Context + localClient client.Client + remoteClient client.Client + key types.NamespacedName + workloadName string + origin string + } + type want struct { + err bool + localJob *batchv1.Job + remoteJob *batchv1.Job + } + + schema := runtime.NewScheme() + _ = scheme.AddToScheme(schema) + _ = kueue.AddToScheme(schema) + + newJob := func() *utiltestingjob.JobWrapper { + return utiltestingjob.MakeJob("test", TestNamespace).ResourceVersion("1") + } + runningJobCondition := batchv1.JobCondition{ + Type: batchv1.JobSuccessCriteriaMet, + Status: corev1.ConditionFalse, + } + + tests := map[string]struct { + fields fields + args args + want want + }{ + "FailureToRetrieveLocalJob": { + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().Build(), + }, + want: want{ + err: true, + }, + }, + "FailureToRetrieveRemoteJob": { + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + ManagedBy("parent").Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{ + Get: func(_ context.Context, _ client.WithWatch, _ client.ObjectKey, _ client.Object, _ ...client.GetOption) error { + return errors.New("test-error") + }, + }).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + }, + want: want{ + err: true, + localJob: newJob().ManagedBy("parent").Obj(), + }, + }, + "RemoteJobNotFound": { + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + ManagedBy("parent").Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + }, + want: want{ + localJob: newJob().ManagedBy("parent").Obj(), + remoteJob: newJob().Label(kueue.MultiKueueOriginLabel, ""). + Label(constants.PrebuiltWorkloadLabel, ""). + ManagedBy("parent"). + Obj(), + }, + }, + "RemoteJobNotFound_ResetManagedBy": { + fields: fields{ + features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: true}, + }, + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + ManagedBy("parent").Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + }, + want: want{ + localJob: newJob().ManagedBy("parent").Obj(), + remoteJob: newJob().Label(kueue.MultiKueueOriginLabel, ""). + Label(constants.PrebuiltWorkloadLabel, ""). + Obj(), + }, + }, + "RemoteJobInProgress_LocalIsManagedButStillSuspended": { + fields: fields{ + features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: true}, + }, + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob().Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + Condition(runningJobCondition). + Obj()).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + }, + want: want{ + localJob: newJob().Obj(), + remoteJob: newJob(). + Condition(runningJobCondition). 
+ Obj(), + }, + }, + "RemoteJobInProgress_LocalIsManagedAndUnsuspended": { + fields: fields{ + features: map[featuregate.Feature]bool{features.MultiKueueBatchJobWithManagedBy: true}, + }, + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob().Suspend(false).Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + Condition(runningJobCondition). + Obj()).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + }, + want: want{ + localJob: newJob(). + ResourceVersion("2"). + Suspend(false). + Condition(runningJobCondition). + Obj(), + remoteJob: newJob(). + Condition(runningJobCondition). + Obj(), + }, + }, + "RemoteJobInProgress_LocalIsNotManaged": { + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob().Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + Condition(runningJobCondition). + Obj()).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + }, + want: want{ + localJob: newJob().Obj(), + remoteJob: newJob(). + Condition(runningJobCondition). + Obj(), + }, + }, + "RemoteJobFinished_Completed": { + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob().Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + Condition(batchv1.JobCondition{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + }). + Obj()).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + }, + want: want{ + localJob: newJob(). + ResourceVersion("2"). + Condition( + batchv1.JobCondition{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + }). + Obj(), + }, + }, + "RemoteJobFinished_Failed": { + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob().Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + Condition(batchv1.JobCondition{ + Type: batchv1.JobFailed, + Status: corev1.ConditionTrue, + }). + Obj()).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + }, + want: want{ + localJob: newJob(). + ResourceVersion("2"). + Condition( + batchv1.JobCondition{ + Type: batchv1.JobFailed, + Status: corev1.ConditionTrue, + }). + Obj(), + }, + }, + "ElasticJob_RemoteInSync": { + fields: fields{ + features: map[featuregate.Feature]bool{features.ElasticJobsViaWorkloadSlices: true}, + }, + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Condition(runningJobCondition). + Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Label(constants.PrebuiltWorkloadLabel, "test-workload"). + Condition(runningJobCondition). + Obj()).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + workloadName: "test-workload", + }, + want: want{ + localJob: newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Condition(runningJobCondition). + Obj(), + remoteJob: newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Label(constants.PrebuiltWorkloadLabel, "test-workload"). 
+ Condition(runningJobCondition). + Obj(), + }, + }, + "ElasticJob_WorkloadNameOnlyChange_EdgeCase": { + fields: fields{ + features: map[featuregate.Feature]bool{features.ElasticJobsViaWorkloadSlices: true}, + }, + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Condition(runningJobCondition). + Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Condition(runningJobCondition). + Obj()).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + workloadName: "test-workload-new", + }, + want: want{ + localJob: newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Condition(runningJobCondition). + Obj(), + remoteJob: newJob(). + ResourceVersion("2"). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Label(constants.PrebuiltWorkloadLabel, "test-workload-new"). + Condition(runningJobCondition). + Obj(), + }, + }, + "ElasticJob_ParallelismOnlyChange_EdgeCase": { + fields: fields{ + features: map[featuregate.Feature]bool{features.ElasticJobsViaWorkloadSlices: true}, + }, + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Parallelism(22). + Condition(runningJobCondition). + Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Label(constants.PrebuiltWorkloadLabel, "test-workload"). + Condition(runningJobCondition). + Obj()).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + workloadName: "test-workload", + }, + want: want{ + localJob: newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Parallelism(22). + Condition(runningJobCondition). + Obj(), + remoteJob: newJob(). + ResourceVersion("2"). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Label(constants.PrebuiltWorkloadLabel, "test-workload"). + Parallelism(22). + Condition(runningJobCondition). + Obj(), + }, + }, + "ElasticJob_RemoteOutOfSync": { + fields: fields{ + features: map[featuregate.Feature]bool{features.ElasticJobsViaWorkloadSlices: true}, + }, + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Parallelism(22). + Condition(runningJobCondition). + Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Label(constants.PrebuiltWorkloadLabel, "test-workload"). + Condition(runningJobCondition). + Obj()).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + workloadName: "test-workload-new", + }, + want: want{ + localJob: newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Parallelism(22). + Condition(runningJobCondition). + Obj(), + remoteJob: newJob(). + ResourceVersion("2"). 
+ SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Label(constants.PrebuiltWorkloadLabel, "test-workload-new"). + Parallelism(22). + Condition(runningJobCondition). + Obj(), + }, + }, + "ElasticJob_RemoteOutOfSync_PatchFailure": { + fields: fields{ + features: map[featuregate.Feature]bool{features.ElasticJobsViaWorkloadSlices: true}, + }, + args: args{ + ctx: t.Context(), + localClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Parallelism(22). + Condition(runningJobCondition). + Obj()).Build(), + remoteClient: fake.NewClientBuilder().WithScheme(schema).WithObjects(newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Label(constants.PrebuiltWorkloadLabel, "test-workload"). + Condition(runningJobCondition). + Obj()). + WithInterceptorFuncs(interceptor.Funcs{ + Patch: func(ctx context.Context, client client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + return errors.New("test-patch-error") + }, + }).Build(), + key: client.ObjectKeyFromObject(newJob().Obj()), + workloadName: "test-workload-new", + }, + want: want{ + err: true, + localJob: newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Parallelism(22). + Condition(runningJobCondition). + Obj(), + remoteJob: newJob(). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Label(constants.PrebuiltWorkloadLabel, "test-workload"). + Condition(runningJobCondition). + Obj(), + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + // Setup feature gates if any provided. + for featureName, enabled := range tt.fields.features { + features.SetFeatureGateDuringTest(t, featureName, enabled) + } + + adapter := &multiKueueAdapter{} + // Function call under test with result (error) assertion. + if err := adapter.SyncJob(tt.args.ctx, tt.args.localClient, tt.args.remoteClient, tt.args.key, tt.args.workloadName, tt.args.origin); (err != nil) != tt.want.err { + t.Errorf("SyncJob() error = %v, wantErr %v", err, tt.want.err) + } + + // Side effect assertion: changes to the local job. Must have (not nil) both the client and the job. + if tt.args.localClient != nil && tt.want.localJob != nil { + got := &batchv1.Job{} + if err := tt.args.localClient.Get(tt.args.ctx, client.ObjectKeyFromObject(tt.want.localJob), got); err != nil { + t.Errorf("SyncJob() unexpected assertion error retrieving local job: %v", err) + } + if diff := cmp.Diff(tt.want.localJob, got, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("SyncJob() localJob (-want,+got):\n%s", diff) + } + } + // Side effect assertion: changes on the remote job. Must have (not nil) both the client and the job. 
+ if tt.args.remoteClient != nil && tt.want.remoteJob != nil { + got := &batchv1.Job{} + if err := tt.args.remoteClient.Get(tt.args.ctx, client.ObjectKeyFromObject(tt.want.remoteJob), got); err != nil { + t.Errorf("SyncJob() unexpected assertion error retrieving remote job: %v", err) + } + if diff := cmp.Diff(tt.want.remoteJob, got, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("SyncJob() remoteJob (-want,+got):\n%s", diff) + } + } + }) + } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 8be9fb89034..12e46334045 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -309,6 +309,7 @@ func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal { } continue } + if !s.cache.PodsReadyForAllAdmittedWorkloads(log) { log.V(5).Info("Waiting for all admitted workloads to be in the PodsReady condition") // If WaitForPodsReady is enabled and WaitForPodsReady.BlockAdmission is true @@ -324,11 +325,23 @@ func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal { log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition") } - // Evict old workload-slice if any. Note: that oldWorkloadSlice is not nil only if - // this is a workload-slice enabled workload and there is an old slice to evict. + // Evict the old workload slice if present. + // oldWorkloadSlice is non-nil only when workload slicing is enabled + // and there is an existing slice to evict. + // + // Copy the clusterName value from the old workload into the new workload + // to ensure consistent placement in a MultiKueue context. + // This status update will be persisted during the workload admission step below. + // + // If the admission step fails, we may end up in a state where: + // - the old workload is marked Finished, and + // - the new workload is not admitted. + // In a single-cluster context, this should lead to Job suspension. + // In a MultiKueue context, this should also trigger removal of remote workload/Job objects. if features.Enabled(features.ElasticJobsViaWorkloadSlices) && oldWorkloadSlice != nil { + e.Obj.Status.ClusterName = oldWorkloadSlice.WorkloadInfo.Obj.Status.ClusterName if err := s.replaceWorkloadSlice(ctx, oldWorkloadSlice.WorkloadInfo.ClusterQueue, e.Obj, oldWorkloadSlice.WorkloadInfo.Obj.DeepCopy()); err != nil { - log.Error(err, "Failed to aggregate workload slice") + log.Error(err, "Failed to replace workload slice") continue } } diff --git a/pkg/util/testingjobs/job/wrappers.go b/pkg/util/testingjobs/job/wrappers.go index 5895dcf6035..11e70b893ab 100644 --- a/pkg/util/testingjobs/job/wrappers.go +++ b/pkg/util/testingjobs/job/wrappers.go @@ -156,6 +156,12 @@ func (j *JobWrapper) SetAnnotation(key, content string) *JobWrapper { return j } +// ResourceVersion sets resource version on job object - helpful when using controller-runtime fake client. +func (j *JobWrapper) ResourceVersion(resourceVersion string) *JobWrapper { + j.ObjectMeta.ResourceVersion = resourceVersion + return j +} + // Toleration adds a toleration to the job. 
func (j *JobWrapper) Toleration(t corev1.Toleration) *JobWrapper { j.Spec.Template.Spec.Tolerations = append(j.Spec.Template.Spec.Tolerations, t) diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index d407e2d0e56..c51f9be6dc8 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -1371,3 +1371,10 @@ func setSchedulingStatsEviction(wl *kueue.Workload, newEvictionState kueue.Workl func ReasonWithCause(reason, underlyingCause string) string { return fmt.Sprintf("%sDueTo%s", reason, underlyingCause) } + +// ClusterName returns the name of the remote cluster where the original workload +// was scheduled in a MultiKueue context. If the status field is not set, +// it returns an empty string. +func ClusterName(wl *kueue.Workload) string { + return ptr.Deref(wl.Status.ClusterName, "") +} diff --git a/pkg/workloadslicing/workloadslicing.go b/pkg/workloadslicing/workloadslicing.go index 3b21088a101..4891ca4aaf9 100644 --- a/pkg/workloadslicing/workloadslicing.go +++ b/pkg/workloadslicing/workloadslicing.go @@ -61,23 +61,27 @@ func Enabled(object metav1.Object) bool { if object == nil { return false } - return object.GetAnnotations()[EnabledAnnotationKey] == EnabledAnnotationValue + return features.Enabled(features.ElasticJobsViaWorkloadSlices) && object.GetAnnotations()[EnabledAnnotationKey] == EnabledAnnotationValue +} + +// IsElasticWorkload returns true if the ElasticJobsViaWorkloadSlices feature gate is enabled +// and the given Workload carries the elastic-job enabled annotation. +func IsElasticWorkload(workload *kueue.Workload) bool { + if workload == nil { + return false + } + return Enabled(workload) } const ( - // WorkloadSliceReplacementFor is the annotation key used to capture an "old" workload slice key - // that will be preempted by the "new", e.g., this workload slice with annotation. + // WorkloadSliceReplacementFor is the annotation key set on a new workload slice to indicate + // the key of the workload slice it is intended to replace (i.e., the "old" slice being preempted). WorkloadSliceReplacementFor = "kueue.x-k8s.io/workload-slice-replacement-for" ) // ReplacementForKey returns a value for workload "WorkloadSliceReplacementFor" annotation -// key if this workload was annotated with such, otherwise, returns an empty string. +// key if this workload was annotated with such; otherwise, it returns nil. func ReplacementForKey(wl *kueue.Workload) *workload.Reference { - annotations := wl.GetAnnotations() - if len(annotations) == 0 { - return nil - } - key, found := annotations[WorkloadSliceReplacementFor] + key, found := wl.GetAnnotations()[WorkloadSliceReplacementFor] if !found { return nil } @@ -153,6 +158,13 @@ func ScaledDown(oldCounts, newCounts workload.PodSetsCounts) bool { return newCounts.HasFewerReplicasThan(oldCounts) && !oldCounts.HasFewerReplicasThan(newCounts) } +// ScaledUp returns true if the given workload has the +// WorkloadSliceReplacementFor annotation, indicating that +// this workload is a scaled-up replacement for another.
+func ScaledUp(workload *kueue.Workload) bool { + return ReplacementForKey(workload) != nil +} + // EnsureWorkloadSlices processes the Job object and returns the appropriate workload slice. // // Returns: @@ -314,6 +326,14 @@ func ReplacedWorkloadSlice(wl *workload.Info, snap *schdcache.Snapshot) ([]*pree return []*preemption.Target{{WorkloadInfo: replaced}}, replaced } +// IsReplaced returns true if the workload status contains a WorkloadFinished condition with +// status True and the WorkloadSliceReplaced reason. +func IsReplaced(status kueue.WorkloadStatus) bool { + finishedCondition := apimeta.FindStatusCondition(status.Conditions, kueue.WorkloadFinished) + return finishedCondition != nil && finishedCondition.Status == metav1.ConditionTrue && + finishedCondition.Reason == kueue.WorkloadSliceReplaced +} + // FindReplacedSliceTarget identifies and removes a preempted workload slice target from the given list of targets. // The function checks if Elastic Jobs via Workload Slices feature is enabled and if so, attempts to find a matching // workload slice in the target list for the provided preemptor. If a matching slice is found, it is removed from the list diff --git a/pkg/workloadslicing/workloadslicing_test.go b/pkg/workloadslicing/workloadslicing_test.go index 14fed96e370..6b4bfb3b133 100644 --- a/pkg/workloadslicing/workloadslicing_test.go +++ b/pkg/workloadslicing/workloadslicing_test.go @@ -91,6 +91,11 @@ func TestEnabled(t *testing.T) { } for name, tt := range tests { t.Run(name, func(t *testing.T) { + // Enabled must always return false while the feature gate is disabled. + if got := Enabled(tt.args.object); got { + t.Error("Enabled() = true, want false when feature is not enabled") + } + features.SetFeatureGateDuringTest(t, features.ElasticJobsViaWorkloadSlices, true) if got := Enabled(tt.args.object); got != tt.want { t.Errorf("Enabled() = %v, want %v", got, tt.want) } diff --git a/site/content/en/docs/reference/kueue.v1beta1.md b/site/content/en/docs/reference/kueue.v1beta1.md index 261e5d00c98..49a0a7f109f 100644 --- a/site/content/en/docs/reference/kueue.v1beta1.md +++ b/site/content/en/docs/reference/kueue.v1beta1.md @@ -3315,8 +3315,11 @@ This field is optional.
string
clusterName is the name of the cluster where the workload is actually assigned. -This field is reset after the Workload is evicted.
+clusterName is the name of the cluster where the workload is currently assigned.
+With ElasticJobs, this field may also indicate the cluster where the original (old) workload +was assigned, providing placement context for new scaled-up workloads. This supports +affinity or propagation policies across workload slices.
+This field is reset after the Workload is evicted.
unhealthyNodes
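
For reference, the reconciler changes above distinguish a scale-down (update the remote workload slice in place) from a scale-up (replace the slice and keep it on the original cluster via status.clusterName). The stand-alone Go sketch below is not part of the patch; it only approximates the PodSetsCounts comparison from pkg/workload, and the type and helper names are illustrative.

package main

import "fmt"

// podSetCounts maps a PodSet name to its replica count. It stands in for the
// workload.PodSetsCounts type used by workloadslicing.ScaledDown in the patch;
// the comparison below is an assumed approximation, not the library code.
type podSetCounts map[string]int32

// hasFewerReplicasThan reports whether any pod set in c has fewer replicas than
// the pod set with the same name in other.
func (c podSetCounts) hasFewerReplicasThan(other podSetCounts) bool {
	for name, count := range c {
		if o, ok := other[name]; ok && count < o {
			return true
		}
	}
	return false
}

// scaledDown mirrors the condition used in the patch: the new counts are lower
// for at least one pod set and not higher for any, so the remote workload slice
// can be updated in place instead of being deleted and recreated.
func scaledDown(oldCounts, newCounts podSetCounts) bool {
	return newCounts.hasFewerReplicasThan(oldCounts) && !oldCounts.hasFewerReplicasThan(newCounts)
}

func main() {
	oldCounts := podSetCounts{"main": 4}
	fmt.Println(scaledDown(oldCounts, podSetCounts{"main": 2})) // true: propagate spec to the remote slice
	fmt.Println(scaledDown(oldCounts, podSetCounts{"main": 6})) // false: scale-up goes through slice replacement
}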