Skip to content

RSDK-10794: Add inflight request limit for each resource #5133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
Jul 22, 2025

Conversation

jmatth
Copy link
Member

@jmatth jmatth commented Jul 11, 2025

No description provided.

@viambot viambot added the safe to test This pull request is marked safe to test from a trusted zone label Jul 11, 2025
@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Jul 11, 2025
@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Jul 11, 2025
@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Jul 14, 2025
@jmatth jmatth requested review from benjirewis and dgottlieb July 14, 2025 14:16
@jmatth jmatth marked this pull request as ready for review July 14, 2025 14:16
@jmatth
Copy link
Member Author

jmatth commented Jul 14, 2025

This is ready for review but still needs some work. I left PR comments on the parts I still have questions about.

@@ -1565,7 +1565,7 @@ func TestPerRequestFTDC(t *testing.T) {
// We can assert that there are two counters in our stats. The fact that `GetEndPosition` was
// called once and we hence spent (negligible) time in that RPC call.
stats := svc.RequestCounter().Stats().(map[string]int64)
test.That(t, len(stats), test.ShouldEqual, 4)
test.That(t, len(stats), test.ShouldEqual, 5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did the extra stat come from? Is that the curr counter we use for comparing against the new limit?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's the new in flight request counter. Originally it was the maximum concurrent in flight requests over the last minute, with the latest changes it's just the count of in flight requests at the time .Stats() is called.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are two counters in our stats

Can we update the comment above this assertion to explain why len(stats) == 5?

robot/web/web.go Outdated
type RequestCounter struct {
requestKeyToStats sync.Map
requestKeyToStats ssync.Map[string, *requestStats]
limiter requestLimiter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to pull the PR to grok everything here. There are lots of new nouns. I think the core of it is that:

  • requestKeyToStats is keyed on the (resource name, api method) pair
  • limiter has a map internally that is just keyed on the resource name.
  • requestLimiter has a map where the value of that map is requestCounter

I'm going to softly float a layer of denormalization and some renaming:

// RequestCounter is used to track and limit incoming requests. It instruments every unary and
// streaming request. Coming in from both external clients and internal modules.
type RequestCounter struct {
	// requestKeyToStats maps individual API calls for each resource to a set of metrics. E.g:
	// `motor-foo.IsPowered` and `motor-foo.GoFor` would each have their own set of stats.
	requestKeyToStats ssync.Map[string, *requestStats]

	// inFlightRequests maps resource names to how many in flight requests are currently targetting
	// that resource name. There can only be `limit` API calls for any resource. E.g: `motor-foo`
	// can have 50 `IsPowered` concurrent calls with 50 more `GoFor` calls. Or instead 100
	// `IsPowered` calls. Before it starts to reject new incoming requests.
	inFlightRequests ssync.Map[string, *inFlightCounter]
	// limit is controlled with the `VIAM_RESOURCE_REQUESTS_LIMIT` env variable. Its default is 100.
	limit int
}

And add something about how streaming RPCs count against limits? I suppose each open stream is just counted as a single "in flight" request?

Out of curiosity -- does internal signaling count against these limits?

Based on my observation about the purpose curr and max, I also feel inFlightCounter just becomes an *atomic.Int64. Maybe it can be a non-pointer, but I'm not 100% on the limitations of sync.Map.

Copy link
Member Author

@jmatth jmatth Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity -- does internal signaling count against these limits?

No. Currently we are only counting+limiting requests to apis that start with /viam.component, /viam.service, or /viam.robot and have a resource name*, or have no resource name but are going to a /viam.robot.v1.RobotService/ API. As far as I can tell internal signalling lives under proto.rpc.webrtc.v1.SignalingService/.

*: Or at least have a field called name that we assume is referring to a resource, or a field called controller that we assume is a resource for input controller APIs.

robot/web/web.go Outdated
})
for k, v := range rc.limiter.counts.Range {
max := v.max
v.max = v.curr
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what's happening here. This is not a pattern I've come across. I think you just want to use an atomic int64 here and do away with max and the mutex.

@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Jul 14, 2025
@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Jul 14, 2025
@viambot viambot added the safe to test This pull request is marked safe to test from a trusted zone label Jul 15, 2025
robot/web/web.go Outdated
requestKey := buildRCKey(m, w.apiMethod)
w.requestKey.Store(&requestKey)
w.requestKey.Store(&streamRequestKey{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dgottlieb Benji's question got me taking a closer look at this. Is it a problem that we just call Load here instead of Swap and then checking if we lost the race?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Responded above thinking both you and Benji will be notified there

@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Jul 16, 2025
@jmatth
Copy link
Member Author

jmatth commented Jul 16, 2025

Ok, I think this is ready for review again. We're now just reporting the number of in flight requests whenever FTDC happens to write rather than trying to track the max in between each FTDC write, and I added a special case for the one service Bohdan found that uses a different field name to refer to resources in its messages. Also there are tests.

I'd like to figure out a better system for identifying resources in gRPC messages but in the interest of helping everyone who's suddenly hitting stream limits I'd like to merge this and iterate on it as necessary later.

@jmatth jmatth requested review from benjirewis and dgottlieb July 16, 2025 21:38
Copy link
Member

@dgottlieb dgottlieb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great to me. I'm most concerned with documenting some of the business logic. All of the choices look very reasonable, just need some clarity on "must be done this way" versus "being defensive against the future unknowns".

I also think the request counter stuff is outgrowing this file. But I don't want that kind of code movement to muddle/slow down this PR.

robot/web/web.go Outdated
}
}

func (rc *RequestCounter) ensureKey(resource string) *atomic.Int64 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be simplified down to:

func (rc *RequestCounter) ensureKey(resource string) *atomic.Int64 {
    counter, _ = rc.inFlightRequests.LoadOrStore(resource, &atomic.Int64{})
    return counter
}

?

Copy link
Member Author

@jmatth jmatth Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could. I wrote it this way to avoid allocating an extra atomic.Int64 for every call after the first that just creates GC pressure. If you think that's not worth worrying about we can simplify the function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that makes sense! Please leave a comment

robot/web/web.go Outdated
// streaming RPCs both count against the limit.`limit` defaults to 100 but
// can be configured with the `VIAM_RESOURCE_REQUESTS_LIMIT`
// environment variable.
inFlightRequests ssync.Map[string, *atomic.Int64]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What Josh is saying about copying being illegal is correct. Go chose to make atomic objects "values" rather than pointers under the hood. So one can embed a mutex or atomic.Int64 without needing to allocate on the heap.

But those can no longer be copied. When a mutex, for example, is "copied" there are now two mutexes. Each can be individually locked/unlocked.

I don't think immutability is relevant here. If one wanted an immutable int64 -- there's no need to make it atomic -- the two properties cannot meaningfully coexist. It's perfectly fine for any number of threads to concurrently read from the same memory address. There's no reason to have an atomic if one never intends to store into it.

robot/web/web.go Outdated
}
}

func (rc *RequestCounter) ensureKey(resource string) *atomic.Int64 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should disambiguate the method name ensureKey. If it makes sense to have ensureKey initialize map entries for both the stats and inflight maps, the name ensureKey makes sense.

Otherwise we should choose something that makes it more obviously we're only ensuring inFlightRequests has a given key.

robot/web/web.go Outdated
rc.requestKeyToStats.Range(func(requestKeyI, requestStatsI any) bool {
requestKey := requestKeyI.(string)
requestStats := requestStatsI.(*requestStats)
for requestKey, requestStats := range rc.requestKeyToStats.Range {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, so this is a legit usage of the new language feature for custom implemented range/iterator functions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. All both of the function signatures it supports are declared/documented in the iter package.

robot/web/web.go Outdated
name string
}

func extractViamAPI(fullMethod string) apiMethod {
// Extract Service and Method name from `fullMethod` values such as:
// - `/viam.component.motor.v1.MotorService/IsMoving` -> MotorService/IsMoving
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update the comment here so the example return values are in terms of the new apiMethod type?

robot/web/web.go Outdated
@@ -648,10 +767,18 @@ func (rc *RequestCounter) UnaryInterceptor(
ctx context.Context, req any, info *googlegrpc.UnaryServerInfo, handler googlegrpc.UnaryHandler,
) (resp any, err error) {
apiMethod := extractViamAPI(info.FullMethod)

if resource := buildResourceLimitKey(req, apiMethod); resource != "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we want this after the stat increments. Such that a resource limit exceeded error would register as both:

  • A call to the RPC method
  • An error being returned from the RPC method

robot/web/web.go Outdated
rc: rc,
requestKey: atomic.Pointer[streamRequestKey]{},
}
defer wrappedStream.tryDecr()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamInterceptors hurt my head, so forgive me if this is a dumb question. Based on the code:

  • Stream handler(srv, &wrappedStream) methods only return when the stream is closed?
  • And during that time there may be multiple RecvMsg and SendMsg calls?

If we're only interested in resource limiting an entire stream -- should we increment here for simplicity? Or does the first RecvMsg add an important element with regards to the resource limit key we're tracking? I assume that's where we learn of the resource name.

I feel this unconditional tryDecr is very close to be an accidental bug. If tryIncr fails because of a resource limit, we do not want to decrement. And I believe we correctly don't decrement. But only because tryDecr checks if the requestKey was set. And right now, we only set the requestKey if we successfully incremented.

None of that is obvious. And if we wanted to go down a path where we do increment stats, despite failing to increment, we'd now introduce a double decrementing bug.

Part of me wants to just ignore streaming APIs and do it as a low priority follow-up. I know that'd be losing some of the parity we have today (this change isn't trying to maintain full parity). But we don't have good examples of streaming APIs being problematic and reasoning about streams is much harder.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to come up with a more guided actionable. Choose one of these three:

  • Think if there's a safer way to know when to decrement
  • If not, definitely document the assumption tryDecr is making when an increment fails.
  • If you're actually just on board with undoing the stream limiting for this PR, I'm good with that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have undone the stream limiting for this PR, but to somewhat close the loop here: I traced a callstack for a streaming API call and our WebRTC code closes the stream when the handler returns. I'm not sure what normal gRPC over HTTP2 does but with our WebRTC system I'm pretty confident saying that if a handler spins off a goroutine to handle a stream and then returns, the stream will stop working.

robot/web/web.go Outdated
return method.name
}

func buildResourceLimitKey(clientMsg any, method apiMethod) string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// buildResourceLimitKey returns a string of the form:
// - `foo-motor.viam.component.motor.v1.MotorService` for requests with a `name` field hitting a Viam resource API or
// - `viam.robot.v1.RobotService` for requests on the robot service directly without a `name` field

I'm happy pretending controllers don't meaningfully exist here.

test.That(t, stats[statsKey], test.ShouldEqual, 0)
}

func TestPerResourceLimitsAndFTDC(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test seemed pretty onerous to instrument. I imagine doing the same for a streaming example would be more so?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Support for stream APIs has been removed for now.

robot/web/web.go Outdated

if resource := buildResourceLimitKey(req, apiMethod); resource != "" {
if ok := rc.incrInFlight(resource); !ok {
return nil, &RequestLimitExceededError{
Copy link
Member

@cheukt cheukt Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

driveby, but can we log on server side too? if we're worried about spammy logs we can make sure to only log every 30s or 1min

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope we haven't already come up with a reason to circumvent the existing log deduplication code?

@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Jul 22, 2025
@jmatth jmatth requested a review from dgottlieb July 22, 2025 15:09
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a modified version of a file in Bohdan's ongoing jobmanager PR (#5104).

@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Jul 22, 2025
@jmatth jmatth merged commit e933632 into viamrobotics:main Jul 22, 2025
32 of 34 checks passed
@jmatth jmatth deleted the RSDK-10794 branch July 22, 2025 17:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
safe to test This pull request is marked safe to test from a trusted zone
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants