7 changes: 6 additions & 1 deletion apis/kueue/v1beta1/workload_types.go
@@ -424,7 +424,12 @@ type WorkloadStatus struct {
// +optional
NominatedClusterNames []string `json:"nominatedClusterNames,omitempty"`

// clusterName is the name of the cluster where the workload is actually assigned.
// clusterName is the name of the cluster where the workload is currently assigned.
//
// With ElasticJobs, this field may also indicate the cluster where the original (old) workload
// was assigned, providing placement context for new scaled-up workloads. This supports
// affinity or propagation policies across workload slices.
//
// This field is reset after the Workload is evicted.
// +optional
ClusterName *string `json:"clusterName,omitempty"`
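For illustration only, not part of this diff: a minimal Go sketch, assuming the v1beta1 Workload type shown above, of how a caller could read the cluster recorded on an old workload slice so that a new, scaled-up slice can be steered to the same worker cluster. The helper name previousCluster is hypothetical; the controller change below uses the existing workload.ClusterName helper for this.

package example

import (
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// previousCluster returns the cluster recorded on the old workload slice, if any,
// so a new (scaled-up) slice can be nominated to the same worker cluster.
func previousCluster(oldSlice *kueue.Workload) (string, bool) {
	if oldSlice == nil || oldSlice.Status.ClusterName == nil || *oldSlice.Status.ClusterName == "" {
		return "", false
	}
	return *oldSlice.Status.ClusterName, true
}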
7 changes: 6 additions & 1 deletion charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml
@@ -8612,7 +8612,12 @@ spec:
x-kubernetes-list-type: map
clusterName:
description: |-
clusterName is the name of the cluster where the workload is actually assigned.
clusterName is the name of the cluster where the workload is currently assigned.

With ElasticJobs, this field may also indicate the cluster where the original (old) workload
was assigned, providing placement context for new scaled-up workloads. This supports
affinity or propagation policies across workload slices.

This field is reset after the Workload is evicted.
type: string
conditions:
7 changes: 6 additions & 1 deletion config/components/crd/bases/kueue.x-k8s.io_workloads.yaml
@@ -9096,7 +9096,12 @@ spec:
x-kubernetes-list-type: map
clusterName:
description: |-
clusterName is the name of the cluster where the workload is actually assigned.
clusterName is the name of the cluster where the workload is currently assigned.

With ElasticJobs, this field may also indicate the cluster where the original (old) workload
was assigned, providing placement context for new scaled-up workloads. This supports
affinity or propagation policies across workload slices.

This field is reset after the Workload is evicted.
type: string
conditions:
40 changes: 37 additions & 3 deletions pkg/controller/admissionchecks/multikueue/workload.go
@@ -51,6 +51,7 @@ import (
clientutil "sigs.k8s.io/kueue/pkg/util/client"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
"sigs.k8s.io/kueue/pkg/workload"
"sigs.k8s.io/kueue/pkg/workloadslicing"
)

var (
@@ -95,6 +96,13 @@ func (g *wlGroup) IsFinished() bool {
return apimeta.IsStatusConditionTrue(g.local.Status.Conditions, kueue.WorkloadFinished)
}

// IsElasticWorkload returns true if the workload is considered elastic,
// meaning the ElasticJobsViaWorkloadSlices feature is enabled and the
// workload has the corresponding annotation set.
func (g *wlGroup) IsElasticWorkload() bool {
return workloadslicing.IsElasticWorkload(g.local)
}

Comment on lines +99 to +105
Member

Suggested change
// IsElasticWorkload returns true if the workload is considered elastic,
// meaning the ElasticJobsViaWorkloadSlices feature is enabled and the
// workload has the corresponding annotation set.
func (g *wlGroup) IsElasticWorkload() bool {
return workloadslicing.IsElasticWorkload(g.local)
}

Do we still need it? In each place, couldn't we just call workloadslicing.IsElasticWorkload(g.local)?

Contributor Author

I think this helper is consistent with other group functions (e.g., IsFinished()), where we don’t need to explicitly unpack group attributes.

That said, I can remove it if it’s a blocker.

Contributor

I'm ok either way. I like consistency.
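For context, not part of this diff: a rough sketch of the kind of check workloadslicing.IsElasticWorkload performs, under the assumption that it combines the ElasticJobsViaWorkloadSlices feature gate with a workload annotation, as the helper's doc comment describes. The annotation key and the featureEnabled parameter below are illustrative, not the actual Kueue identifiers.

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// elasticJobAnnotation is an illustrative key; the real annotation is defined
// in the Kueue codebase and may differ.
const elasticJobAnnotation = "kueue.x-k8s.io/elastic-job"

// isElasticWorkload sketches the check described in the helper's doc comment:
// the feature must be enabled and the workload must carry the annotation.
func isElasticWorkload(wl metav1.Object, featureEnabled bool) bool {
	if !featureEnabled {
		return false
	}
	return wl.GetAnnotations()[elasticJobAnnotation] == "true"
}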

// FirstReserving returns true if there is a workload reserving quota,
// the string identifies the remote cluster.
func (g *wlGroup) FirstReserving() (bool, string) {
Expand Down Expand Up @@ -315,7 +323,16 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco

acs := admissioncheck.FindAdmissionCheck(group.local.Status.AdmissionChecks, group.acName)

// 1. delete all remote workloads when finished or the local wl has no reservation
// 0. Skip elastic workloads when:
// - the workload is "Finished" as a result of workload slice replacement, OR
// - the workload has no quota reservation as a result of scale-up, i.e., scale-up is in progress.
if group.IsElasticWorkload() &&
((group.IsFinished() && workloadslicing.IsReplaced(group.local.Status)) ||
(!workload.HasQuotaReservation(group.local) && workloadslicing.ScaledUp(group.local))) {
return reconcile.Result{}, nil
}

// 1. delete all remote workloads when local workload is finished or has no quota reservation.
if group.IsFinished() || !workload.HasQuotaReservation(group.local) {
var errs []error
for rem := range group.remotes {
@@ -361,9 +378,19 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
return reconcile.Result{}, w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(kueue.MultiKueueControllerName+"-finish"), client.ForceOwnership)
}

// 2. delete all workloads that are out of sync or are not in the chosen worker
// 2. delete all workloads that are out of sync (other than scaled-down elastic workloads)
// or are not in the chosen worker.
for rem, remWl := range group.remotes {
if remWl != nil && !equality.Semantic.DeepEqual(group.local.Spec, remWl.Spec) {
// For elastic workloads, detect a scale-down event and propagate changes to the remote.
if group.IsElasticWorkload() && workloadslicing.ScaledDown(workload.ExtractPodSetCountsFromWorkload(remWl), workload.ExtractPodSetCountsFromWorkload(group.local)) {
remWl.Spec = group.local.Spec
if err := group.remoteClients[rem].client.Update(ctx, remWl); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to update remote workload slice: %w", err)
}
continue
}

if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, rem)); err != nil {
log.V(2).Error(err, "Deleting out of sync remote objects", "remote", rem)
return reconcile.Result{}, err
@@ -442,7 +469,13 @@ func (w *wlReconciler) nominateAndSynchronizeWorkers(ctx context.Context, group
log.V(3).Info("Nominate and Synchronize Worker Clusters")
var nominatedWorkers []string

if w.dispatcherName == config.MultiKueueDispatcherModeAllAtOnce {
// For elastic workloads, retrieve the remote cluster where the original workload was scheduled.
// For now, new workload slices will continue to be assigned to the same cluster.
// In the future, we may introduce more nuanced remote workload propagation policies,
// supporting preferred or required placement constraints.
if clusterName := workload.ClusterName(group.local); group.IsElasticWorkload() && clusterName != "" {
nominatedWorkers = []string{clusterName}
} else if w.dispatcherName == config.MultiKueueDispatcherModeAllAtOnce {
for workerName := range group.remotes {
nominatedWorkers = append(nominatedWorkers, workerName)
}
@@ -459,6 +492,7 @@
// Incremental dispatcher and External dispatcher path
nominatedWorkers = group.local.Status.NominatedClusterNames
}

log.V(4).Info("Synchronize nominated worker clusters", "dispatcherName", w.dispatcherName, "nominatedWorkerClusterNames", nominatedWorkers)

var errs []error