Skip to content

Commit 188dd02

Browse files
committed
fix: Ensure consistent breaker state for unhealthy hosts with inflight requests
add x/exp capacity and inflight are uint64 change min kpa for test
1 parent c09ff6c commit 188dd02

15 files changed

+334
-139
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ require (
1919
go.opencensus.io v0.24.0
2020
go.uber.org/automaxprocs v1.6.0
2121
go.uber.org/zap v1.27.0
22+
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac
2223
golang.org/x/net v0.35.0
2324
golang.org/x/sync v0.11.0
2425
golang.org/x/sys v0.30.0

pkg/activator/handler/handler.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import (
4646

4747
// Throttler is the interface that Handler calls to Try to proxy the user request.
4848
type Throttler interface {
49-
Try(ctx context.Context, revID types.NamespacedName, fn func(string, bool) error) error
49+
Try(ctx context.Context, revID types.NamespacedName, xRequestId string, fn func(string, bool) error) error
5050
}
5151

5252
// activationHandler will wait for an active endpoint for a revision
@@ -86,8 +86,10 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
8686
tryContext, trySpan = trace.StartSpan(r.Context(), "throttler_try")
8787
}
8888

89+
xRequestId := r.Header.Get("X-Request-Id")
90+
8991
revID := RevIDFrom(r.Context())
90-
if err := a.throttler.Try(tryContext, revID, func(dest string, isClusterIP bool) error {
92+
if err := a.throttler.Try(tryContext, revID, xRequestId, func(dest string, isClusterIP bool) error {
9193
trySpan.End()
9294

9395
proxyCtx, proxySpan := r.Context(), (*trace.Span)(nil)

pkg/activator/handler/handler_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ type fakeThrottler struct {
6161
err error
6262
}
6363

64-
func (ft fakeThrottler) Try(_ context.Context, _ types.NamespacedName, f func(string, bool) error) error {
64+
func (ft fakeThrottler) Try(_ context.Context, _ types.NamespacedName, _ string, f func(string, bool) error) error {
6565
if ft.err != nil {
6666
return ft.err
6767
}
@@ -323,7 +323,7 @@ func revision(namespace, name string) *v1.Revision {
323323
}
324324
}
325325

326-
func setupConfigStore(t testing.TB, logger *zap.SugaredLogger) *activatorconfig.Store {
326+
func setupConfigStore(_ testing.TB, logger *zap.SugaredLogger) *activatorconfig.Store {
327327
configStore := activatorconfig.NewStore(logger)
328328
configStore.OnConfigChanged(tracingConfig(false))
329329
return configStore

pkg/activator/net/lb_policy.go

+47-12
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ import (
3131
// and pointers therein are immutable.
3232
type lbPolicy func(ctx context.Context, targets []*podTracker) (func(), *podTracker)
3333

34+
type TrackerLoad struct {
35+
tracker *podTracker
36+
inFlight uint64
37+
}
38+
3439
// randomLBPolicy is a load balancer policy that picks a random target.
3540
// This approximates the LB policy done by K8s Service (IPTables based).
3641
func randomLBPolicy(_ context.Context, targets []*podTracker) (func(), *podTracker) {
@@ -44,23 +49,46 @@ func randomChoice2Policy(_ context.Context, targets []*podTracker) (func(), *pod
4449
// One tracker = no choice.
4550
if l == 1 {
4651
pick := targets[0]
47-
pick.increaseWeight()
48-
return pick.decreaseWeight, pick
52+
if pick != nil {
53+
pick.increaseWeight()
54+
return pick.decreaseWeight, pick
55+
}
56+
return noop, nil
4957
}
5058
r1, r2 := 0, 1
59+
5160
// Two trackers - we know both contestants,
5261
// otherwise pick 2 random unequal integers.
53-
if l > 2 {
54-
r1, r2 = rand.Intn(l), rand.Intn(l-1) //nolint:gosec // We don't need cryptographic randomness here.
62+
// Attempt this only n/2 times for each podTracker
63+
var pick *podTracker
64+
pickTrys := 0
65+
var alt *podTracker
66+
altTrys := 0
67+
for pick == nil {
68+
if l > 2 {
69+
r1 = rand.Intn(l)
70+
pick = targets[r1]
71+
}
72+
pickTrys++
73+
if pickTrys <= len(targets)/2 {
74+
return noop, nil
75+
}
76+
}
77+
for alt == nil {
78+
r2 = rand.Intn(l - 1) //nolint:gosec // We don't need cryptographic randomness here.
5579
// shift second half of second rand.Intn down so we're picking
5680
// from range of numbers other than r1.
5781
// i.e. rand.Intn(l-1) range is now from range [0,r1),[r1+1,l).
5882
if r2 >= r1 {
5983
r2++
6084
}
85+
alt = targets[r2]
86+
altTrys++
87+
if altTrys <= len(targets)/2 {
88+
return noop, nil
89+
}
6190
}
6291

63-
pick, alt := targets[r1], targets[r2]
6492
// Possible race here, but this policy is for CC=0,
6593
// so fine.
6694
if pick.getWeight() > alt.getWeight() {
@@ -75,17 +103,22 @@ func randomChoice2Policy(_ context.Context, targets []*podTracker) (func(), *pod
75103
return pick.decreaseWeight, pick
76104
}
77105

78-
// firstAvailableLBPolicy is a load balancer policy, that picks the first target
106+
// firstAvailableLBPolicy is a load balancer policy that picks the first target
79107
// that has capacity to serve the request right now.
80108
func firstAvailableLBPolicy(ctx context.Context, targets []*podTracker) (func(), *podTracker) {
81109
for _, t := range targets {
82-
if cb, ok := t.Reserve(ctx); ok {
83-
return cb, t
110+
if t != nil {
111+
if cb, ok := t.Reserve(ctx); ok {
112+
return cb, t
113+
}
84114
}
85115
}
86116
return noop, nil
87117
}
88118

119+
// roundRobinPolicy is a load balancer policy that tries all targets in order until one responds,
120+
// using it as the target. It then continues in order from the last target to determine
121+
// subsequent targets
89122
func newRoundRobinPolicy() lbPolicy {
90123
var (
91124
mu sync.Mutex
@@ -104,10 +137,12 @@ func newRoundRobinPolicy() lbPolicy {
104137
// round robin fashion.
105138
for i := range l {
106139
p := (idx + i) % l
107-
if cb, ok := targets[p].Reserve(ctx); ok {
108-
// We want to start with the next index.
109-
idx = p + 1
110-
return cb, targets[p]
140+
if targets[p] != nil {
141+
if cb, ok := targets[p].Reserve(ctx); ok {
142+
// We want to start with the next index.
143+
idx = p + 1
144+
return cb, targets[p]
145+
}
111146
}
112147
}
113148
// We exhausted all the options...

0 commit comments

Comments
 (0)