diff --git a/controllers/cloud.go b/controllers/cloud.go index dc19f72d..1c2bb4ce 100644 --- a/controllers/cloud.go +++ b/controllers/cloud.go @@ -29,6 +29,7 @@ import ( var ( instanceStateTagKey = "upgrademgr.keikoproj.io/state" inProgressTagValue = "in-progress" + failedDrainTagValue = "failed-drain" ) type DiscoveredState struct { diff --git a/controllers/upgrade.go b/controllers/upgrade.go index e2660fcb..4047a0fd 100644 --- a/controllers/upgrade.go +++ b/controllers/upgrade.go @@ -68,6 +68,11 @@ type RollingUpgradeContext struct { } func (r *RollingUpgradeContext) RotateNodes() error { + failedDrainInstances, err := r.Auth.DescribeTaggedInstanceIDs(instanceStateTagKey, failedDrainTagValue) + if err != nil { + r.Error(err, "failed to discover ec2 instances with drain-failed tag", "name", r.RollingUpgrade.NamespacedName()) + } + // set status to running r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusRunning) r.RollingUpgrade.SetLabel(v1alpha1.LabelKeyRollingUpgradeCurrentStatus, v1alpha1.StatusRunning) @@ -112,7 +117,14 @@ func (r *RollingUpgradeContext) RotateNodes() error { return nil } - rotationTargets := r.SelectTargets(scalingGroup) + rotationTargets := r.SelectTargets(scalingGroup, failedDrainInstances) + + if len(rotationTargets) == 0 && len(failedDrainInstances) > 0 { + // If there are failed instances, but no rotation targets, then select failed instances anyway + r.Info("selecting from failed instances since there are no rotation targets", "failedDrainInstances", failedDrainInstances, "name", r.RollingUpgrade.NamespacedName()) + rotationTargets = r.SelectTargets(scalingGroup, []string{}) + } + if ok, err := r.ReplaceNodeBatch(rotationTargets); !ok { return err } @@ -308,6 +320,9 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance) if err := r.Auth.DrainNode(node, time.Duration(r.RollingUpgrade.PostDrainDelaySeconds()), drainTimeout, r.Auth.Kubernetes); err != nil { // ignore drain failures if either of spec or controller args have set ignoreDrainFailures to true. if !ignoreDrainFailures { + if err := r.Auth.TagEC2instances([]string{instanceID}, instanceStateTagKey, failedDrainTagValue); err != nil { + r.Error(err, "failed to set instances to drain-failed", "batch", instanceID, "name", r.RollingUpgrade.NamespacedName()) + } r.DrainManager.DrainErrors <- errors.Errorf("DrainNode failed: instanceID - %v, %v", instanceID, err.Error()) return } @@ -418,7 +433,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance) return true, nil } -func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group) []*autoscaling.Instance { +func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group, excludedInstances []string) []*autoscaling.Instance { var ( batchSize = r.RollingUpgrade.MaxUnavailable() totalNodes = int(aws.Int64Value(scalingGroup.DesiredCapacity)) @@ -428,6 +443,9 @@ func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group) [ // first process all in progress instances r.Info("selecting batch for rotation", "batch size", unavailableInt, "name", r.RollingUpgrade.NamespacedName()) + if len(excludedInstances) > 0 { + r.Info("ignoring failed drain instances", "instances", excludedInstances, "name", r.RollingUpgrade.NamespacedName()) + } for _, instance := range r.Cloud.InProgressInstances { if selectedInstance := awsprovider.SelectScalingGroupInstance(instance, scalingGroup); !reflect.DeepEqual(selectedInstance, &autoscaling.Instance{}) { //In-progress instances shouldn't be considered if they are in terminating state. @@ -444,7 +462,7 @@ func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group) [ // select via strategy if there are no in-progress instances if r.RollingUpgrade.UpdateStrategyType() == v1alpha1.RandomUpdateStrategy { for _, instance := range scalingGroup.Instances { - if r.IsInstanceDrifted(instance) && !common.ContainsEqualFold(awsprovider.GetInstanceIDs(targets), aws.StringValue(instance.InstanceId)) { + if r.IsInstanceDrifted(instance) && !common.ContainsEqualFold(awsprovider.GetInstanceIDs(targets), aws.StringValue(instance.InstanceId)) && !common.ContainsEqualFold(excludedInstances, aws.StringValue(instance.InstanceId)) { targets = append(targets, instance) } } @@ -455,7 +473,7 @@ func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group) [ } else if r.RollingUpgrade.UpdateStrategyType() == v1alpha1.UniformAcrossAzUpdateStrategy { for _, instance := range scalingGroup.Instances { - if r.IsInstanceDrifted(instance) && !common.ContainsEqualFold(awsprovider.GetInstanceIDs(targets), aws.StringValue(instance.InstanceId)) { + if r.IsInstanceDrifted(instance) && !common.ContainsEqualFold(awsprovider.GetInstanceIDs(targets), aws.StringValue(instance.InstanceId)) && !common.ContainsEqualFold(excludedInstances, aws.StringValue(instance.InstanceId)) { targets = append(targets, instance) } }