124 changes: 113 additions & 11 deletions pkg/controller/node/node_controller.go
@@ -56,6 +56,8 @@ const (
WorkerLabel = "node-role.kubernetes.io/worker"
// ControlPlaneLabel defines the label associated with master/control-plane node.
ControlPlaneLabel = "node-role.kubernetes.io/control-plane"
// ArbiterLabel defines the label associated with arbiter node.
ArbiterLabel = "node-role.kubernetes.io/arbiter"

// maxRetries is the number of times a machineconfig pool will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
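(The hunk truncates this comment before the list of delays.) As a rough aid — not part of this diff — a minimal Go sketch of what the 5ms*2^(maxRetries-1) backoff works out to, assuming a maxRetries of 15:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Per-item exponential backoff: 5ms * 2^(retries-1), as the comment above
	// describes for the rate limiter in use. The upper bound of 15 is assumed
	// here purely for illustration.
	base := 5 * time.Millisecond
	for retries := 1; retries <= 15; retries++ {
		delay := base * time.Duration(1<<uint(retries-1))
		fmt.Printf("retry %2d: %v\n", retries, delay)
	}
}
```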
@@ -1129,6 +1131,11 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error {
pool := machineconfigpool.DeepCopy()
everything := metav1.LabelSelector{}

// If this is the arbiter pool, requeue the master pool (which drives the coordinated update) and only sync this pool's status
if pool.Name == ctrlcommon.MachineConfigPoolArbiter {
return ctrl.handleArbiterPoolEvent(pool)
}

if reflect.DeepEqual(pool.Spec.NodeSelector, &everything) {
ctrl.eventRecorder.Eventf(pool, corev1.EventTypeWarning, "SelectingAll", "This machineconfigpool is selecting all nodes. A non-empty selector is required.")
return nil
@@ -1182,12 +1189,50 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error {
return err
}

if err := ctrl.setClusterConfigAnnotation(nodes); err != nil {
cc, err := ctrl.ccLister.Get(ctrlcommon.ControllerConfigName)
if err != nil {
return fmt.Errorf("error getting controllerconfig %q, error: %w", ctrlcommon.ControllerConfigName, err)
}
controlPlaneTopology := cc.Spec.Infra.Status.ControlPlaneTopology

// For master pool in HighlyAvailableArbiterMode, coordinate with arbiter pool
var arbiterPool *mcfgv1.MachineConfigPool
var arbiterNodes []*corev1.Node
var arbiterMosc *mcfgv1.MachineOSConfig
var arbiterMosb *mcfgv1.MachineOSBuild
var arbiterLayered bool
// In HighlyAvailableArbiterMode, fold the arbiter nodes into a combined list so the annotation, taint, and maxUnavailable handling below runs once over both pools.
var combinedNodes = append([]*corev1.Node{}, nodes...)
if pool.Name == ctrlcommon.MachineConfigPoolMaster && controlPlaneTopology == configv1.HighlyAvailableArbiterMode {
arbiterObj, err := ctrl.mcpLister.Get(ctrlcommon.MachineConfigPoolArbiter)
if err != nil {
return fmt.Errorf("error getting arbiter pool %q, error: %w", ctrlcommon.MachineConfigPoolArbiter, err)
}
if arbiterObj.Spec.Configuration.Name != "" && arbiterObj.DeletionTimestamp == nil && !arbiterObj.Spec.Paused {
arbiterPool = arbiterObj.DeepCopy()
arbiterNodes, err = ctrl.getNodesForPool(arbiterPool)
if err != nil {
return fmt.Errorf("error getting nodes for arbiter pool %q, error: %w", ctrlcommon.MachineConfigPoolArbiter, err)
}
arbiterMosc, arbiterMosb, arbiterLayered, err = ctrl.getConfigAndBuildAndLayeredStatus(arbiterPool)
if err != nil {
return fmt.Errorf("error getting config and build for arbiter pool %q, error: %w", ctrlcommon.MachineConfigPoolArbiter, err)
}
combinedNodes = append(combinedNodes, arbiterNodes...)
combinedMax, err := maxUnavailable(pool, combinedNodes)
if err != nil {
return fmt.Errorf("error getting max unavailable count for pool %q, error: %w", pool.Name, err)
}
maxunavail = combinedMax
}
}

if err := ctrl.setClusterConfigAnnotation(combinedNodes, controlPlaneTopology); err != nil {
return fmt.Errorf("error setting clusterConfig Annotation for node in pool %q, error: %w", pool.Name, err)
}
// Taint all the nodes in the node pool, irrespective of their upgrade status.
ctx := context.TODO()
for _, node := range nodes {
for _, node := range combinedNodes {
// All the nodes that need to be upgraded should have `NodeUpdateInProgressTaint` so that they're less likely
// to be chosen during the scheduling cycle. This includes nodes which are:
// (i) In a Pool being updated to a new MC or image
@@ -1196,7 +1241,18 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error {

lns := ctrlcommon.NewLayeredNodeState(node)

if (!layered && lns.IsDesiredMachineConfigEqualToPool(pool) && !lns.AreImageAnnotationsPresentOnNode()) || (layered && lns.IsDesiredEqualToBuild(mosc, mosb)) {
desiredPool := pool
desiredMosc := mosc
desiredMosb := mosb
desiredLayered := layered
if _, ok := node.GetLabels()[ArbiterLabel]; ok && controlPlaneTopology == configv1.HighlyAvailableArbiterMode {
desiredPool = arbiterPool
desiredMosc = arbiterMosc
desiredMosb = arbiterMosb
desiredLayered = arbiterLayered
}

if (!desiredLayered && lns.IsDesiredMachineConfigEqualToPool(desiredPool) && !lns.AreImageAnnotationsPresentOnNode()) || (desiredLayered && lns.IsDesiredEqualToBuild(desiredMosc, desiredMosb)) {
if hasInProgressTaint {
if err := ctrl.removeUpdateInProgressTaint(ctx, node.Name); err != nil {
err = fmt.Errorf("failed removing %s taint for node %s: %w", constants.NodeUpdateInProgressTaint.Key, node.Name, err)
@@ -1212,7 +1268,31 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error {
}
}
}

candidates, capacity := getAllCandidateMachines(layered, mosc, mosb, pool, nodes, maxunavail)
if err := ctrl.updateCandidates(layered, mosc, mosb, pool, candidates, capacity); err != nil {
return err
}

// If coordinating with arbiter pool, also handle arbiter node updates
if arbiterPool != nil && len(arbiterNodes) > 0 && pool.Name == ctrlcommon.MachineConfigPoolMaster && controlPlaneTopology == configv1.HighlyAvailableArbiterMode {
arbiterUnavailable := len(getUnavailableMachines(arbiterNodes, arbiterPool))
//#nosec G115 -- no overflow: capacity is a small uint converted to int before the subtraction and addition
arbiterMaxUnavail := int(capacity) - len(candidates) + arbiterUnavailable
arbiterCandidates, arbiterCapacity := getAllCandidateMachines(arbiterLayered, arbiterMosc, arbiterMosb, arbiterPool, arbiterNodes, arbiterMaxUnavail)
if err := ctrl.updateCandidates(arbiterLayered, arbiterMosc, arbiterMosb, arbiterPool, arbiterCandidates, arbiterCapacity); err != nil {
return err
}
// Sync status for arbiter pool
if err := ctrl.syncStatusOnly(arbiterPool); err != nil {
return err
}
}

return ctrl.syncStatusOnly(pool)
}

func (ctrl *Controller) updateCandidates(layered bool, mosc *mcfgv1.MachineOSConfig, mosb *mcfgv1.MachineOSBuild, pool *mcfgv1.MachineConfigPool, candidates []*corev1.Node, capacity uint) error {
if len(candidates) > 0 {
zones := make(map[string]bool)
for _, candidate := range candidates {
@@ -1231,6 +1311,33 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error {
}
ctrlcommon.UpdateStateMetric(ctrlcommon.MCCSubControllerState, "machine-config-controller-node", "Sync Machine Config Pool", pool.Name)
}
return nil
}

func (ctrl *Controller) handleArbiterPoolEvent(pool *mcfgv1.MachineConfigPool) error {
masterPool, err := ctrl.mcpLister.Get(ctrlcommon.MachineConfigPoolMaster)
if err == nil {
ctrl.enqueue(masterPool)
} else if !errors.IsNotFound(err) {
return err
}
// Still sync status for arbiter pool
if pool.DeletionTimestamp != nil || pool.Spec.Paused {
return ctrl.syncStatusOnly(pool)
}
mosc, mosb, layered, err := ctrl.getConfigAndBuildAndLayeredStatus(pool)
if err != nil {
return fmt.Errorf("could not get config and build: %w", err)
}
if layered {
_, canApplyUpdates, err := ctrl.canLayeredContinue(mosc, mosb)
if err != nil {
return err
}
if !canApplyUpdates {
return ctrl.syncStatusOnly(pool)
}
}
return ctrl.syncStatusOnly(pool)
}

@@ -1276,17 +1383,12 @@ func (ctrl *Controller) getNodesForPool(pool *mcfgv1.MachineConfigPool) ([]*core
// setClusterConfigAnnotation reads cluster configs set into controllerConfig
// and add/updates required annotation to node such as ControlPlaneTopology
// from infrastructure object.
func (ctrl *Controller) setClusterConfigAnnotation(nodes []*corev1.Node) error {
cc, err := ctrl.ccLister.Get(ctrlcommon.ControllerConfigName)
if err != nil {
return err
}

func (ctrl *Controller) setClusterConfigAnnotation(nodes []*corev1.Node, controlPlaneTopology configv1.TopologyMode) error {
for _, node := range nodes {
if node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey] != string(cc.Spec.Infra.Status.ControlPlaneTopology) {
if node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey] != string(controlPlaneTopology) {
oldAnn := node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey]
_, err := internal.UpdateNodeRetry(ctrl.kubeClient.CoreV1().Nodes(), ctrl.nodeLister, node.Name, func(node *corev1.Node) {
node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey] = string(cc.Spec.Infra.Status.ControlPlaneTopology)
node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey] = string(controlPlaneTopology)
})
if err != nil {
return err
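To make the new control-plane coordination easier to follow: in HighlyAvailableArbiterMode the unavailability budget is computed over the combined master+arbiter node set, the master pool's candidate selection consumes part of that budget, and the arbiter pool receives whatever is left plus its own already-unavailable nodes. A standalone sketch of that arithmetic (made-up numbers, not code from this PR):

```go
package main

import "fmt"

// arbiterBudget mirrors the arithmetic added in syncMachineConfigPool above:
// maxUnavailable is computed over the combined master+arbiter node set, the
// master pool's candidate selection consumes part of that capacity, and the
// arbiter pool gets the remainder plus its own already-unavailable nodes
// (so nodes that are already down are not double-counted against the budget).
func arbiterBudget(masterCapacity, masterCandidates, arbiterUnavailable int) int {
	return masterCapacity - masterCandidates + arbiterUnavailable
}

func main() {
	// 3 masters + 1 arbiter with a combined maxUnavailable of 1:
	// the master pool claims the single slot, so the arbiter waits this sync.
	fmt.Println(arbiterBudget(1, 1, 0)) // 0
	// No master candidate was selected, so the arbiter node may proceed.
	fmt.Println(arbiterBudget(1, 0, 0)) // 1
}
```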
71 changes: 71 additions & 0 deletions pkg/controller/node/node_controller_test.go
@@ -1772,3 +1772,74 @@ func filterLastTransitionTime(obj runtime.Object) runtime.Object {
}
return o
}

func TestArbiterPoolCoordination(t *testing.T) {
t.Parallel()
f := newFixture(t)

// Create controller config with HighlyAvailableArbiterMode
cc := newControllerConfig(ctrlcommon.ControllerConfigName, configv1.HighlyAvailableArbiterMode)
f.ccLister = append(f.ccLister, cc)
f.objects = append(f.objects, cc)

// Create master pool with new config
masterPool := helpers.NewMachineConfigPool(ctrlcommon.MachineConfigPoolMaster, nil, helpers.MasterSelector, machineConfigV1)
masterPool.Spec.Configuration.Name = machineConfigV2
f.mcpLister = append(f.mcpLister, masterPool)
f.objects = append(f.objects, masterPool)

// Create arbiter pool with new config
arbiterSelector := metav1.AddLabelToSelector(&metav1.LabelSelector{}, "node-role.kubernetes.io/arbiter", "")
arbiterPool := helpers.NewMachineConfigPool(ctrlcommon.MachineConfigPoolArbiter, nil, arbiterSelector, machineConfigV1)
arbiterPool.Spec.Configuration.Name = machineConfigV2
f.mcpLister = append(f.mcpLister, arbiterPool)
f.objects = append(f.objects, arbiterPool)

// Create master node with the label matched by helpers.MasterSelector
masterNode := helpers.NewNodeWithReady("master-node-0", machineConfigV1, machineConfigV1, corev1.ConditionTrue)
masterNode.Labels = map[string]string{
"node-role/master": "",
}
f.nodeLister = append(f.nodeLister, masterNode)
f.kubeobjects = append(f.kubeobjects, masterNode)

// Create arbiter node
arbiterNode := helpers.NewNodeWithReady("arbiter-node-0", machineConfigV1, machineConfigV1, corev1.ConditionTrue)
arbiterNode.Labels = map[string]string{
"node-role.kubernetes.io/arbiter": "",
}
f.nodeLister = append(f.nodeLister, arbiterNode)
f.kubeobjects = append(f.kubeobjects, arbiterNode)

// Test: When master pool syncs in arbiter mode, it should coordinate both pools
// Expect status updates for both pools (arbiter first, then master)
f.expectUpdateMachineConfigPoolStatus(arbiterPool)
f.expectUpdateMachineConfigPoolStatus(masterPool)

// Sync master pool - this should coordinate both pools
c := f.newController()
err := c.syncHandler(ctrlcommon.MachineConfigPoolMaster)
require.NoError(t, err)

// Verify that both pools had their status updated
actions := filterInformerActions(f.client.Actions())
statusUpdates := 0
for _, action := range actions {
if action.Matches("update", "machineconfigpools") && action.GetSubresource() == "status" {
statusUpdates++
}
}
// Should have status updates for both master and arbiter pools
assert.GreaterOrEqual(t, statusUpdates, 2, "Expected at least 2 status updates (master and arbiter pools)")

// Verify that both nodes were patched (for desired config)
k8sActions := filterInformerActions(f.kubeclient.Actions())
nodePatches := 0
for _, action := range k8sActions {
if action.Matches("patch", "nodes") {
nodePatches++
}
}
// Should have patches for both master and arbiter nodes
assert.GreaterOrEqual(t, nodePatches, 2, "Expected at least 2 node patches (master and arbiter nodes)")
}
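For reference, the per-node routing that this test exercises — in HighlyAvailableArbiterMode, arbiter-labelled nodes are evaluated against the arbiter pool's desired state — reduces to a label check. A minimal standalone sketch (illustrative type and helper names, not code from this PR):

```go
package main

import "fmt"

const arbiterLabel = "node-role.kubernetes.io/arbiter"

// poolState stands in for the (pool, MachineOSConfig, MachineOSBuild, layered)
// tuple the controller tracks per pool; the name is illustrative only.
type poolState struct{ name string }

// desiredStateFor mirrors the routing rule added to the taint loop in
// syncMachineConfigPool: in HighlyAvailableArbiterMode, a node carrying the
// arbiter role label is compared against the arbiter pool's desired state
// instead of the master pool's.
func desiredStateFor(nodeLabels map[string]string, arbiterMode bool, master, arbiter poolState) poolState {
	if _, ok := nodeLabels[arbiterLabel]; ok && arbiterMode {
		return arbiter
	}
	return master
}

func main() {
	master := poolState{name: "master"}
	arbiter := poolState{name: "arbiter"}

	fmt.Println(desiredStateFor(map[string]string{"node-role.kubernetes.io/master": ""}, true, master, arbiter).name) // master
	fmt.Println(desiredStateFor(map[string]string{arbiterLabel: ""}, true, master, arbiter).name)                     // arbiter
}
```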