Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ensure consistent breaker state for unhealthy hosts with infligh… #15811

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
go.opencensus.io v0.24.0
go.uber.org/automaxprocs v1.6.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac
golang.org/x/net v0.35.0
golang.org/x/sync v0.11.0
golang.org/x/sys v0.30.0
Expand Down
6 changes: 4 additions & 2 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (

// Throttler is the interface that Handler calls to Try to proxy the user request.
type Throttler interface {
Try(ctx context.Context, revID types.NamespacedName, fn func(string, bool) error) error
Try(ctx context.Context, revID types.NamespacedName, xRequestId string, fn func(string, bool) error) error
}

// activationHandler will wait for an active endpoint for a revision
Expand Down Expand Up @@ -86,8 +86,10 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
tryContext, trySpan = trace.StartSpan(r.Context(), "throttler_try")
}

xRequestId := r.Header.Get("X-Request-Id")

revID := RevIDFrom(r.Context())
if err := a.throttler.Try(tryContext, revID, func(dest string, isClusterIP bool) error {
if err := a.throttler.Try(tryContext, revID, xRequestId, func(dest string, isClusterIP bool) error {
trySpan.End()

proxyCtx, proxySpan := r.Context(), (*trace.Span)(nil)
Expand Down
4 changes: 2 additions & 2 deletions pkg/activator/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type fakeThrottler struct {
err error
}

func (ft fakeThrottler) Try(_ context.Context, _ types.NamespacedName, f func(string, bool) error) error {
func (ft fakeThrottler) Try(_ context.Context, _ types.NamespacedName, _ string, f func(string, bool) error) error {
if ft.err != nil {
return ft.err
}
Expand Down Expand Up @@ -323,7 +323,7 @@ func revision(namespace, name string) *v1.Revision {
}
}

func setupConfigStore(t testing.TB, logger *zap.SugaredLogger) *activatorconfig.Store {
func setupConfigStore(_ testing.TB, logger *zap.SugaredLogger) *activatorconfig.Store {
configStore := activatorconfig.NewStore(logger)
configStore.OnConfigChanged(tracingConfig(false))
return configStore
Expand Down
59 changes: 47 additions & 12 deletions pkg/activator/net/lb_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
// and pointers therein are immutable.
type lbPolicy func(ctx context.Context, targets []*podTracker) (func(), *podTracker)

type TrackerLoad struct {
tracker *podTracker
inFlight uint64
}

// randomLBPolicy is a load balancer policy that picks a random target.
// This approximates the LB policy done by K8s Service (IPTables based).
func randomLBPolicy(_ context.Context, targets []*podTracker) (func(), *podTracker) {
Expand All @@ -44,23 +49,46 @@ func randomChoice2Policy(_ context.Context, targets []*podTracker) (func(), *pod
// One tracker = no choice.
if l == 1 {
pick := targets[0]
pick.increaseWeight()
return pick.decreaseWeight, pick
if pick != nil {
pick.increaseWeight()
return pick.decreaseWeight, pick
}
return noop, nil
}
r1, r2 := 0, 1

// Two trackers - we know both contestants,
// otherwise pick 2 random unequal integers.
if l > 2 {
r1, r2 = rand.Intn(l), rand.Intn(l-1) //nolint:gosec // We don't need cryptographic randomness here.
// Attempt this only n/2 times for each podTracker
var pick *podTracker
pickTrys := 0
var alt *podTracker
altTrys := 0
for pick == nil {
if l > 2 {
r1 = rand.Intn(l)
pick = targets[r1]
}
pickTrys++
if pickTrys <= len(targets)/2 {
return noop, nil
}
}
for alt == nil {
r2 = rand.Intn(l - 1) //nolint:gosec // We don't need cryptographic randomness here.
// shift second half of second rand.Intn down so we're picking
// from range of numbers other than r1.
// i.e. rand.Intn(l-1) range is now from range [0,r1),[r1+1,l).
if r2 >= r1 {
r2++
}
alt = targets[r2]
altTrys++
if altTrys <= len(targets)/2 {
return noop, nil
}
}

pick, alt := targets[r1], targets[r2]
// Possible race here, but this policy is for CC=0,
// so fine.
if pick.getWeight() > alt.getWeight() {
Expand All @@ -75,17 +103,22 @@ func randomChoice2Policy(_ context.Context, targets []*podTracker) (func(), *pod
return pick.decreaseWeight, pick
}

// firstAvailableLBPolicy is a load balancer policy, that picks the first target
// firstAvailableLBPolicy is a load balancer policy that picks the first target
// that has capacity to serve the request right now.
func firstAvailableLBPolicy(ctx context.Context, targets []*podTracker) (func(), *podTracker) {
for _, t := range targets {
if cb, ok := t.Reserve(ctx); ok {
return cb, t
if t != nil {
if cb, ok := t.Reserve(ctx); ok {
return cb, t
}
}
}
return noop, nil
}

// roundRobinPolicy is a load balancer policy that tries all targets in order until one responds,
// using it as the target. It then continues in order from the last target to determine
// subsequent targets
func newRoundRobinPolicy() lbPolicy {
var (
mu sync.Mutex
Expand All @@ -104,10 +137,12 @@ func newRoundRobinPolicy() lbPolicy {
// round robin fashion.
for i := range l {
p := (idx + i) % l
if cb, ok := targets[p].Reserve(ctx); ok {
// We want to start with the next index.
idx = p + 1
return cb, targets[p]
if targets[p] != nil {
if cb, ok := targets[p].Reserve(ctx); ok {
// We want to start with the next index.
idx = p + 1
return cb, targets[p]
}
}
}
// We exhausted all the options...
Expand Down
Loading