diff --git a/go.mod b/go.mod index 72bed1071ade..06ac89fa3ff8 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( go.opencensus.io v0.24.0 go.uber.org/automaxprocs v1.6.0 go.uber.org/zap v1.27.0 + golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac golang.org/x/net v0.35.0 golang.org/x/sync v0.11.0 golang.org/x/sys v0.30.0 diff --git a/pkg/activator/handler/handler.go b/pkg/activator/handler/handler.go index 35e29d049998..c7b01ad3f497 100644 --- a/pkg/activator/handler/handler.go +++ b/pkg/activator/handler/handler.go @@ -46,7 +46,7 @@ import ( // Throttler is the interface that Handler calls to Try to proxy the user request. type Throttler interface { - Try(ctx context.Context, revID types.NamespacedName, fn func(string, bool) error) error + Try(ctx context.Context, revID types.NamespacedName, xRequestId string, fn func(string, bool) error) error } // activationHandler will wait for an active endpoint for a revision @@ -86,8 +86,10 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { tryContext, trySpan = trace.StartSpan(r.Context(), "throttler_try") } + xRequestId := r.Header.Get("X-Request-Id") + revID := RevIDFrom(r.Context()) - if err := a.throttler.Try(tryContext, revID, func(dest string, isClusterIP bool) error { + if err := a.throttler.Try(tryContext, revID, xRequestId, func(dest string, isClusterIP bool) error { trySpan.End() proxyCtx, proxySpan := r.Context(), (*trace.Span)(nil) diff --git a/pkg/activator/handler/handler_test.go b/pkg/activator/handler/handler_test.go index 7a72b71c4d6c..d2c44bdbcb69 100644 --- a/pkg/activator/handler/handler_test.go +++ b/pkg/activator/handler/handler_test.go @@ -61,7 +61,7 @@ type fakeThrottler struct { err error } -func (ft fakeThrottler) Try(_ context.Context, _ types.NamespacedName, f func(string, bool) error) error { +func (ft fakeThrottler) Try(_ context.Context, _ types.NamespacedName, _ string, f func(string, bool) error) error { if ft.err != nil { return ft.err } @@ -323,7 +323,7 @@ func revision(namespace, name string) *v1.Revision { } } -func setupConfigStore(t testing.TB, logger *zap.SugaredLogger) *activatorconfig.Store { +func setupConfigStore(_ testing.TB, logger *zap.SugaredLogger) *activatorconfig.Store { configStore := activatorconfig.NewStore(logger) configStore.OnConfigChanged(tracingConfig(false)) return configStore diff --git a/pkg/activator/net/lb_policy.go b/pkg/activator/net/lb_policy.go index 7e7f689c86c2..336b113b04c0 100644 --- a/pkg/activator/net/lb_policy.go +++ b/pkg/activator/net/lb_policy.go @@ -31,6 +31,11 @@ import ( // and pointers therein are immutable. type lbPolicy func(ctx context.Context, targets []*podTracker) (func(), *podTracker) +type TrackerLoad struct { + tracker *podTracker + inFlight uint64 +} + // randomLBPolicy is a load balancer policy that picks a random target. // This approximates the LB policy done by K8s Service (IPTables based). func randomLBPolicy(_ context.Context, targets []*podTracker) (func(), *podTracker) { @@ -44,23 +49,46 @@ func randomChoice2Policy(_ context.Context, targets []*podTracker) (func(), *pod // One tracker = no choice. if l == 1 { pick := targets[0] - pick.increaseWeight() - return pick.decreaseWeight, pick + if pick != nil { + pick.increaseWeight() + return pick.decreaseWeight, pick + } + return noop, nil } r1, r2 := 0, 1 + // Two trackers - we know both contestants, // otherwise pick 2 random unequal integers. - if l > 2 { - r1, r2 = rand.Intn(l), rand.Intn(l-1) //nolint:gosec // We don't need cryptographic randomness here. 
+ // Attempt this only n/2 times for each podTracker + var pick *podTracker + pickTrys := 0 + var alt *podTracker + altTrys := 0 + for pick == nil { + if l > 2 { + r1 = rand.Intn(l) //nolint:gosec // We don't need cryptographic randomness here. + } + pick = targets[r1] + pickTrys++ + if pick == nil && pickTrys > len(targets)/2 { + return noop, nil + } + } + for alt == nil { + r2 = rand.Intn(l - 1) //nolint:gosec // We don't need cryptographic randomness here. // shift second half of second rand.Intn down so we're picking // from range of numbers other than r1. // i.e. rand.Intn(l-1) range is now from range [0,r1),[r1+1,l). if r2 >= r1 { r2++ } + alt = targets[r2] + altTrys++ + if alt == nil && altTrys > len(targets)/2 { + return noop, nil + } } - pick, alt := targets[r1], targets[r2] // Possible race here, but this policy is for CC=0, // so fine. if pick.getWeight() > alt.getWeight() { @@ -75,17 +103,22 @@ func randomChoice2Policy(_ context.Context, targets []*podTracker) (func(), *pod return pick.decreaseWeight, pick } -// firstAvailableLBPolicy is a load balancer policy, that picks the first target +// firstAvailableLBPolicy is a load balancer policy that picks the first target // that has capacity to serve the request right now. func firstAvailableLBPolicy(ctx context.Context, targets []*podTracker) (func(), *podTracker) { for _, t := range targets { - if cb, ok := t.Reserve(ctx); ok { - return cb, t + if t != nil { + if cb, ok := t.Reserve(ctx); ok { + return cb, t + } } } return noop, nil } +// newRoundRobinPolicy returns a load balancer policy that tries all targets in order until one has +// capacity to serve the request, using it as the target. Subsequent requests continue in order +// from the last selected target. func newRoundRobinPolicy() lbPolicy { var ( mu sync.Mutex @@ -104,10 +137,12 @@ func newRoundRobinPolicy() lbPolicy { // round robin fashion. for i := range l { p := (idx + i) % l - if cb, ok := targets[p].Reserve(ctx); ok { - // We want to start with the next index. - idx = p + 1 - return cb, targets[p] + if targets[p] != nil { + if cb, ok := targets[p].Reserve(ctx); ok { + // We want to start with the next index. + idx = p + 1 + return cb, targets[p] + } } } // We exhausted all the options... diff --git a/pkg/activator/net/throttler.go b/pkg/activator/net/throttler.go index 0ef298c48db4..ca10b86058eb 100644 --- a/pkg/activator/net/throttler.go +++ b/pkg/activator/net/throttler.go @@ -19,9 +19,13 @@ package net import ( "context" "net/http" + "slices" "sort" "sync" "sync/atomic" + "time" + + "golang.org/x/exp/maps" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/sets" @@ -29,6 +33,7 @@ import ( corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/tools/cache" pkgnet "knative.dev/networking/pkg/apis/networking" @@ -62,18 +67,24 @@ const ( ) func newPodTracker(dest string, b breaker) *podTracker { - tracker := &podTracker{ - dest: dest, - b: b, + tracker := podTracker{ + id: string(uuid.NewUUID()), + createdAt: time.Now().UnixMicro(), + dest: dest, + b: b, + healthy: true, } tracker.decreaseWeight = func() { tracker.weight.Add(-1) } - return tracker + return &tracker } type podTracker struct { - dest string - b breaker + id string + createdAt int64 + dest string + b breaker + healthy bool // weight is used for LB policy implementations.
weight atomic.Int32 @@ -96,13 +107,27 @@ func (p *podTracker) String() string { return p.dest } -func (p *podTracker) Capacity() int { +func (p *podTracker) Capacity() uint64 { if p.b == nil { return 1 } return p.b.Capacity() } +func (p *podTracker) Pending() int { + if p.b == nil { + return 0 + } + return p.b.Pending() +} + +func (p *podTracker) InFlight() uint64 { + if p.b == nil { + return 0 + } + return p.b.InFlight() +} + func (p *podTracker) UpdateConcurrency(c int) { if p.b == nil { return @@ -118,10 +143,12 @@ func (p *podTracker) Reserve(ctx context.Context) (func(), bool) { } type breaker interface { - Capacity() int + Capacity() uint64 Maybe(ctx context.Context, thunk func()) error UpdateConcurrency(int) Reserve(ctx context.Context) (func(), bool) + Pending() int + InFlight() uint64 } // revisionThrottler is used to throttle requests across the entire revision. @@ -151,7 +178,7 @@ type revisionThrottler struct { breaker breaker // This will be non-empty when we're able to use pod addressing. - podTrackers []*podTracker + podTrackers map[string]*podTracker // Effective trackers that are assigned to this Activator. // This is a subset of podTrackers. @@ -187,7 +214,7 @@ func newRevisionThrottler(revID types.NamespacedName, revBreaker = queue.NewBreaker(breakerParams) lbp = firstAvailableLBPolicy default: - // Otherwise RR. + // Otherwise Round Robin revBreaker = queue.NewBreaker(breakerParams) lbp = newRoundRobinPolicy() } @@ -198,6 +225,7 @@ func newRevisionThrottler(revID types.NamespacedName, logger: logger, protocol: proto, lbPolicy: lbp, + podTrackers: make(map[string]*podTracker), } // Start with unknown @@ -216,11 +244,11 @@ func (rt *revisionThrottler) acquireDest(ctx context.Context) (func(), *podTrack if rt.clusterIPTracker != nil { return noop, rt.clusterIPTracker, true } - f, lbTracker := rt.lbPolicy(ctx, rt.assignedTrackers) - return f, lbTracker, false + callback, pt := rt.lbPolicy(ctx, rt.assignedTrackers) + return callback, pt, false } -func (rt *revisionThrottler) try(ctx context.Context, function func(dest string, isClusterIP bool) error) error { +func (rt *revisionThrottler) try(ctx context.Context, xRequestId string, function func(dest string, isClusterIP bool) error) error { var ret error // Retrying infinitely as long as we receive no dest. Outer semaphore and inner @@ -229,20 +257,32 @@ func (rt *revisionThrottler) try(ctx context.Context, function func(dest string, reenqueue := true for reenqueue { reenqueue = false + + assignedTrackers := rt.assignedTrackers + if len(assignedTrackers) == 0 { + rt.logger.Debugf("%s -> No Assigned trackers\n", xRequestId) + } if err := rt.breaker.Maybe(ctx, func() { - cb, tracker, isClusterIP := rt.acquireDest(ctx) + callback, tracker, isClusterIP := rt.acquireDest(ctx) if tracker == nil { // This can happen if individual requests raced each other or if pod // capacity was decreased after passing the outer semaphore. reenqueue = true + rt.logger.Debugf("%s -> Failed to acquire tracker\n", xRequestId) return } - defer cb() + trackerId := tracker.id + rt.logger.Infof("%s -> Acquired Pod Tracker %s - %s (createdAt %d) Breaker State: capacity: %d, inflight: %d, pending: %d\n", xRequestId, trackerId, tracker.dest, tracker.createdAt, tracker.Capacity(), tracker.InFlight(), tracker.Pending()) + defer func() { + callback() + rt.logger.Debugf("%s -> %s breaker release semaphore\n", xRequestId, trackerId) + }() // We already reserved a guaranteed spot. So just execute the passed functor. 
ret = function(tracker.dest, isClusterIP) }); err != nil { return err } + rt.logger.Debugf("%s -> Reenqueue request\n", xRequestId) } return ret } @@ -307,16 +347,20 @@ func (rt *revisionThrottler) updateCapacity(backendCount int) { return 0 } - // Sort, so we get more or less stable results. - sort.Slice(rt.podTrackers, func(i, j int) bool { - return rt.podTrackers[i].dest < rt.podTrackers[j].dest - }) - assigned := rt.podTrackers + var assigned []*podTracker if rt.containerConcurrency > 0 { rt.resetTrackers() assigned = assignSlice(rt.podTrackers, ai, ac) + } else { + assigned = maps.Values(rt.podTrackers) } rt.logger.Debugf("Trackers %d/%d: assignment: %v", ai, ac, assigned) + + // Sort, so we get more or less stable results. + sort.Slice(assigned, func(i, j int) bool { + return assigned[i].dest < assigned[j].dest + }) + // The actual write out of the assigned trackers has to be under lock. rt.mux.Lock() defer rt.mux.Unlock() @@ -332,9 +376,9 @@ func (rt *revisionThrottler) updateCapacity(backendCount int) { rt.breaker.UpdateConcurrency(capacity) } -func (rt *revisionThrottler) updateThrottlerState(backendCount int, trackers []*podTracker, clusterIPDest *podTracker) { - rt.logger.Infof("Updating Revision Throttler with: clusterIP = %v, trackers = %d, backends = %d", - clusterIPDest, len(trackers), backendCount) +func (rt *revisionThrottler) updateThrottlerState(backendCount int, newTrackers []*podTracker, healthyDests []string, unhealthyDests []string, clusterIPDest *podTracker) { + rt.logger.Infof("Updating Revision Throttler with: clusterIP = %v, trackers = %d, backends = %d, healthyDests = %s, unhealthyDests = %s", + clusterIPDest, len(newTrackers), backendCount, healthyDests, unhealthyDests) // Update trackers / clusterIP before capacity. Otherwise we can race updating our breaker when // we increase capacity, causing a request to fall through before a tracker is added, causing an @@ -342,9 +386,28 @@ func (rt *revisionThrottler) updateThrottlerState(backendCount int, trackers []* if func() bool { rt.mux.Lock() defer rt.mux.Unlock() - rt.podTrackers = trackers + for _, t := range newTrackers { + if t != nil { + rt.podTrackers[t.dest] = t + } + } + for _, d := range healthyDests { + if !rt.podTrackers[d].healthy { + rt.podTrackers[d].healthy = true + } + } + for _, d := range unhealthyDests { + tracker := rt.podTrackers[d] + if tracker.InFlight() > 0 && rt.podTrackers[d].healthy { + // mark this podTracker as unhealthy, but do not remove the reference if inFlight is above 0 + tracker.healthy = false + } else { + // this podTracker is unhealthy and has no active requests, remove the podTracker reference + delete(rt.podTrackers, d) + } + } rt.clusterIPTracker = clusterIPDest - return clusterIPDest != nil || len(trackers) > 0 + return clusterIPDest != nil || len(rt.podTrackers) > 0 }() { // If we have an address to target, then pass through an accurate // accounting of the number of backends. @@ -380,32 +443,39 @@ func pickIndices(numTrackers, selfIndex, numActivators int) (beginIndex, endInde // for this Activator instance. This only matters in case of direct // to pod IP routing, and is irrelevant, when ClusterIP is used. // assignSlice should receive podTrackers sorted by address. -func assignSlice(trackers []*podTracker, selfIndex, numActivators int) []*podTracker { +func assignSlice(trackers map[string]*podTracker, selfIndex, numActivators int) []*podTracker { // When we're unassigned, doesn't matter what we return. 
lt := len(trackers) if selfIndex == -1 || lt <= 1 { - return trackers + return maps.Values(trackers) } // If there's just a single activator. Take all the trackers. if numActivators == 1 { - return trackers + return maps.Values(trackers) } // If the number of pods is not divisible by the number of activators, we allocate one pod to each activator exclusively. // examples // 1. we have 20 pods and 3 activators -> we'd get 2 remnants so activator with index 0,1 would each pick up a unique tracker // 2. we have 24 pods and 5 activators -> we'd get 4 remnants so the activator 0,1,2,3 would each pick up a unique tracker + dests := maps.Keys(trackers) + sort.Strings(dests) bi, ei, remnants := pickIndices(lt, selfIndex, numActivators) - x := append(trackers[:0:0], trackers[bi:ei]...) + pickedDests := slices.Clone(dests[bi:ei]) if remnants > 0 { - tail := trackers[len(trackers)-remnants:] + tail := dests[len(trackers)-remnants:] if len(tail) > selfIndex { t := tail[selfIndex] - x = append(x, t) + pickedDests = append(pickedDests, t) } } - return x + trackerSlice := make([]*podTracker, len(pickedDests)) + + for i, d := range pickedDests { + trackerSlice[i] = trackers[d] + } + return trackerSlice } // This function will never be called in parallel but `try` can be called in parallel to this so we need @@ -416,20 +486,16 @@ func (rt *revisionThrottler) handleUpdate(update revisionDestsUpdate) { // ClusterIP is not yet ready, so we want to send requests directly to the pods. // NB: this will not be called in parallel, thus we can build a new podTrackers - // array before taking out a lock. + // map before taking out a lock. if update.ClusterIPDest == "" { - // Create a map for fast lookup of existing trackers. - trackersMap := make(map[string]*podTracker, len(rt.podTrackers)) - for _, tracker := range rt.podTrackers { - trackersMap[tracker.dest] = tracker - } - - trackers := make([]*podTracker, 0, len(update.Dests)) - // Loop over dests, reuse existing tracker if we have one, otherwise create // a new one. + newTrackers := make([]*podTracker, 0, len(update.Dests)) + currentDests := maps.Keys(rt.podTrackers) + newDestsSet := make(map[string]struct{}, len(update.Dests)) for newDest := range update.Dests { - tracker, ok := trackersMap[newDest] + newDestsSet[newDest] = struct{}{} + tracker, ok := rt.podTrackers[newDest] if !ok { if rt.containerConcurrency == 0 { tracker = newPodTracker(newDest, nil) @@ -440,15 +506,31 @@ func (rt *revisionThrottler) handleUpdate(update revisionDestsUpdate) { InitialCapacity: rt.containerConcurrency, // Presume full unused capacity. 
})) } + newTrackers = append(newTrackers, tracker) + } else { + if !tracker.healthy { + tracker.healthy = true + } + } + } + healthyDests := make([]string, 0, len(currentDests)) + unhealthyDests := make([]string, 0, len(currentDests)) + for _, d := range currentDests { + _, ok := newDestsSet[d] + // If stale, check if the tracker still has outbound requests + // Remove if no outbound requests and mark unhealthy otherwise + if !ok { + unhealthyDests = append(unhealthyDests, d) + } else { + healthyDests = append(healthyDests, d) } - trackers = append(trackers, tracker) } - rt.updateThrottlerState(len(update.Dests), trackers, nil /*clusterIP*/) + rt.updateThrottlerState(len(update.Dests), newTrackers, healthyDests, unhealthyDests, nil /*clusterIP*/) return } - - rt.updateThrottlerState(len(update.Dests), nil /*trackers*/, newPodTracker(update.ClusterIPDest, nil)) + clusterIPPodTracker := newPodTracker(update.ClusterIPDest, nil) + rt.updateThrottlerState(len(update.Dests), nil /*trackers*/, nil, nil, clusterIPPodTracker) } // Throttler load balances requests to revisions based on capacity. When `Run` is called it listens for @@ -519,12 +601,12 @@ func (t *Throttler) run(updateCh <-chan revisionDestsUpdate) { } // Try waits for capacity and then executes function, passing in a l4 dest to send a request -func (t *Throttler) Try(ctx context.Context, revID types.NamespacedName, function func(dest string, isClusterIP bool) error) error { +func (t *Throttler) Try(ctx context.Context, revID types.NamespacedName, xRequestId string, function func(string, bool) error) error { rt, err := t.getOrCreateRevisionThrottler(revID) if err != nil { return err } - return rt.try(ctx, function) + return rt.try(ctx, xRequestId, function) } func (t *Throttler) getOrCreateRevisionThrottler(revID types.NamespacedName) (*revisionThrottler, error) { @@ -560,7 +642,7 @@ func (t *Throttler) getOrCreateRevisionThrottler(revID types.NamespacedName) (*r // revisionUpdated is used to ensure we have a backlog set up for a revision as soon as it is created // rather than erroring with revision not found until a networking probe succeeds -func (t *Throttler) revisionUpdated(obj interface{}) { +func (t *Throttler) revisionUpdated(obj any) { rev := obj.(*v1.Revision) revID := types.NamespacedName{Namespace: rev.Namespace, Name: rev.Name} @@ -574,7 +656,7 @@ func (t *Throttler) revisionUpdated(obj interface{}) { // revisionDeleted is to clean up revision throttlers after a revision is deleted to prevent unbounded // memory growth -func (t *Throttler) revisionDeleted(obj interface{}) { +func (t *Throttler) revisionDeleted(obj any) { acc, err := kmeta.DeletionHandlingAccessor(obj) if err != nil { t.logger.Warnw("Revision delete failure to process", zap.Error(err)) @@ -670,7 +752,7 @@ func inferIndex(eps []string, ipAddress string) int { return idx } -func (t *Throttler) publicEndpointsUpdated(newObj interface{}) { +func (t *Throttler) publicEndpointsUpdated(newObj any) { endpoints := newObj.(*corev1.Endpoints) t.logger.Info("Updated public Endpoints: ", endpoints.Name) t.epsUpdateCh <- endpoints @@ -721,10 +803,20 @@ func newInfiniteBreaker(logger *zap.SugaredLogger) *infiniteBreaker { } // Capacity returns the current capacity of the breaker -func (ib *infiniteBreaker) Capacity() int { +func (ib *infiniteBreaker) Capacity() uint64 { + return uint64(ib.concurrency.Load()) +} + +// Pending returns the current pending requests the breaker +func (ib *infiniteBreaker) Pending() int { return int(ib.concurrency.Load()) } +// Pending returns 
the current inflight requests the breaker +func (ib *infiniteBreaker) InFlight() uint64 { + return uint64(ib.concurrency.Load()) +} + func zeroOrOne(x int) int32 { if x == 0 { return 0 diff --git a/pkg/activator/net/throttler_test.go b/pkg/activator/net/throttler_test.go index ed727a26c79b..968d37ea3ba8 100644 --- a/pkg/activator/net/throttler_test.go +++ b/pkg/activator/net/throttler_test.go @@ -19,7 +19,7 @@ package net import ( "context" "errors" - "slices" + "maps" "strconv" "sync" "testing" @@ -74,7 +74,7 @@ func TestThrottlerUpdateCapacity(t *testing.T) { containerConcurrency int isNewInfiniteBreaker bool podTrackers []*podTracker - want int + want uint64 checkAssignedPod bool }{{ name: "capacity: 1, cc: 10", @@ -221,7 +221,11 @@ func TestThrottlerUpdateCapacity(t *testing.T) { } rt.numActivators.Store(tt.numActivators) rt.activatorIndex.Store(tt.activatorIndex) - rt.podTrackers = tt.podTrackers + rtPodTrackers := make(map[string]*podTracker) + for _, pt := range tt.podTrackers { + rtPodTrackers[pt.dest] = pt + } + rt.podTrackers = rtPodTrackers if tt.isNewInfiniteBreaker { rt.breaker = newInfiniteBreaker(logger) } @@ -275,18 +279,19 @@ func TestThrottlerCalculateCapacity(t *testing.T) { } func makeTrackers(num, cc int) []*podTracker { - x := make([]*podTracker, num) + trackers := make([]*podTracker, num) for i := range num { - x[i] = newPodTracker(strconv.Itoa(i), nil) + pt := newPodTracker(strconv.Itoa(i), nil) if cc > 0 { - x[i].b = queue.NewBreaker(queue.BreakerParams{ + pt.b = queue.NewBreaker(queue.BreakerParams{ QueueDepth: 1, MaxConcurrency: cc, InitialCapacity: cc, }) } + trackers[i] = pt } - return x + return trackers } func TestThrottlerErrorNoRevision(t *testing.T) { @@ -315,13 +320,13 @@ func TestThrottlerErrorNoRevision(t *testing.T) { }) // Make sure it now works. - if err := throttler.Try(ctx, revID, func(string, bool) error { return nil }); err != nil { + if err := throttler.Try(ctx, revID, "test", func(string, bool) error { return nil }); err != nil { t.Fatalf("Try() = %v, want no error", err) } // Make sure errors are propagated correctly. innerError := errors.New("inner") - if err := throttler.Try(ctx, revID, func(string, bool) error { return innerError }); !errors.Is(err, innerError) { + if err := throttler.Try(ctx, revID, "test", func(string, bool) error { return innerError }); !errors.Is(err, innerError) { t.Fatalf("Try() = %v, want %v", err, innerError) } @@ -331,7 +336,7 @@ func TestThrottlerErrorNoRevision(t *testing.T) { // Eventually it should now fail. var lastError error wait.PollUntilContextCancel(ctx, 10*time.Millisecond, false, func(context.Context) (bool, error) { - lastError = throttler.Try(ctx, revID, func(string, bool) error { return nil }) + lastError = throttler.Try(ctx, revID, "test", func(string, bool) error { return nil }) return lastError != nil, nil }) if lastError == nil || lastError.Error() != `revision.serving.knative.dev "test-revision" not found` { @@ -550,11 +555,12 @@ func TestThrottlerSuccesses(t *testing.T) { // Make sure our informer event has fired. // We send multiple updates in some tests, so make sure the capacity is exact. 
- wantCapacity := 1 + var wantCapacity uint64 + wantCapacity = 1 cc := tc.revision.Spec.ContainerConcurrency dests := tc.initUpdates[len(tc.initUpdates)-1].Dests.Len() if *cc != 0 { - wantCapacity = dests * int(*cc) + wantCapacity = uint64(dests) * uint64(*cc) } if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 3*time.Second, true, func(context.Context) (bool, error) { rt.mux.RLock() @@ -638,13 +644,13 @@ func TestPodAssignmentFinite(t *testing.T) { if got, want := trackerDestSet(rt.assignedTrackers), sets.New("ip0", "ip4"); !got.Equal(want) { t.Errorf("Assigned trackers = %v, want: %v, diff: %s", got, want, cmp.Diff(want, got)) } - if got, want := rt.breaker.Capacity(), 2*42; got != want { + if got, want := rt.breaker.Capacity(), uint64(2*42); got != want { t.Errorf("TotalCapacity = %d, want: %d", got, want) } - if got, want := rt.assignedTrackers[0].Capacity(), 42; got != want { + if got, want := rt.assignedTrackers[0].Capacity(), uint64(42); got != want { t.Errorf("Exclusive tracker capacity: %d, want: %d", got, want) } - if got, want := rt.assignedTrackers[1].Capacity(), 42; got != want { + if got, want := rt.assignedTrackers[1].Capacity(), uint64(42); got != want { t.Errorf("Shared tracker capacity: %d, want: %d", got, want) } @@ -657,7 +663,7 @@ func TestPodAssignmentFinite(t *testing.T) { if got, want := len(rt.assignedTrackers), 0; got != want { t.Errorf("NumAssignedTrackers = %d, want: %d", got, want) } - if got, want := rt.breaker.Capacity(), 0; got != want { + if got, want := rt.breaker.Capacity(), uint64(0); got != want { t.Errorf("TotalCapacity = %d, want: %d", got, want) } } @@ -687,10 +693,10 @@ func TestPodAssignmentInfinite(t *testing.T) { if got, want := len(rt.assignedTrackers), 3; got != want { t.Errorf("NumAssigned trackers = %d, want: %d", got, want) } - if got, want := rt.breaker.Capacity(), 1; got != want { + if got, want := rt.breaker.Capacity(), uint64(1); got != want { t.Errorf("TotalCapacity = %d, want: %d", got, want) } - if got, want := rt.assignedTrackers[0].Capacity(), 1; got != want { + if got, want := rt.assignedTrackers[0].Capacity(), uint64(1); got != want { t.Errorf("Exclusive tracker capacity: %d, want: %d", got, want) } @@ -703,7 +709,7 @@ func TestPodAssignmentInfinite(t *testing.T) { if got, want := len(rt.assignedTrackers), 0; got != want { t.Errorf("NumAssignedTrackers = %d, want: %d", got, want) } - if got, want := rt.breaker.Capacity(), 0; got != want { + if got, want := rt.breaker.Capacity(), uint64(0); got != want { t.Errorf("TotalCapacity = %d, want: %d", got, want) } } @@ -915,7 +921,7 @@ func (t *Throttler) try(ctx context.Context, requests int, try func(string) erro for range requests { go func() { var result tryResult - if err := t.Try(ctx, revID, func(dest string, _ bool) error { + if err := t.Try(ctx, revID, "test", func(dest string, isClusterIP bool) error { result = tryResult{dest: dest} return try(dest) }); err != nil { @@ -935,7 +941,7 @@ func TestInfiniteBreaker(t *testing.T) { } // Verify initial condition. 
- if got, want := b.Capacity(), 0; got != want { + if got, want := b.Capacity(), uint64(0); got != want { t.Errorf("Cap=%d, want: %d", got, want) } if _, ok := b.Reserve(context.Background()); ok != true { @@ -949,7 +955,7 @@ func TestInfiniteBreaker(t *testing.T) { } b.UpdateConcurrency(1) - if got, want := b.Capacity(), 1; got != want { + if got, want := b.Capacity(), uint64(1); got != want { t.Errorf("Cap=%d, want: %d", got, want) } @@ -976,7 +982,7 @@ func TestInfiniteBreaker(t *testing.T) { if err := b.Maybe(ctx, nil); err == nil { t.Error("Should have failed, but didn't") } - if got, want := b.Capacity(), 0; got != want { + if got, want := b.Capacity(), uint64(0); got != want { t.Errorf("Cap=%d, want: %d", got, want) } @@ -1150,7 +1156,18 @@ func TestAssignSlice(t *testing.T) { return a.dest == b.dest }) // assignSlice receives the pod trackers sorted. - trackers := []*podTracker{{ + trackers := map[string]*podTracker{ + "dest1": { + dest: "1", + }, + "dest2": { + dest: "2", + }, + "dest3": { + dest: "3", + }, + } + assignedTrackers := []*podTracker{{ dest: "1", }, { dest: "2", @@ -1158,44 +1175,62 @@ func TestAssignSlice(t *testing.T) { dest: "3", }} t.Run("notrackers", func(t *testing.T) { - got := assignSlice([]*podTracker{}, 0 /*selfIdx*/, 1 /*numAct*/) + got := assignSlice(map[string]*podTracker{}, 0 /*selfIdx*/, 1 /*numAct*/) if !cmp.Equal(got, []*podTracker{}, opt) { - t.Errorf("Got=%v, want: %v, diff: %s", got, trackers, + t.Errorf("Got=%v, want: %v, diff: %s", got, assignedTrackers, cmp.Diff([]*podTracker{}, got, opt)) } }) t.Run("idx=1, na=1", func(t *testing.T) { got := assignSlice(trackers, 1, 1) - if !cmp.Equal(got, trackers, opt) { - t.Errorf("Got=%v, want: %v, diff: %s", got, trackers, - cmp.Diff(trackers, got, opt)) + if !cmp.Equal(got, assignedTrackers, opt) { + t.Errorf("Got=%v, want: %v, diff: %s", got, assignedTrackers, + cmp.Diff(assignedTrackers, got, opt)) } }) t.Run("idx=-1", func(t *testing.T) { got := assignSlice(trackers, -1, 1) - if !cmp.Equal(got, trackers, opt) { - t.Errorf("Got=%v, want: %v, diff: %s", got, trackers, - cmp.Diff(trackers, got, opt)) + if !cmp.Equal(got, assignedTrackers, opt) { + t.Errorf("Got=%v, want: %v, diff: %s", got, assignedTrackers, + cmp.Diff(assignedTrackers, got, opt)) } }) t.Run("idx=1 na=3", func(t *testing.T) { - cp := slices.Clone(trackers) + cp := make(map[string]*podTracker) + maps.Copy(cp, trackers) got := assignSlice(cp, 1, 3) - if !cmp.Equal(got, trackers[1:2], opt) { - t.Errorf("Got=%v, want: %v; diff: %s", got, trackers[0:1], - cmp.Diff(trackers[1:2], got, opt)) + if !cmp.Equal(got, assignedTrackers[1:2], opt) { + t.Errorf("Got=%v, want: %v; diff: %s", got, assignedTrackers[0:1], + cmp.Diff(assignedTrackers[1:2], got, opt)) } }) t.Run("len=1", func(t *testing.T) { - got := assignSlice(trackers[0:1], 1, 3) - if !cmp.Equal(got, trackers[0:1], opt) { - t.Errorf("Got=%v, want: %v; diff: %s", got, trackers[0:1], - cmp.Diff(trackers[0:1], got, opt)) + cp := maps.Clone(trackers) + delete(cp, "dest2") + delete(cp, "dest3") + got := assignSlice(cp, 1, 3) + if !cmp.Equal(got, assignedTrackers[0:1], opt) { + t.Errorf("Got=%v, want: %v; diff: %s", got, assignedTrackers[0:1], + cmp.Diff(assignedTrackers[0:1], got, opt)) } }) t.Run("idx=1, breaker", func(t *testing.T) { - trackers := []*podTracker{{ + trackers := map[string]*podTracker{ + "dest1": { + dest: "1", + b: queue.NewBreaker(testBreakerParams), + }, + "dest2": { + dest: "2", + b: queue.NewBreaker(testBreakerParams), + }, + "dest3": { + dest: "3", + b:
queue.NewBreaker(testBreakerParams), + }, + } + assignedTrackers := []*podTracker{{ dest: "1", b: queue.NewBreaker(testBreakerParams), }, { @@ -1205,14 +1240,14 @@ func TestAssignSlice(t *testing.T) { dest: "3", b: queue.NewBreaker(testBreakerParams), }} - cp := slices.Clone(trackers) + cp := maps.Clone(trackers) got := assignSlice(cp, 1, 2) - want := trackers[1:2] + want := assignedTrackers[1:2] if !cmp.Equal(got, want, opt) { t.Errorf("Got=%v, want: %v; diff: %s", got, want, cmp.Diff(want, got, opt)) } - if got, want := got[0].b.Capacity(), 0; got != want { + if got, want := got[0].b.Capacity(), uint64(0); got != want { t.Errorf("Capacity for the tail pod = %d, want: %d", got, want) } }) diff --git a/pkg/queue/breaker.go b/pkg/queue/breaker.go index 918f57b743a5..1e8327d466b8 100644 --- a/pkg/queue/breaker.go +++ b/pkg/queue/breaker.go @@ -43,7 +43,7 @@ type BreakerParams struct { // executions in excess of the concurrency limit. Function call attempts // beyond the limit of the queue are failed immediately. type Breaker struct { - inFlight atomic.Int64 + pending atomic.Int64 totalSlots int64 sem *semaphore @@ -83,10 +83,10 @@ func NewBreaker(params BreakerParams) *Breaker { func (b *Breaker) tryAcquirePending() bool { // This is an atomic version of: // - // if inFlight == totalSlots { + // if pending == totalSlots { // return false // } else { - // inFlight++ + // pending++ // return true // } // @@ -96,11 +96,12 @@ func (b *Breaker) tryAcquirePending() bool { // (it fails if we're raced to it) or if we don't fulfill the condition // anymore. for { - cur := b.inFlight.Load() + cur := b.pending.Load() + // 10000 + containerConcurrency = totalSlots if cur == b.totalSlots { return false } - if b.inFlight.CompareAndSwap(cur, cur+1) { + if b.pending.CompareAndSwap(cur, cur+1) { return true } } @@ -108,7 +109,7 @@ func (b *Breaker) tryAcquirePending() bool { // releasePending releases a slot on the pending "queue". func (b *Breaker) releasePending() { - b.inFlight.Add(-1) + b.pending.Add(-1) } // Reserve reserves an execution slot in the breaker, to permit @@ -154,9 +155,9 @@ func (b *Breaker) Maybe(ctx context.Context, thunk func()) error { return nil } -// InFlight returns the number of requests currently in flight in this breaker. -func (b *Breaker) InFlight() int { - return int(b.inFlight.Load()) +// Pending returns the number of requests currently pending (queued or executing) in this breaker. +func (b *Breaker) Pending() int { + return int(b.pending.Load()) } // UpdateConcurrency updates the maximum number of in-flight requests. @@ -165,10 +166,14 @@ func (b *Breaker) UpdateConcurrency(size int) { } // Capacity returns the number of allowed in-flight requests on this breaker. -func (b *Breaker) Capacity() int { +func (b *Breaker) Capacity() uint64 { return b.sem.Capacity() } +func (b *Breaker) InFlight() uint64 { + return b.sem.InFlight() +} + // newSemaphore creates a semaphore with the desired initial capacity. func newSemaphore(maxCapacity, initialCapacity int) *semaphore { queue := make(chan struct{}, maxCapacity) @@ -288,9 +293,15 @@ func (s *semaphore) updateCapacity(size int) { } // Capacity is the capacity of the semaphore. -func (s *semaphore) Capacity() int { +func (s *semaphore) Capacity() uint64 { capacity, _ := unpack(s.state.Load()) - return int(capacity) //nolint:gosec // TODO(dprotaso) - capacity should be uint64 + return capacity +} + +// InFlight is the number of in-flight requests held by the semaphore.
+func (s *semaphore) InFlight() uint64 { + _, inFlight := unpack(s.state.Load()) + return inFlight } // unpack takes an uint64 and returns two uint32 (as uint64) comprised of the leftmost diff --git a/pkg/queue/breaker_test.go b/pkg/queue/breaker_test.go index 547959a1da54..c7e838f82bdc 100644 --- a/pkg/queue/breaker_test.go +++ b/pkg/queue/breaker_test.go @@ -212,12 +212,12 @@ func TestBreakerUpdateConcurrency(t *testing.T) { params := BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 0} b := NewBreaker(params) b.UpdateConcurrency(1) - if got, want := b.Capacity(), 1; got != want { + if got, want := b.Capacity(), uint64(1); got != want { t.Errorf("Capacity() = %d, want: %d", got, want) } b.UpdateConcurrency(0) - if got, want := b.Capacity(), 0; got != want { + if got, want := b.Capacity(), uint64(0); got != want { t.Errorf("Capacity() = %d, want: %d", got, want) } } @@ -294,12 +294,12 @@ func TestSemaphoreRelease(t *testing.T) { func TestSemaphoreUpdateCapacity(t *testing.T) { const initialCapacity = 1 sem := newSemaphore(3, initialCapacity) - if got, want := sem.Capacity(), 1; got != want { + if got, want := sem.Capacity(), uint64(1); got != want { t.Errorf("Capacity = %d, want: %d", got, want) } sem.acquire(context.Background()) sem.updateCapacity(initialCapacity + 2) - if got, want := sem.Capacity(), 3; got != want { + if got, want := sem.Capacity(), uint64(3); got != want { t.Errorf("Capacity = %d, want: %d", got, want) } } diff --git a/pkg/queue/handler.go b/pkg/queue/handler.go index f3ef48bb23bb..57441aa11abb 100644 --- a/pkg/queue/handler.go +++ b/pkg/queue/handler.go @@ -23,6 +23,7 @@ import ( "time" "go.opencensus.io/trace" + "go.uber.org/zap" netheader "knative.dev/networking/pkg/http/header" netstats "knative.dev/networking/pkg/http/stats" "knative.dev/serving/pkg/activator" @@ -30,12 +31,16 @@ import ( // ProxyHandler sends requests to the `next` handler at a rate controlled by // the passed `breaker`, while recording stats to `stats`. -func ProxyHandler(breaker *Breaker, stats *netstats.RequestStats, tracingEnabled bool, next http.Handler) http.HandlerFunc { +func ProxyHandler(breaker *Breaker, stats *netstats.RequestStats, tracingEnabled bool, next http.Handler, logger *zap.SugaredLogger) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if netheader.IsKubeletProbe(r) { next.ServeHTTP(w, r) return } + startTime := time.Now() + requestID := r.Header.Get("x-request-id") + + logger.Debugw("requestReceived", "x-request-id", requestID, "host", r.Host, "path", r.URL.Path, "requestReceivedTimestamp", time.Now().Format(time.RFC3339Nano), "containerStartTimestamp", startTime.Format(time.RFC3339Nano)) if tracingEnabled { proxyCtx, proxySpan := trace.StartSpan(r.Context(), "queue_proxy") diff --git a/pkg/queue/handler_test.go b/pkg/queue/handler_test.go index 201444393cd5..2ceff2b0212b 100644 --- a/pkg/queue/handler_test.go +++ b/pkg/queue/handler_test.go @@ -28,6 +28,8 @@ import ( "testing" "time" + logtesting "knative.dev/pkg/logging/testing" + netheader "knative.dev/networking/pkg/http/header" netstats "knative.dev/networking/pkg/http/stats" "knative.dev/serving/pkg/activator" @@ -38,6 +40,8 @@ const ( ) func TestHandlerBreakerQueueFull(t *testing.T) { + logger := logtesting.TestLogger(t) + // This test sends three requests of which one should fail immediately as the queue // saturates. 
resp := make(chan struct{}) @@ -48,7 +52,7 @@ func TestHandlerBreakerQueueFull(t *testing.T) { QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 1, }) stats := netstats.NewRequestStats(time.Now()) - h := ProxyHandler(breaker, stats, false /*tracingEnabled*/, blockHandler) + h := ProxyHandler(breaker, stats, false /*tracingEnabled*/, blockHandler, logger) req := httptest.NewRequest(http.MethodGet, "http://localhost:8081/time", nil) resps := make(chan *httptest.ResponseRecorder) @@ -82,6 +86,8 @@ func TestHandlerBreakerQueueFull(t *testing.T) { } func TestHandlerBreakerTimeout(t *testing.T) { + logger := logtesting.TestLogger(t) + // This test sends a request which will take a long time to complete. // Then another one with a very short context timeout. // Verifies that the second one fails with timeout. @@ -96,7 +102,7 @@ func TestHandlerBreakerTimeout(t *testing.T) { QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 1, }) stats := netstats.NewRequestStats(time.Now()) - h := ProxyHandler(breaker, stats, false /*tracingEnabled*/, blockHandler) + h := ProxyHandler(breaker, stats, false /*tracingEnabled*/, blockHandler, logger) go func() { h(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "http://localhost:8081/time", nil)) @@ -121,6 +127,8 @@ func TestHandlerBreakerTimeout(t *testing.T) { } func TestHandlerReqEvent(t *testing.T) { + logger := logtesting.TestLogger(t) + params := BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10} breaker := NewBreaker(params) for _, br := range []*Breaker{breaker, nil} { @@ -154,7 +162,7 @@ func TestHandlerReqEvent(t *testing.T) { proxy := httputil.NewSingleHostReverseProxy(serverURL) stats := netstats.NewRequestStats(time.Now()) - h := ProxyHandler(br, stats, true /*tracingEnabled*/, proxy) + h := ProxyHandler(br, stats, true /*tracingEnabled*/, proxy, logger) writer := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPost, "http://example.com", nil) @@ -173,6 +181,8 @@ func TestHandlerReqEvent(t *testing.T) { } func TestIgnoreProbe(t *testing.T) { + logger := logtesting.TestLogger(t) + // Verifies that probes don't queue. resp := make(chan struct{}) var c atomic.Int32 @@ -214,7 +224,7 @@ func TestIgnoreProbe(t *testing.T) { // Ensure no more than 1 request can be queued. So we'll send 3. breaker := NewBreaker(BreakerParams{QueueDepth: 1, MaxConcurrency: 1, InitialCapacity: 1}) stats := netstats.NewRequestStats(time.Now()) - h := ProxyHandler(breaker, stats, false /*tracingEnabled*/, proxy) + h := ProxyHandler(breaker, stats, false /*tracingEnabled*/, proxy, logger) req := httptest.NewRequest(http.MethodPost, "http://prob.in", nil) req.Header.Set("User-Agent", netheader.KubeProbeUAPrefix+"1.29") // Mark it a probe. 
@@ -231,6 +241,8 @@ func TestIgnoreProbe(t *testing.T) { } func BenchmarkProxyHandler(b *testing.B) { + logger := logtesting.TestLogger(b) + baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}) stats := netstats.NewRequestStats(time.Now()) @@ -262,7 +274,7 @@ func BenchmarkProxyHandler(b *testing.B) { for _, tc := range tests { reportTicker := time.NewTicker(tc.reportPeriod) - h := ProxyHandler(tc.breaker, stats, true /*tracingEnabled*/, baseHandler) + h := ProxyHandler(tc.breaker, stats, true /*tracingEnabled*/, baseHandler, logger) b.Run("sequential-"+tc.label, func(b *testing.B) { resp := httptest.NewRecorder() for range b.N { diff --git a/pkg/queue/request_metric.go b/pkg/queue/request_metric.go index 2a0a4d84df39..a8b165b27d10 100644 --- a/pkg/queue/request_metric.go +++ b/pkg/queue/request_metric.go @@ -176,7 +176,7 @@ func (h *appRequestMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Requ startTime := time.Now() if h.breaker != nil { - pkgmetrics.Record(h.statsCtx, queueDepthM.M(int64(h.breaker.InFlight()))) + pkgmetrics.Record(h.statsCtx, queueDepthM.M(int64(h.breaker.Pending()))) } defer func() { // Filter probe requests for revision metrics. diff --git a/pkg/queue/sharedmain/handlers.go b/pkg/queue/sharedmain/handlers.go index 8f8b56d22611..2d2423887476 100644 --- a/pkg/queue/sharedmain/handlers.go +++ b/pkg/queue/sharedmain/handlers.go @@ -71,7 +71,7 @@ func mainHandler( if metricsSupported { composedHandler = requestAppMetricsHandler(logger, composedHandler, breaker, env) } - composedHandler = queue.ProxyHandler(breaker, stats, tracingEnabled, composedHandler) + composedHandler = queue.ProxyHandler(breaker, stats, tracingEnabled, composedHandler, logger) composedHandler = queue.ForwardedShimHandler(composedHandler) composedHandler = handler.NewTimeoutHandler(composedHandler, "request timeout", func(r *http.Request) (time.Duration, time.Duration, time.Duration) { return timeout, responseStartTimeout, idleTimeout diff --git a/pkg/queue/sharedmain/main_test.go b/pkg/queue/sharedmain/main_test.go index 429e13d98e00..a718b3279838 100644 --- a/pkg/queue/sharedmain/main_test.go +++ b/pkg/queue/sharedmain/main_test.go @@ -32,6 +32,7 @@ import ( "github.com/kelseyhightower/envconfig" netheader "knative.dev/networking/pkg/http/header" netstats "knative.dev/networking/pkg/http/stats" + logtesting "knative.dev/pkg/logging/testing" pkgnet "knative.dev/pkg/network" "knative.dev/pkg/tracing" tracingconfig "knative.dev/pkg/tracing/config" @@ -42,6 +43,7 @@ import ( ) func TestQueueTraceSpans(t *testing.T) { + logger := logtesting.TestLogger(t) testcases := []struct { name string prober func() bool @@ -150,7 +152,7 @@ func TestQueueTraceSpans(t *testing.T) { Propagation: tracecontextb3.TraceContextB3Egress, } - h := queue.ProxyHandler(breaker, netstats.NewRequestStats(time.Now()), true /*tracingEnabled*/, proxy) + h := queue.ProxyHandler(breaker, netstats.NewRequestStats(time.Now()), true /*tracingEnabled*/, proxy, logger) h(writer, req) } else { h := health.ProbeHandler(tc.prober, true /*tracingEnabled*/) diff --git a/pkg/reconciler/autoscaling/kpa/kpa.go b/pkg/reconciler/autoscaling/kpa/kpa.go index 3849b4427729..83c0be77add0 100644 --- a/pkg/reconciler/autoscaling/kpa/kpa.go +++ b/pkg/reconciler/autoscaling/kpa/kpa.go @@ -49,7 +49,7 @@ import ( const ( noPrivateServiceName = "No Private Service Name" noTrafficReason = "NoTraffic" - minActivators = 2 + minActivators = 1 ) // podCounts keeps record of various numbers of pods diff --git 
a/pkg/reconciler/autoscaling/kpa/kpa_test.go b/pkg/reconciler/autoscaling/kpa/kpa_test.go index da2ee9ad4807..85dcf3bb157b 100644 --- a/pkg/reconciler/autoscaling/kpa/kpa_test.go +++ b/pkg/reconciler/autoscaling/kpa/kpa_test.go @@ -525,7 +525,7 @@ func TestReconcile(t *testing.T) { Name: "kpa does not become ready without minScale endpoints when reachable", Key: key, Objects: []runtime.Object{ - kpa(testNamespace, testRevision, withMinScale(2), withScales(1, defaultScale), + kpa(testNamespace, testRevision, withMinScale(1), withScales(1, defaultScale), WithReachabilityReachable, WithPAMetricsService(privateSvc)), defaultSKS, metric(testNamespace, testRevision),