Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions pkg/autoscaler/autoscaler/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ type Optimizer struct {
Generations
}

// OptimizeResult holds the optimization computation results.
// History should only be committed after successful API update.
type OptimizeResult struct {
RecommendedInstances int32
CorrectedInstances int32
ReplicasMap map[string]int32
Skip bool
}

type OptimizerMeta struct {
Config *workload.HeterogeneousTarget
MetricTargets map[string]float64
Expand Down Expand Up @@ -148,7 +157,10 @@ func (optimizer *Optimizer) NeedUpdate(policy *workload.AutoscalingPolicy, bindi
optimizer.Generations.BindingGeneration != binding.Generation
}

func (optimizer *Optimizer) Optimize(ctx context.Context, podLister listerv1.PodLister, autoscalePolicy *workload.AutoscalingPolicy, currentInstancesCounts map[string]int32) (map[string]int32, error) {
// Optimize computes the recommended replica distribution without updating history.
// The caller MUST call CommitOptimizeResult after successful API updates to record the scaling event.
// This prevents phantom scale events from being recorded when API updates fail.
func (optimizer *Optimizer) Optimize(ctx context.Context, podLister listerv1.PodLister, autoscalePolicy *workload.AutoscalingPolicy, currentInstancesCounts map[string]int32) (*OptimizeResult, error) {
size := len(optimizer.Meta.Config.Params)
unreadyInstancesCount := int32(0)
readyInstancesMetrics := make([]algorithm.Metrics, 0, size)
Expand Down Expand Up @@ -184,9 +196,10 @@ func (optimizer *Optimizer) Optimize(ctx context.Context, podLister listerv1.Pod
recommendedInstances, skip := instancesAlgorithm.GetRecommendedInstances()
if skip {
klog.Warning("skip recommended instances")
return nil, nil
return &OptimizeResult{Skip: true}, nil
}
if recommendedInstances*100 >= instancesCountSum*(*autoscalePolicy.Spec.Behavior.ScaleUp.PanicPolicy.PanicThresholdPercent) {
if autoscalePolicy.Spec.Behavior.ScaleUp.PanicPolicy.PanicThresholdPercent != nil &&
int64(recommendedInstances)*100 >= int64(instancesCountSum)*int64(*autoscalePolicy.Spec.Behavior.ScaleUp.PanicPolicy.PanicThresholdPercent) {
optimizer.Status.RefreshPanicMode()
}
CorrectedInstancesAlgorithm := algorithm.CorrectedInstancesAlgorithm{
Expand All @@ -197,12 +210,25 @@ func (optimizer *Optimizer) Optimize(ctx context.Context, podLister listerv1.Pod
MaxInstances: optimizer.Meta.MaxReplicas,
CurrentInstances: instancesCountSum,
RecommendedInstances: recommendedInstances}
recommendedInstances = CorrectedInstancesAlgorithm.GetCorrectedInstances()
correctedInstances := CorrectedInstancesAlgorithm.GetCorrectedInstances()

klog.InfoS("autoscale controller", "recommendedInstances", recommendedInstances, "correctedInstances", recommendedInstances)
optimizer.Status.AppendRecommendation(recommendedInstances)
optimizer.Status.AppendCorrected(recommendedInstances)
klog.InfoS("autoscale controller", "recommendedInstances", recommendedInstances, "correctedInstances", correctedInstances)

replicasMap := optimizer.Meta.RestoreReplicasOfEachBackend(recommendedInstances)
return replicasMap, nil
replicasMap := optimizer.Meta.RestoreReplicasOfEachBackend(correctedInstances)
return &OptimizeResult{
RecommendedInstances: recommendedInstances,
CorrectedInstances: correctedInstances,
ReplicasMap: replicasMap,
Skip: false,
}, nil
}

// CommitOptimizeResult records the optimization event in history.
// This MUST only be called after all API updates succeed to prevent phantom scale events.
func (optimizer *Optimizer) CommitOptimizeResult(result *OptimizeResult) {
if result == nil || result.Skip {
return
}
optimizer.Status.AppendRecommendation(result.RecommendedInstances)
optimizer.Status.AppendCorrected(result.CorrectedInstances)
}
38 changes: 31 additions & 7 deletions pkg/autoscaler/autoscaler/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ type Autoscaler struct {
Meta *ScalingMeta
}

// ScaleResult holds the scaling computation results.
// History should only be committed after successful API update.
type ScaleResult struct {
RecommendedInstances int32
CorrectedInstances int32
Skip bool
}

type ScalingMeta struct {
Config *workload.HomogeneousTarget
Namespace string
Expand Down Expand Up @@ -64,11 +72,14 @@ func (autoscaler *Autoscaler) UpdateAutoscalePolicy(autoscalePolicy *workload.Au
autoscaler.Meta.Generations.AutoscalePolicyGeneration = autoscalePolicy.Generation
}

func (autoscaler *Autoscaler) Scale(ctx context.Context, podLister listerv1.PodLister, autoscalePolicy *workload.AutoscalingPolicy, currentInstancesCount int32) (int32, error) {
// Scale computes the recommended and corrected instance counts without updating history.
// The caller MUST call CommitScaleResult after successful API update to record the scaling event.
// This prevents phantom scale events from being recorded when API updates fail.
func (autoscaler *Autoscaler) Scale(ctx context.Context, podLister listerv1.PodLister, autoscalePolicy *workload.AutoscalingPolicy, currentInstancesCount int32) (*ScaleResult, error) {
unreadyInstancesCount, readyInstancesMetrics, err := autoscaler.Collector.UpdateMetrics(ctx, podLister)
if err != nil {
klog.Errorf("update metrics error: %v", err)
return -1, err
return nil, err
}
// minInstance <- AutoscaleScope, currentInstancesCount(replicas) <- workload
instancesAlgorithm := algorithm.RecommendedInstancesAlgorithm{
Expand All @@ -84,9 +95,9 @@ func (autoscaler *Autoscaler) Scale(ctx context.Context, podLister listerv1.PodL
recommendedInstances, skip := instancesAlgorithm.GetRecommendedInstances()
if skip {
klog.InfoS("skip recommended instances")
return -1, nil
return &ScaleResult{Skip: true}, nil
}
if autoscalePolicy.Spec.Behavior.ScaleUp.PanicPolicy.PanicThresholdPercent != nil && recommendedInstances*100 >= currentInstancesCount*(*autoscalePolicy.Spec.Behavior.ScaleUp.PanicPolicy.PanicThresholdPercent) {
if autoscalePolicy.Spec.Behavior.ScaleUp.PanicPolicy.PanicThresholdPercent != nil && int64(recommendedInstances)*100 >= int64(currentInstancesCount)*int64(*autoscalePolicy.Spec.Behavior.ScaleUp.PanicPolicy.PanicThresholdPercent) {
autoscaler.Status.RefreshPanicMode()
}
CorrectedInstancesAlgorithm := algorithm.CorrectedInstancesAlgorithm{
Expand All @@ -101,7 +112,20 @@ func (autoscaler *Autoscaler) Scale(ctx context.Context, podLister listerv1.PodL
correctedInstances := CorrectedInstancesAlgorithm.GetCorrectedInstances()

klog.InfoS("autoscale controller", "currentInstancesCount", currentInstancesCount, "recommendedInstances", recommendedInstances, "correctedInstances", correctedInstances)
autoscaler.Status.AppendRecommendation(recommendedInstances)
autoscaler.Status.AppendCorrected(correctedInstances)
return correctedInstances, nil

return &ScaleResult{
RecommendedInstances: recommendedInstances,
CorrectedInstances: correctedInstances,
Skip: false,
}, nil
}

// CommitScaleResult records the scaling event in history.
// This MUST only be called after a successful API update to prevent phantom scale events.
func (autoscaler *Autoscaler) CommitScaleResult(result *ScaleResult) {
if result == nil || result.Skip {
return
}
autoscaler.Status.AppendRecommendation(result.RecommendedInstances)
autoscaler.Status.AppendCorrected(result.CorrectedInstances)
}
33 changes: 21 additions & 12 deletions pkg/autoscaler/controller/autoscale_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (ac *AutoscaleController) updateTargetReplicas(ctx context.Context, target
if target.TargetRef.Kind == "" || target.TargetRef.Kind == workload.ModelServingKind.Kind {
instance, err := ac.modelServingLister.ModelServings(namespaceScope).Get(targetRef.Name)
if err != nil {
return err
return fmt.Errorf("failed to get ModelServing %s/%s: %w", namespaceScope, targetRef.Name, err)
}
instance_copy := instance.DeepCopy()

Expand All @@ -202,9 +202,11 @@ func (ac *AutoscaleController) updateTargetReplicas(ctx context.Context, target
}
}
}
if _, err = ac.client.WorkloadV1alpha1().ModelServings(namespaceScope).Update(ctx, instance_copy, metav1.UpdateOptions{}); err == nil {
return nil
_, err = ac.client.WorkloadV1alpha1().ModelServings(namespaceScope).Update(ctx, instance_copy, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update ModelServing %s/%s replicas to %d: %w", namespaceScope, targetRef.Name, replicas, err)
}
return nil
}
return fmt.Errorf("target ref kind %s, name: %s not supported", targetRef.Kind, targetRef.Name)
}
Expand Down Expand Up @@ -279,15 +281,18 @@ func (ac *AutoscaleController) doOptimize(ctx context.Context, binding *workload
replicasMap[param.Target.TargetRef.Name] = currentInstancesCount
}

// Get recommended replicas
recommendedInstances, err := optimizer.Optimize(ctx, ac.podsLister, autoscalePolicy, replicasMap)
// Get recommended replicas (does NOT update history yet)
optimizeResult, err := optimizer.Optimize(ctx, ac.podsLister, autoscalePolicy, replicasMap)
if err != nil {
klog.Errorf("failed to do optimize, err: %v", err)
return err
}
// Do update replicas
if optimizeResult == nil || optimizeResult.Skip {
return nil
}
// Do update replicas for all targets
for _, param := range optimizer.Meta.Config.Params {
instancesCount, exists := recommendedInstances[param.Target.TargetRef.Name]
instancesCount, exists := optimizeResult.ReplicasMap[param.Target.TargetRef.Name]
if !exists {
klog.Warningf("recommended instances not exists, target ref name: %s", param.Target.TargetRef.Name)
continue
Expand All @@ -297,6 +302,8 @@ func (ac *AutoscaleController) doOptimize(ctx context.Context, binding *workload
return err
}
}
// Commit history ONLY after all API updates succeed to prevent phantom scale events
optimizer.CommitOptimizeResult(optimizeResult)

return nil
}
Expand All @@ -316,22 +323,24 @@ func (ac *AutoscaleController) doScale(ctx context.Context, binding *workload.Au
klog.Errorf("failed to get current replicas, err: %v", err)
return err
}
// Get recommended replicas
// Get recommended replicas (does NOT update history yet)
klog.InfoS("do homogeneous scaling for target", "targetRef", target.TargetRef, "currentInstancesCount", currentInstancesCount)
recommendedInstances, err := scaler.Scale(ctx, ac.podsLister, autoscalePolicy, currentInstancesCount)
scaleResult, err := scaler.Scale(ctx, ac.podsLister, autoscalePolicy, currentInstancesCount)
if err != nil {
klog.Errorf("failed to do homogeneous scaling for target %s, err: %v", target.TargetRef.Name, err)
return err
}
if recommendedInstances < 0 {
if scaleResult == nil || scaleResult.Skip {
return nil
}
// Do update replicas
if err := ac.updateTargetReplicas(ctx, &target, recommendedInstances); err != nil {
if err := ac.updateTargetReplicas(ctx, &target, scaleResult.CorrectedInstances); err != nil {
klog.Errorf("failed to update target replicas %s, err: %v", target.TargetRef.Name, err)
return err
}
klog.InfoS("successfully update target replicas", "targetRef", target.TargetRef, "recommendedInstances", recommendedInstances)
// Commit history ONLY after successful API update to prevent phantom scale events
scaler.CommitScaleResult(scaleResult)
klog.InfoS("successfully update target replicas", "targetRef", target.TargetRef, "recommendedInstances", scaleResult.CorrectedInstances)
return nil
}

Expand Down
Loading
Loading