feat: add leader node names to v1beta2.nodepool (#2297)
* feat: add leader node names to v1beta2.nodepool

Co-authored-by: Simon Tien <[email protected]>
tnsimon and Simon Tien authored Feb 4, 2025
1 parent 93b544c commit ffb736a
Showing 5 changed files with 154 additions and 27 deletions.
13 changes: 12 additions & 1 deletion charts/yurt-manager/crds/apps.openyurt.io_nodepools.yaml
@@ -464,7 +464,18 @@ spec:
leaderEndpoints:
description: LeaderEndpoints is used for storing the address of Leader Yurthub.
items:
- type: string
+ description: Leader represents the hub leader in a nodepool
+ properties:
+   address:
+     description: The address of the leader yurthub
+     type: string
+   nodeName:
+     description: The node name of the leader yurthub
+     type: string
+ required:
+ - address
+ - nodeName
+ type: object
type: array
nodes:
description: The list of nodes' names in the pool
11 changes: 10 additions & 1 deletion pkg/apis/apps/v1beta2/nodepool_types.go
@@ -107,14 +107,23 @@ type NodePoolStatus struct {

// LeaderEndpoints is used for storing the address of Leader Yurthub.
// +optional
- LeaderEndpoints []string `json:"leaderEndpoints,omitempty"`
+ LeaderEndpoints []Leader `json:"leaderEndpoints,omitempty"`

// Conditions represents the latest available observations of a NodePool's
// current state that includes LeaderHubElection status.
// +optional
Conditions []NodePoolCondition `json:"conditions,omitempty"`
}

+ // Leader represents the hub leader in a nodepool
+ type Leader struct {
+ 	// The node name of the leader yurthub
+ 	NodeName string `json:"nodeName"`
+
+ 	// The address of the leader yurthub
+ 	Address string `json:"address"`
+ }

// NodePoolConditionType represents a NodePool condition value.
type NodePoolConditionType string

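The change above replaces the bare address strings with a small struct, so consumers of the v1beta2 API now get both the leader yurthub's node name and its address. A minimal consumer-side sketch (assuming the usual module path github.com/openyurtio/openyurt for this package; the printLeaders helper and the sample values are illustrative, not part of this commit):

```go
package main

import (
	"fmt"

	appsv1beta2 "github.com/openyurtio/openyurt/pkg/apis/apps/v1beta2"
)

// printLeaders walks the new []Leader slice instead of the old []string of
// bare addresses, reporting both fields of each elected hub leader.
func printLeaders(status appsv1beta2.NodePoolStatus) {
	for _, l := range status.LeaderEndpoints {
		fmt.Printf("leader yurthub on node %q at %s\n", l.NodeName, l.Address)
	}
}

func main() {
	status := appsv1beta2.NodePoolStatus{
		LeaderEndpoints: []appsv1beta2.Leader{
			{NodeName: "edge-node-1", Address: "10.0.0.2"},
		},
	}
	printLeaders(status)
}
```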
17 changes: 16 additions & 1 deletion pkg/apis/apps/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default.
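The regenerated deepcopy methods are not rendered above. For a struct whose fields are all plain strings, the deepcopy generator is expected to emit something close to the sketch below, and NodePoolStatus.DeepCopyInto to switch from copying a []string to copying a []Leader; this is an assumption about the generated output, not a copy of the file:

```go
// Assumed shape of the generated code for the new type (not the rendered file).

// DeepCopyInto copies the receiver into out; Leader has only value fields,
// so a struct assignment is enough.
func (in *Leader) DeepCopyInto(out *Leader) {
	*out = *in
}

// DeepCopy returns a new Leader copied from the receiver.
func (in *Leader) DeepCopy() *Leader {
	if in == nil {
		return nil
	}
	out := new(Leader)
	in.DeepCopyInto(out)
	return out
}
```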

34 changes: 21 additions & 13 deletions pkg/yurtmanager/controller/hubleader/hubleader_controller.go
@@ -179,9 +179,9 @@ func (r *ReconcileHubLeader) reconcileHubLeader(ctx context.Context, nodepool *a
// Copy the nodepool to update
updatedNodePool := nodepool.DeepCopy()

- // Cache nodes in the list by internalIP -> Node
+ // Cache nodes in the list by Leader -> Node
// if they are ready and have internal IP
- endpointsMap := make(map[string]*corev1.Node)
+ leadersMap := make(map[appsv1beta2.Leader]*corev1.Node)
for _, n := range currentNodeList.Items {
internalIP, ok := nodeutil.GetInternalIP(&n)
if !ok {
@@ -196,13 +196,16 @@ func (r *ReconcileHubLeader) reconcileHubLeader(ctx context.Context, nodepool *a
continue
}

- endpointsMap[internalIP] = &n
+ leadersMap[appsv1beta2.Leader{
+ 	Address: internalIP,
+ 	NodeName: n.Name,
+ }] = &n
}

- // Delete leader endpoints that are not in endpoints map
+ // Delete leaders that are not in leaders map
// They are either not ready or not longer the node list and need to be removed
- leaderDeleteFn := func(endpoint string) bool {
- 	_, ok := endpointsMap[endpoint]
+ leaderDeleteFn := func(leader appsv1beta2.Leader) bool {
+ 	_, ok := leadersMap[leader]
return !ok
}
updatedLeaders := slices.DeleteFunc(updatedNodePool.Status.LeaderEndpoints, leaderDeleteFn)
@@ -211,13 +214,13 @@ func (r *ReconcileHubLeader) reconcileHubLeader(ctx context.Context, nodepool *a
if len(updatedLeaders) < int(nodepool.Spec.LeaderReplicas) {
// Remove current leaders from candidates
for _, leader := range updatedLeaders {
- delete(endpointsMap, leader)
+ delete(leadersMap, leader)
}

leaders, ok := electNLeaders(
nodepool.Spec.LeaderElectionStrategy,
int(nodepool.Spec.LeaderReplicas)-len(updatedLeaders),
- endpointsMap,
+ leadersMap,
)
if !ok {
klog.Errorf("Failed to elect a leader for NodePool %s", nodepool.Name)
@@ -246,12 +249,12 @@ func (r *ReconcileHubLeader) reconcileHubLeader(ctx context.Context, nodepool *a
}

// hasLeadersChanged checks if the leader endpoints have changed
- func hasLeadersChanged(old, new []string) bool {
+ func hasLeadersChanged(old, new []appsv1beta2.Leader) bool {
if len(old) != len(new) {
return true
}

- oldSet := make(map[string]struct{}, len(old))
+ oldSet := make(map[appsv1beta2.Leader]struct{}, len(old))

for i := range old {
oldSet[old[i]] = struct{}{}
@@ -270,9 +273,14 @@ func hasLeadersChanged(old, new []string) bool {
func electNLeaders(
strategy string,
numLeaders int,
- candidates map[string]*corev1.Node,
- ) ([]string, bool) {
- leaderEndpoints := make([]string, 0, len(candidates))
+ candidates map[appsv1beta2.Leader]*corev1.Node,
+ ) ([]appsv1beta2.Leader, bool) {
+ // No candidates to elect leaders from
+ if len(candidates) == 0 {
+ 	return nil, true
+ }
+
+ leaderEndpoints := make([]appsv1beta2.Leader, 0, numLeaders)

switch strategy {
case string(appsv1beta2.ElectionStrategyMark), string(appsv1beta2.ElectionStrategyRandom):
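The bodies of the election strategies are collapsed above; the diff only re-keys the candidate map from internal IP to Leader, so the selection logic is otherwise untouched. As a rough illustration of the idea behind the random strategy (a standalone sketch, not the controller's actual electNLeaders code; the Leader mirror type and pickNRandom helper are hypothetical):

```go
package main

import (
	"fmt"
	"math/rand"
)

// Leader mirrors the new API type for this standalone illustration.
type Leader struct {
	NodeName string
	Address  string
}

// pickNRandom returns up to n keys of the candidate map in random order,
// roughly the kind of result a random election over Leader-keyed candidates
// would produce.
func pickNRandom[V any](candidates map[Leader]V, n int) []Leader {
	keys := make([]Leader, 0, len(candidates))
	for k := range candidates {
		keys = append(keys, k)
	}
	rand.Shuffle(len(keys), func(i, j int) { keys[i], keys[j] = keys[j], keys[i] })
	if n > len(keys) {
		n = len(keys)
	}
	return keys[:n]
}

func main() {
	candidates := map[Leader]struct{}{
		{NodeName: "node-a", Address: "10.0.0.2"}: {},
		{NodeName: "node-b", Address: "10.0.0.5"}: {},
	}
	fmt.Println(pickNRandom(candidates, 1))
}
```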
106 changes: 95 additions & 11 deletions pkg/yurtmanager/controller/hubleader/hubleader_controller_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package hubleader

import (
"cmp"
"context"
"slices"
"testing"
@@ -266,7 +267,12 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.1"},
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP",
Address: "10.0.0.1",
},
},
},
},
expectErr: false,
@@ -306,7 +312,16 @@ func TestReconcile(t *testing.T) {
LeaderReplicas: 2,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.5"},
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Address: "10.0.0.2",
},
{
NodeName: "ready with internal IP and marked as 2nd leader",
Address: "10.0.0.5",
},
},
},
},
expectErr: false,
@@ -465,7 +480,12 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2"}, // leader already set
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Address: "10.0.0.2", // leader already set
},
},
},
},
expectedNodePool: &appsv1beta2.NodePool{
@@ -485,7 +505,12 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2"}, // should not change leader as replicas met
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Address: "10.0.0.2", // should not change leader as replicas met
},
},
},
},
expectErr: false,
@@ -508,7 +533,16 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.4"}, // .4 was leader (node not ready)
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Address: "10.0.0.2",
},
{
NodeName: "not ready with internal IP marked as leader",
Address: "10.0.0.4", // .4 was leader (node not ready)
},
},
},
},
expectedNodePool: &appsv1beta2.NodePool{
@@ -528,7 +562,16 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.5"}, // new leader is .5
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Address: "10.0.0.2",
},
{
NodeName: "ready with internal IP and marked as 2nd leader",
Address: "10.0.0.5", // new leader is .5
},
},
},
},
expectErr: false,
@@ -568,7 +611,16 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.5"}, // multiple marked leaders
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Address: "10.0.0.2",
},
{
NodeName: "ready with internal IP and marked as 2nd leader",
Address: "10.0.0.5",
}, // multiple marked leaders
},
},
},
expectErr: false,
@@ -602,7 +654,20 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.3", "10.0.0.5"}, // multiple marked leaders
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Address: "10.0.0.2",
},
{
NodeName: "ready with internal IP and not marked as leader",
Address: "10.0.0.3",
},
{
NodeName: "ready with internal IP and marked as 2nd leader",
Address: "10.0.0.5",
}, // multiple marked leaders,
},
},
},
expectErr: false,
@@ -625,7 +690,16 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2", "10.0.0.5"}, // 2 leaders set, last should be dropped
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Address: "10.0.0.2",
},
{
NodeName: "ready with internal IP and marked as 2nd leader",
Address: "10.0.0.5",
}, // 2 leaders set, last should be dropped
},
},
},
expectedNodePool: &appsv1beta2.NodePool{
Expand All @@ -645,7 +719,12 @@ func TestReconcile(t *testing.T) {
InterConnectivity: true,
},
Status: appsv1beta2.NodePoolStatus{
LeaderEndpoints: []string{"10.0.0.2"},
LeaderEndpoints: []appsv1beta2.Leader{
{
NodeName: "ready with internal IP and marked as leader",
Address: "10.0.0.2",
},
},
},
},
expectErr: false,
@@ -682,7 +761,12 @@ func TestReconcile(t *testing.T) {
// Reset resource version - it's not important for the test
actualPool.ResourceVersion = ""
// Sort leader endpoints for comparison - it is not important for the order
- slices.Sort(actualPool.Status.LeaderEndpoints)
+ slices.SortStableFunc(actualPool.Status.LeaderEndpoints, func(a, b appsv1beta2.Leader) int {
+ 	return cmp.Compare(
+ 		a.Address,
+ 		b.Address,
+ 	)
+ })

require.Equal(t, *tc.expectedNodePool, actualPool)
})
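Since LeaderEndpoints is now a slice of structs, the test switches from slices.Sort to a field-based comparator so the comparison stays order-insensitive. A standalone sketch of the same pattern (Go 1.21+ for slices.SortStableFunc and cmp.Compare; the local Leader type here just mirrors the API struct):

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// Leader mirrors the API type for this standalone example.
type Leader struct {
	NodeName string
	Address  string
}

func main() {
	leaders := []Leader{
		{NodeName: "node-b", Address: "10.0.0.5"},
		{NodeName: "node-a", Address: "10.0.0.2"},
	}
	// Sort by address so slices built in different orders compare equal.
	slices.SortStableFunc(leaders, func(a, b Leader) int {
		return cmp.Compare(a.Address, b.Address)
	})
	fmt.Println(leaders)
}
```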
