support colocated placement #267

Open
wants to merge 4 commits into base: main

27 changes: 27 additions & 0 deletions api/leaderworkerset/v1/leaderworkerset_types.go
@@ -31,6 +31,10 @@ const (
// which will be used for 1:1 exclusive scheduling in a given subgroup.
SubGroupExclusiveKeyAnnotationKey string = "leaderworkerset.sigs.k8s.io/subgroup-exclusive-topology"

// Colocated topology annotation is used to specify the topology key;
// all pods of one pod group will be scheduled into a single topology domain.
ColocatedKeyAnnotationKey string = "leaderworkerset.sigs.k8s.io/colocated-topology"

// Set name label will record the leaderworkerset name that those resources
// (Pod/Service/StatefulSets) belong to.
SetNameLabelKey string = "leaderworkerset.sigs.k8s.io/name"
@@ -127,6 +131,10 @@ type LeaderWorkerSetSpec struct {
// NetworkConfig defines the network configuration of the group
// +optional
NetworkConfig *NetworkConfig `json:"networkConfig,omitempty"`

// GroupPlacementPolicy defines scheduling policies for pod groups
// +optional
GroupPlacementPolicy GroupPlacementPolicy `json:"groupPlacementPolicy"`
}

// Template of the leader/worker pods, the group will include at least one leader pod.
@@ -212,6 +220,25 @@ const (
SubdomainUniquePerReplica SubdomainPolicy = "UniquePerReplica"
)

type GroupPlacementPolicy struct {
// Type defines the placement policy for one pod group
// +kubebuilder:validation:Enum={Exclusive,Colocated,None}
// +kubebuilder:default=None
Type GroupPlacementPolicyType `json:"type"`

// TopologyKey is the node label key used to place pod groups.
// It must be set when Type is Exclusive or Colocated.
// +optional
TopologyKey *string `json:"topologyKey,omitempty"`
}

type GroupPlacementPolicyType string

const (
// ExclusiveGroupPlacementPolicyType schedules each pod group 1:1 exclusively into its own topology domain.
ExclusiveGroupPlacementPolicyType GroupPlacementPolicyType = "Exclusive"
// ColocatedGroupPlacementPolicyType schedules all pods of a pod group into a single topology domain; multiple groups may share a domain.
ColocatedGroupPlacementPolicyType GroupPlacementPolicyType = "Colocated"
// NoneGroupPlacementPolicyType applies no group placement constraints.
NoneGroupPlacementPolicyType GroupPlacementPolicyType = "None"
)

// RollingUpdateConfiguration defines the parameters to be used for RollingUpdateStrategyType.
type RollingUpdateConfiguration struct {
// The maximum number of replicas that can be unavailable during the update.
21 changes: 21 additions & 0 deletions api/leaderworkerset/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions client-go/applyconfiguration/utils.go

Some generated files are not rendered by default.

18 changes: 18 additions & 0 deletions config/crd/bases/leaderworkerset.x-k8s.io_leaderworkersets.yaml
@@ -51,6 +51,24 @@ spec:
              gets a workerIndex, and it is always set to 0.
              Worker pods are named using the format: leaderWorkerSetName-leaderIndex-workerIndex.
            properties:
              groupPlacementPolicy:
                description: GroupPlacementPolicy defines scheduling policies for
                  pod groups
                properties:
                  topologyKey:
                    description: TopologyKey is the node label key used to place pod groups. It must be set when Type is Exclusive or Colocated.
                    type: string
                  type:
                    default: None
                    description: Type defines the placement policy for one pod group
                    enum:
                    - Exclusive
                    - Colocated
                    - None
                    type: string
                required:
                - type
                type: object
              leaderWorkerTemplate:
                description: LeaderWorkerTemplate defines the template for leader/worker
                  pods
23 changes: 23 additions & 0 deletions docs/examples/sample/lws-colocated-placement.yaml
@@ -0,0 +1,23 @@
apiVersion: leaderworkerset.x-k8s.io/v1
kind: LeaderWorkerSet
metadata:
  name: leaderworkerset-colocated
spec:
  replicas: 3
  groupPlacementPolicy:
    type: Colocated
    topologyKey: nodepool-id
  leaderWorkerTemplate:
    size: 4
    workerTemplate:
      spec:
        containers:
        - name: nginx
          image: nginx:1.14.2
          resources:
            limits:
              cpu: "100m"
            requests:
              cpu: "50m"
          ports:
          - containerPort: 8080
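
For illustration: once this sample is applied (kubectl apply -f docs/examples/sample/lws-colocated-placement.yaml), the pod webhook added in this PR (SetColocatedAffinities) injects a required pod affinity term into every pod of a group, so all four pods of each replica are co-scheduled into one nodepool-id domain. A minimal sketch of the injected term, assuming the group hash label key is leaderworkerset.sigs.k8s.io/group-key and writing <group-hash> for the per-group value computed by the webhook:

affinity:
  podAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
    - labelSelector:
        matchExpressions:
        - key: leaderworkerset.sigs.k8s.io/group-key
          operator: In
          values:
          - <group-hash>
      topologyKey: nodepool-id
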
10 changes: 8 additions & 2 deletions pkg/controllers/leaderworkerset_controller.go
@@ -550,9 +550,15 @@ func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWor
})
podAnnotations := make(map[string]string)
podAnnotations[leaderworkerset.SizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size))
if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" {
podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]

switch lws.Spec.GroupPlacementPolicy.Type {
case leaderworkerset.ExclusiveGroupPlacementPolicyType:
podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = *lws.Spec.GroupPlacementPolicy.TopologyKey
case leaderworkerset.ColocatedGroupPlacementPolicyType:
podAnnotations[leaderworkerset.ColocatedKeyAnnotationKey] = *lws.Spec.GroupPlacementPolicy.TopologyKey
case leaderworkerset.NoneGroupPlacementPolicyType:
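// None: no group placement annotation is added to the pods.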
}

if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
podAnnotations[leaderworkerset.SubGroupSizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize))
if lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] != "" {
8 changes: 6 additions & 2 deletions pkg/controllers/pod_controller.go
@@ -287,8 +287,12 @@ func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws lead
podAnnotations := make(map[string]string)
podAnnotations[leaderworkerset.SizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size))
podAnnotations[leaderworkerset.LeaderPodNameAnnotationKey] = leaderPod.Name
if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" {
podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]
switch lws.Spec.GroupPlacementPolicy.Type {
case leaderworkerset.ExclusiveGroupPlacementPolicyType:
podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = *lws.Spec.GroupPlacementPolicy.TopologyKey
case leaderworkerset.ColocatedGroupPlacementPolicyType:
podAnnotations[leaderworkerset.ColocatedKeyAnnotationKey] = *lws.Spec.GroupPlacementPolicy.TopologyKey
case leaderworkerset.NoneGroupPlacementPolicyType:
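// None: no group placement annotation is added to the pods.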
}
if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
podAnnotations[leaderworkerset.SubGroupSizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize))
35 changes: 33 additions & 2 deletions pkg/webhooks/leaderworkerset_webhook.go
@@ -23,11 +23,11 @@ import (
"strconv"

apivalidation "k8s.io/apimachinery/pkg/api/validation"
utilvalidation "k8s.io/apimachinery/pkg/util/validation"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
utilvalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
@@ -82,6 +82,12 @@ func (r *LeaderWorkerSetWebhook) Default(ctx context.Context, obj runtime.Object
subdomainPolicy := v1.SubdomainShared
lws.Spec.NetworkConfig.SubdomainPolicy = &subdomainPolicy
}

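// Keep the legacy exclusive-topology annotation working by mirroring it into the new groupPlacementPolicy field.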
if epKey, foundEpKey := lws.Annotations[v1.ExclusiveKeyAnnotationKey]; foundEpKey {
lws.Spec.GroupPlacementPolicy.Type = v1.ExclusiveGroupPlacementPolicyType
lws.Spec.GroupPlacementPolicy.TopologyKey = ptr.To[string](epKey)
}

return nil
}

@@ -168,6 +174,12 @@ func (r *LeaderWorkerSetWebhook) generalValidate(obj runtime.Object) field.Error
allErrs = append(allErrs, field.Invalid(maxUnavailablePath, maxUnavailable, "must not be 0 when `maxSurge` is 0"))
}

if epKey, foundEpKey := lws.Annotations[v1.ExclusiveKeyAnnotationKey]; foundEpKey {
allErrs = append(allErrs, validateExclusivePlacement(specPath, lws, epKey)...)
}

allErrs = append(allErrs, validateGroupPlacementPolicy(specPath, lws)...)

if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
allErrs = append(allErrs, validateUpdateSubGroupPolicy(specPath, lws)...)
} else {
@@ -244,3 +256,22 @@ func validateUpdateSubGroupPolicy(specPath *field.Path, lws *v1.LeaderWorkerSet)
}
return allErrs
}

func validateExclusivePlacement(specPath *field.Path, lws *v1.LeaderWorkerSet, value string) field.ErrorList {
allErrs := field.ErrorList{}
if lws.Spec.GroupPlacementPolicy.Type != v1.ExclusiveGroupPlacementPolicyType {
allErrs = append(allErrs, field.Invalid(specPath.Child("groupPlacementPolicy", "type"), lws.Spec.GroupPlacementPolicy.Type, "type must be Exclusive when the exclusive-topology annotation is set"))
}
if lws.Spec.GroupPlacementPolicy.TopologyKey == nil || *lws.Spec.GroupPlacementPolicy.TopologyKey != value {
allErrs = append(allErrs, field.Invalid(specPath.Child("groupPlacementPolicy", "topologyKey"), lws.Spec.GroupPlacementPolicy.TopologyKey, "topologyKey must match the exclusive-topology annotation value when the annotation is set"))
}
return allErrs
}

func validateGroupPlacementPolicy(specPath *field.Path, lws *v1.LeaderWorkerSet) field.ErrorList {
allErrs := field.ErrorList{}
if lws.Spec.GroupPlacementPolicy.Type != v1.NoneGroupPlacementPolicyType && lws.Spec.GroupPlacementPolicy.TopologyKey == nil {
allErrs = append(allErrs, field.Required(specPath.Child("groupPlacementPolicy", "topologyKey"), "topologyKey must be set when type is not None"))
}
return allErrs
}
42 changes: 42 additions & 0 deletions pkg/webhooks/pod_webhook.go
@@ -125,6 +125,9 @@ func (p *PodWebhook) Default(ctx context.Context, obj runtime.Object) error {
if epKey, foundEpKey := pod.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]; foundEpKey {
SetExclusiveAffinities(pod, groupUniqueKey, epKey, leaderworkerset.GroupUniqueHashLabelKey)
}
if cpKey, foundCpKey := pod.Annotations[leaderworkerset.ColocatedKeyAnnotationKey]; foundCpKey {
SetColocatedAffinities(pod, groupUniqueKey, cpKey, leaderworkerset.GroupUniqueHashLabelKey)
}
_, foundSubGroupSize := pod.Annotations[leaderworkerset.SubGroupSizeAnnotationKey]
if foundSubGroupSize && pod.Labels[leaderworkerset.SubGroupIndexLabelKey] == "" {
// The leader pod always lands on SubGroup 0.
@@ -204,6 +207,7 @@ func SetExclusiveAffinities(pod *corev1.Pod, groupUniqueKey string, topologyKey
}},
TopologyKey: topologyKey,
})

// Pod anti-affinity ensures exclusively this set lands on the topology, preventing multiple sets per topology domain.
pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
corev1.PodAffinityTerm{
@@ -242,6 +246,44 @@ func exclusiveAffinityApplied(pod corev1.Pod, topologyKey string) bool {
return hasAffinity && hasAntiAffinity
}

func SetColocatedAffinities(pod *corev1.Pod, groupUniqueKey, topologyKey, podAffinityKey string) {
if colocatedAffinityApplied(pod, topologyKey) {
return
}
if pod.Spec.Affinity == nil {
pod.Spec.Affinity = &corev1.Affinity{}
}
if pod.Spec.Affinity.PodAffinity == nil {
pod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{}
}

// Pod affinity ensures the pods of this set land on the same topology domain.
pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: podAffinityKey,
Operator: metav1.LabelSelectorOpIn,
Values: []string{groupUniqueKey},
},
}},
TopologyKey: topologyKey,
})
}

func colocatedAffinityApplied(pod *corev1.Pod, topologyKey string) bool {
if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
return false
}
hasAffinity := false
for _, podAffinityTerm := range pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
if podAffinityTerm.TopologyKey == topologyKey {
hasAffinity = true
}
}
return hasAffinity
}

func getSubGroupIndex(podCount int, subGroupSize int, workerIndex int) string {
if (podCount-1)%subGroupSize == 0 {
// Leader is considered as extra pod, it is part of the first group