@@ -18,14 +18,21 @@ package disruption

 import (
 	"context"
+	"fmt"

 	"github.com/samber/lo"
+	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/utils/clock"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"

+	"sigs.k8s.io/karpenter/pkg/utils/node"
+
+	corev1 "k8s.io/api/core/v1"
+
 	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
+	nodeclaimutil "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
 )

 // Consolidation is a nodeclaim sub-controller that adds or removes status conditions on empty nodeclaims based on consolidateAfter
@@ -69,10 +76,62 @@ func (c *Consolidation) Reconcile(ctx context.Context, nodePool *v1.NodePool, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
 		return reconcile.Result{RequeueAfter: consolidatableTime.Sub(c.clock.Now())}, nil
 	}

+	// Get the node backing the nodeclaim so its utilization can be checked
+	n, err := nodeclaimutil.NodeForNodeClaim(ctx, c.kubeClient, nodeClaim)
+	if err != nil {
+		if nodeclaimutil.IsDuplicateNodeError(err) || nodeclaimutil.IsNodeNotFoundError(err) {
+			return reconcile.Result{}, nil
+		}
+		return reconcile.Result{}, err
+	}
+	// If utilizationThreshold is specified, check the node's utilization; the node can only be disrupted when utilization is below the threshold
+	threshold := nodePool.Spec.Disruption.UtilizationThreshold
+	if threshold != nil {
+		pods, err := node.GetPods(ctx, c.kubeClient, n)
+		if err != nil {
+			return reconcile.Result{}, fmt.Errorf("retrieving node pods, %w", err)
+		}
+		cpu, err := calculateUtilizationOfResource(n, corev1.ResourceCPU, pods)
+		if err != nil {
+			return reconcile.Result{}, fmt.Errorf("calculating CPU utilization, %w", err)
+		}
+		memory, err := calculateUtilizationOfResource(n, corev1.ResourceMemory, pods)
+		if err != nil {
+			return reconcile.Result{}, fmt.Errorf("calculating memory utilization, %w", err)
+		}
+		if cpu > float64(*threshold)/100 || memory > float64(*threshold)/100 {
+			if hasConsolidatableCondition {
+				_ = nodeClaim.StatusConditions().Clear(v1.ConditionTypeConsolidatable)
+				log.FromContext(ctx).V(1).Info("removing consolidatable status condition due to high utilization")
+			}
+			return reconcile.Result{}, nil // utilization is above the threshold; don't mark the nodeclaim consolidatable
+		}
+	}
+
 	// 6. Otherwise, add the consolidatable status condition
 	nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)
 	if !hasConsolidatableCondition {
 		log.FromContext(ctx).V(1).Info("marking consolidatable")
 	}
 	return reconcile.Result{}, nil
 }
+
+// calculateUtilizationOfResource returns the ratio of the pods' summed container requests to the node's allocatable capacity for the given resource.
+func calculateUtilizationOfResource(node *corev1.Node, resourceName corev1.ResourceName, pods []*corev1.Pod) (float64, error) {
+	allocatable, found := node.Status.Allocatable[resourceName]
+	if !found {
+		return 0, fmt.Errorf("failed to get %v from %s", resourceName, node.Name)
+	}
+	if allocatable.MilliValue() == 0 {
+		return 0, fmt.Errorf("%v is 0 at %s", resourceName, node.Name)
+	}
+	podsRequest := resource.MustParse("0")
+	for _, pod := range pods {
+		for _, container := range pod.Spec.Containers {
+			if resourceValue, found := container.Resources.Requests[resourceName]; found {
+				podsRequest.Add(resourceValue)
+			}
+		}
+	}
+	return float64(podsRequest.MilliValue()) / float64(allocatable.MilliValue()), nil
+}
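For reviewers, a minimal standalone sketch of the threshold arithmetic (not part of the change; the node, pod, and threshold values below are made up): a node with 4 allocatable CPUs running a pod that requests 3 CPUs has a request-based utilization of 0.75, which exceeds a utilizationThreshold of 70 (compared as 70/100 = 0.70), so such a nodeclaim would not be marked consolidatable.

// Illustrative sketch only; the values and the threshold are hypothetical.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// A node with 4 allocatable CPUs.
	n := &corev1.Node{
		Status: corev1.NodeStatus{
			Allocatable: corev1.ResourceList{
				corev1.ResourceCPU: resource.MustParse("4"),
			},
		},
	}
	// A single pod requesting 3 CPUs.
	pods := []*corev1.Pod{{
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{
				Resources: corev1.ResourceRequirements{
					Requests: corev1.ResourceList{
						corev1.ResourceCPU: resource.MustParse("3"),
					},
				},
			}},
		},
	}}

	// Sum the container CPU requests, mirroring calculateUtilizationOfResource in the diff.
	podsRequest := resource.MustParse("0")
	for _, pod := range pods {
		for _, c := range pod.Spec.Containers {
			if v, ok := c.Resources.Requests[corev1.ResourceCPU]; ok {
				podsRequest.Add(v)
			}
		}
	}
	allocatable := n.Status.Allocatable[corev1.ResourceCPU]
	cpu := float64(podsRequest.MilliValue()) / float64(allocatable.MilliValue()) // 3000m / 4000m = 0.75

	threshold := int64(70) // hypothetical nodePool.Spec.Disruption.UtilizationThreshold
	fmt.Printf("cpu=%.2f aboveThreshold=%t\n", cpu, cpu > float64(threshold)/100)
	// Prints: cpu=0.75 aboveThreshold=true, so the nodeclaim would not be marked consolidatable.
}

Note that the helper sums only regular container requests, so the ratio reflects requested rather than observed usage, and init containers and pod overhead are not counted.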