From 19fe833286fa55665a4a3552ee76014821f0a878 Mon Sep 17 00:00:00 2001
From: Elijah Roussos
Date: Sun, 16 Mar 2025 17:40:13 -0400
Subject: [PATCH] feat: scale buffer

---
 pkg/apis/autoscaling/annotation_validation.go | 23 +++++++++++-
 pkg/apis/autoscaling/register.go              | 11 ++++++
 pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go |  5 +++
 .../autoscaling/v1alpha1/pa_lifecycle_test.go | 37 +++++++++++++++++++
 pkg/autoscaler/scaling/autoscaler.go          |  3 ++
 pkg/autoscaler/scaling/autoscaler_test.go     | 23 ++++++++++++
 pkg/autoscaler/scaling/multiscaler.go         |  6 +++
 .../autoscaling/kpa/resources/decider.go      |  6 +++
 .../autoscaling/kpa/resources/decider_test.go | 16 ++++++++
 9 files changed, 129 insertions(+), 1 deletion(-)

diff --git a/pkg/apis/autoscaling/annotation_validation.go b/pkg/apis/autoscaling/annotation_validation.go
index 54d5f3c113a4..e6ccd83fb440 100644
--- a/pkg/apis/autoscaling/annotation_validation.go
+++ b/pkg/apis/autoscaling/annotation_validation.go
@@ -60,7 +60,8 @@ func ValidateAnnotations(ctx context.Context, config *autoscalerconfig.Config, a
 		Also(validateScaleDownDelay(anns)).
 		Also(validateMetric(config, anns)).
 		Also(validateAlgorithm(anns)).
-		Also(validateInitialScale(config, anns))
+		Also(validateInitialScale(config, anns)).
+		Also(validateScaleBuffer(anns))
 }
 
 func validateClass(m map[string]string) *apis.FieldError {
@@ -275,3 +276,23 @@ func validateInitialScale(config *autoscalerconfig.Config, m map[string]string)
 	}
 	return nil
 }
+
+// validateScaleBuffer checks that the scale-buffer annotation, when set, is a
+// non-negative integer that does not exceed an explicitly configured max-scale.
+func validateScaleBuffer(m map[string]string) *apis.FieldError {
+	maxScale, errs := getIntGE0(m, MaxScaleAnnotation)
+	scaleBuffer, err := getIntGE0(m, ScaleBufferAnnotation)
+	errs = errs.Also(err)
+
+	// max-scale=0 (also the value read when it is unset) means "no limit", so
+	// only a positive max-scale bounds the buffer. getIntGE0 already rejects
+	// negative values, so no separate < 0 check is needed.
+	if maxScale > 0 && scaleBuffer > maxScale {
+		errs = errs.Also(&apis.FieldError{
+			Message: fmt.Sprintf("scale-buffer=%d is greater than max-scale=%d", scaleBuffer, maxScale),
+			Paths:   []string{ScaleBufferAnnotationKey, MaxScaleAnnotationKey},
+		})
+	}
+
+	return errs
+}
diff --git a/pkg/apis/autoscaling/register.go b/pkg/apis/autoscaling/register.go
index 8ed7042c31d3..8a6b9dfb39c2 100644
--- a/pkg/apis/autoscaling/register.go
+++ b/pkg/apis/autoscaling/register.go
@@ -221,6 +221,13 @@ const (
 	// min-scale value while also preserving the ability to scale to zero.
 	// ActivationScale must be >= 2.
 	ActivationScaleKey = GroupName + "/activation-scale"
+
+	// ScaleBufferAnnotationKey is the annotation for the number of replicas
+	// added on top of the desired scale as a static buffer against sudden
+	// traffic spikes. This is useful for services that have a high startup
+	// time or a high variance in traffic. For example, with a scale-buffer of
+	// 2 the resulting scale is the computed desired scale + 2.
+	ScaleBufferAnnotationKey = GroupName + "/scale-buffer"
 )
 
 var (
@@ -280,4 +287,8 @@ var (
 	WindowAnnotation = kmap.KeyPriority{
 		WindowAnnotationKey,
 	}
+	ScaleBufferAnnotation = kmap.KeyPriority{
+		ScaleBufferAnnotationKey,
+		GroupName + "/scaleBuffer",
+	}
 )
diff --git a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go
index 9cff0ab9f547..a661e98032c7 100644
--- a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go
+++ b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go
@@ -128,6 +128,11 @@ func (pa *PodAutoscaler) TargetBC() (float64, bool) {
 	return pa.annotationFloat64(autoscaling.TargetBurstCapacityAnnotation)
 }
 
+// ScaleBuffer returns the scale-buffer annotation value and whether it is present and valid.
+func (pa *PodAutoscaler) ScaleBuffer() (int32, bool) {
+	return pa.annotationInt32(autoscaling.ScaleBufferAnnotation)
+}
+
 func (pa *PodAutoscaler) annotationDuration(k kmap.KeyPriority) (time.Duration, bool) {
 	if _, s, ok := k.Get(pa.Annotations); ok {
 		d, err := time.ParseDuration(s)
diff --git a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go
index e2e23399c2da..60cb69403f9f 100644
--- a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go
+++ b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go
@@ -1024,6 +1024,43 @@ func TestActivationScaleAnnotation(t *testing.T) {
 		})
 	}
 }
+
+func TestScaleBufferAnnotation(t *testing.T) {
+	cases := []struct {
+		name        string
+		annotations map[string]string
+		wantValue   int32
+		wantOK      bool
+	}{{
+		name:        "not present",
+		annotations: map[string]string{},
+		wantValue:   0,
+		wantOK:      false,
+	}, {
+		name:        "present",
+		annotations: map[string]string{autoscaling.ScaleBufferAnnotationKey: "5"},
+		wantValue:   5,
+		wantOK:      true,
+	}, {
+		name:        "invalid",
+		annotations: map[string]string{autoscaling.ScaleBufferAnnotationKey: "5s"},
+		wantValue:   0,
+		wantOK:      false,
+	}}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			autoscaler := pa(tc.annotations)
+			gotValue, gotOK := autoscaler.ScaleBuffer()
+			if gotValue != tc.wantValue {
+				t.Errorf("got = %v, want: %v", gotValue, tc.wantValue)
+			}
+			if gotOK != tc.wantOK {
+				t.Errorf("OK = %v, want: %v", gotOK, tc.wantOK)
+			}
+		})
+	}
+}
 
 func pa(annotations map[string]string) *PodAutoscaler {
 	return &PodAutoscaler{
diff --git a/pkg/autoscaler/scaling/autoscaler.go b/pkg/autoscaler/scaling/autoscaler.go
index 0986c415e687..5c9014912061 100644
--- a/pkg/autoscaler/scaling/autoscaler.go
+++ b/pkg/autoscaler/scaling/autoscaler.go
@@ -197,6 +197,9 @@ func (a *autoscaler) Scale(logger *zap.SugaredLogger, now time.Time) ScaleResult
 	// We want to keep desired pod count in the [maxScaleDown, maxScaleUp] range.
 	desiredStablePodCount := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))
 	desiredPanicPodCount := int32(math.Min(math.Max(dppc, maxScaleDown), maxScaleUp))
+	// Add the static scale buffer on top of the rate-clamped pod counts.
+	desiredStablePodCount += spec.ScaleBuffer
+	desiredPanicPodCount += spec.ScaleBuffer
 
 	// If ActivationScale > 1, then adjust the desired pod counts
 	if a.deciderSpec.ActivationScale > 1 {
diff --git a/pkg/autoscaler/scaling/autoscaler_test.go b/pkg/autoscaler/scaling/autoscaler_test.go
index 63e6e6f67142..38139d95e45b 100644
--- a/pkg/autoscaler/scaling/autoscaler_test.go
+++ b/pkg/autoscaler/scaling/autoscaler_test.go
@@ -76,6 +76,7 @@ func TestAutoscalerScaleDownDelay(t *testing.T) {
 		MaxScaleUpRate:   10,
 		PanicThreshold:   100,
 		ScaleDownDelay:   5 * time.Minute,
+		ScaleBuffer:      0,
 	}
 	as := New(context.Background(), testNamespace, testRevision, metrics, pc, spec)
 
@@ -137,6 +138,7 @@ func TestAutoscalerScaleDownDelayZero(t *testing.T) {
 		MaxScaleUpRate:   10,
 		PanicThreshold:   100,
 		ScaleDownDelay:   0,
+		ScaleBuffer:      0,
 	}
 	as := New(context.Background(), testNamespace, testRevision, metrics, pc, spec)
 
@@ -545,6 +547,27 @@ func TestAutoscalerUseOnePodAsMinimumIfEndpointsNotFound(t *testing.T) {
 	expectScale(t, a, time.Now(), ScaleResult{10, expectedEBC(10, 81, 888, 0), true})
 }
 
+func TestAutoscalerScaleWithBuffer(t *testing.T) {
+	metrics := &metricClient{StableConcurrency: 100, PanicConcurrency: 100}
+	a, pc := newTestAutoscaler(10, 77, metrics)
+	expectScale(t, a, time.Now(), ScaleResult{10, expectedEBC(10, 77, 100, 1), true})
+
+	pc.readyCount = 10
+	a.Update(&DeciderSpec{
+		TargetValue:         1,
+		TotalValue:          1 / targetUtilization,
+		ActivatorCapacity:   21,
+		TargetBurstCapacity: 71,
+		PanicThreshold:      2,
+		MaxScaleDownRate:    10,
+		MaxScaleUpRate:      10,
+		StableWindow:        stableWindow,
+		ScaleBuffer:         10,
+	})
+	// Demand alone needs 100 pods (concurrency 100 / target 1); the buffer adds 10 more.
+	expectScale(t, a, time.Now(), ScaleResult{110, expectedEBC(1, 71, 100, 10), true})
+}
+
 func TestAutoscalerUpdateTarget(t *testing.T) {
 	metrics := &metricClient{StableConcurrency: 100, PanicConcurrency: 101}
 	a, pc := newTestAutoscaler(10, 77, metrics)
diff --git a/pkg/autoscaler/scaling/multiscaler.go b/pkg/autoscaler/scaling/multiscaler.go
index d1f59b511bc2..acd6743ad73a 100644
--- a/pkg/autoscaler/scaling/multiscaler.go
+++ b/pkg/autoscaler/scaling/multiscaler.go
@@ -82,6 +82,12 @@ type DeciderSpec struct {
 	// min-scale value while also preserving the ability to scale to zero.
 	// ActivationScale must be >= 2.
 	ActivationScale int32
+	// ScaleBuffer is the number of replicas that should be added to the desired scale
+	// to provide a static buffer of replicas to handle sudden spikes in traffic.
+	// This is useful for services that have a high startup time or for services that
+	// have a high variance in traffic. For example, if ScaleBuffer = 2, the resulting
+	// scale is the computed desired scale + 2.
+	ScaleBuffer int32
 }
 
 // DeciderStatus is the current scale recommendation.
diff --git a/pkg/reconciler/autoscaling/kpa/resources/decider.go b/pkg/reconciler/autoscaling/kpa/resources/decider.go
index 22669519d173..b8d6c1f40ba7 100644
--- a/pkg/reconciler/autoscaling/kpa/resources/decider.go
+++ b/pkg/reconciler/autoscaling/kpa/resources/decider.go
@@ -71,6 +71,11 @@ func MakeDecider(pa *autoscalingv1alpha1.PodAutoscaler, config *autoscalerconfig
 		activationScale = mnzr
 	}
 
+	var scaleBuffer int32
+	if sb, ok := pa.ScaleBuffer(); ok {
+		scaleBuffer = sb
+	}
+
 	return &scaling.Decider{
 		ObjectMeta: *pa.ObjectMeta.DeepCopy(),
 		Spec: scaling.DeciderSpec{
@@ -87,6 +92,7 @@ func MakeDecider(pa *autoscalingv1alpha1.PodAutoscaler, config *autoscalerconfig
 			InitialScale:        GetInitialScale(config, pa),
 			Reachable:           pa.Spec.Reachability != autoscalingv1alpha1.ReachabilityUnreachable,
 			ActivationScale:     activationScale,
+			ScaleBuffer:         scaleBuffer,
 		},
 	}
 }
diff --git a/pkg/reconciler/autoscaling/kpa/resources/decider_test.go b/pkg/reconciler/autoscaling/kpa/resources/decider_test.go
index 264247c0278f..6e50c6b666bc 100644
--- a/pkg/reconciler/autoscaling/kpa/resources/decider_test.go
+++ b/pkg/reconciler/autoscaling/kpa/resources/decider_test.go
@@ -171,6 +171,15 @@ func TestMakeDecider(t *testing.T) {
 				d.Spec.ActivationScale = 3
 				d.Annotations[autoscaling.ActivationScaleKey] = "3"
 			}),
+	}, {
+		name: "with scale-buffer annotation",
+		pa: pa(func(pa *autoscalingv1alpha1.PodAutoscaler) {
+			pa.Annotations[autoscaling.ScaleBufferAnnotationKey] = "3"
+		}),
+		want: decider(withTarget(100.0), withPanicThreshold(2.0), withTotal(100), withScaleBufferAnnotation("3"),
+			func(d *scaling.Decider) {
+				d.Spec.ScaleBuffer = 3
+			}),
 	}}
 
 	for _, tc := range cases {
@@ -307,6 +316,7 @@ func decider(options ...deciderOption) *scaling.Decider {
 			StableWindow:        config.StableWindow,
 			InitialScale:        1,
 			Reachable:           true,
+			ScaleBuffer:         0,
 		},
 	}
 	for _, fn := range options {
@@ -382,6 +392,12 @@ func withPanicThresholdPercentageAnnotation(percentage string) deciderOption {
 	}
 }
 
+func withScaleBufferAnnotation(scaleBuffer string) deciderOption {
+	return func(decider *scaling.Decider) {
+		decider.Annotations[autoscaling.ScaleBufferAnnotationKey] = scaleBuffer
+	}
+}
+
 var config = &autoscalerconfig.Config{
 	EnableScaleToZero:                  true,
 	ContainerConcurrencyTargetFraction: 1.0,