Skip to content

Commit a973d97

Browse files
committed
admission: add logic to fill CPU time token buckets
This commit introduces cpuTimeTokenFiller. cpuTimeTokenFiller starts up a goroutine which periodically adds tokens to a cpuTimeTokenGranter. For example, on an 8 vCPU machine, we may want to allow burstable tier-0 work to use 6 seconds of CPU time per second. Then cpuTimeTokenFiller would add 6 seconds of token every second. cpuTimeTokenFiller is designed to be robust against delayed or dropped time.Timer ticks, as could happen if goroutine scheduling latency is elevated. Fixes: #154470 Release note: None.
1 parent 63044ec commit a973d97

File tree

9 files changed

+587
-8
lines changed

9 files changed

+587
-8
lines changed

pkg/util/admission/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go_library(
44
name = "admission",
55
srcs = [
66
"admission.go",
7+
"cpu_time_token_filler.go",
78
"cpu_time_token_granter.go",
89
"disk_bandwidth.go",
910
"elastic_cpu_grant_coordinator.go",
@@ -58,6 +59,7 @@ go_library(
5859
go_test(
5960
name = "admission_test",
6061
srcs = [
62+
"cpu_time_token_filler_test.go",
6163
"cpu_time_token_granter_test.go",
6264
"disk_bandwidth_test.go",
6365
"elastic_cpu_granter_test.go",
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Copyright 2021 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package admission
7+
8+
import (
9+
"time"
10+
11+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
12+
"github.com/cockroachdb/errors"
13+
)
14+
15+
// timePerTick is how frequently cpuTimeTokenFiller ticks its time.Ticker & adds
16+
// tokens to the buckets. Must be < 1s. Must divide 1s evenly.
17+
const timePerTick = 1 * time.Millisecond
18+
19+
// cpuTimeTokenFiller starts a goroutine which periodically calls
20+
// cpuTimeTokenAllocator to add tokens to a cpuTimeTokenGranter. For example, on
21+
// an 8 vCPU machine, we may want to allow burstable tier-0 work to use 6 seconds
22+
// of CPU time per second. Then cpuTimeTokenAllocator.rates[tier0][canBurst] would
23+
// equal 6 seconds per second, and cpuTimeTokenFiller would add 6 seconds of token
24+
// every second, but smoothly -- 1ms at a time. See cpuTimeTokenGranter for details
25+
// on the multi-dimensional token buckets owned by cpuTimeTokenGranter; the TLDR is
26+
// there is one bucket per <resource tier, burst qualification> pair.
27+
//
28+
// cpuTimeTokenFiller owns the time.Ticker logic. The details of the token allocation
29+
// are left to the cpuTimeTokenAllocator, in order to improve clarity & testability.
30+
//
31+
// Note that the combination of cpuTimeTokenFiller & cpuTimeTokenAllocator are written
32+
// to be robust against delayed and dropped time.Timer ticks. That
33+
// is, in the presence of delayed and dropped ticks, the correct number of tokens will
34+
// be added to the buckets; they just may be added in a less smooth fashion than
35+
// normal. If ticks are delayed more than roughly 1s, not enough tokens will be
36+
// added to the bucket, but we do not expect this significant of a delay in practice
37+
// (admission control will be running).
38+
//
39+
// See ticker docs, where it is mentioned ticks can be dropped, if receivers are
40+
// slow: https://pkg.go.dev/time#NewTicker
41+
//
42+
// The mechanism by which the goroutine adds the correct number of tokens, in the
43+
// presence of delayed or dropped ticks, is:
44+
// - time is split into intervals of 1s
45+
// - intervals are split into 1s / timePerTick(=1ms) time.Ticker ticks
46+
// - cpuTimeTokenAllocator attempts to allocate remaining tokens for interval evenly
47+
// across remaining ticks in the interval
48+
// - once interval is complete, all remaining tokens needed for that interval
49+
// are added (e.g. see t.allocateTokens(1) below), then a new interval starts
50+
type cpuTimeTokenFiller struct {
51+
allocator tokenAllocator
52+
timeSource timeutil.TimeSource
53+
closeCh chan struct{}
54+
// Used only in unit tests.
55+
tickCh *chan struct{}
56+
}
57+
58+
// tokenAllocator abstracts cpuTimeTokenAllocator for testing.
59+
type tokenAllocator interface {
60+
allocateTokens(remainingTicksInInInterval int64)
61+
resetInterval()
62+
}
63+
64+
func (f *cpuTimeTokenFiller) start() {
65+
ticker := f.timeSource.NewTicker(timePerTick)
66+
intervalStart := f.timeSource.Now()
67+
// Every 1s a new interval starts. every timePerTick time token allocation
68+
// is done. The expected number of ticks left in the interval is passed to
69+
// the allocator. The expected number of ticks left can jump around, if
70+
// time.Timer ticks are delayed or dropped.
71+
go func() {
72+
lastRemainingTicks := int64(0)
73+
for {
74+
select {
75+
case t := <-ticker.Ch():
76+
var remainingTicks int64
77+
elapsedSinceIntervalStart := t.Sub(intervalStart)
78+
if elapsedSinceIntervalStart >= time.Second {
79+
// INVARIANT: During each interval, allocateTokens(1) must be called, before
80+
// resetInterval() can be called.
81+
//
82+
// Without this invariant, cpuTimeTokenAllocator.rates tokens would not be
83+
// allocated every 1s.
84+
if lastRemainingTicks > 1 {
85+
f.allocator.allocateTokens(1)
86+
}
87+
intervalStart = t
88+
f.allocator.resetInterval()
89+
remainingTicks = int64(time.Second / timePerTick)
90+
} else {
91+
remainingSinceIntervalStart := time.Second - elapsedSinceIntervalStart
92+
if remainingSinceIntervalStart < 0 {
93+
panic(errors.AssertionFailedf("remainingSinceIntervalStart is negative %d", remainingSinceIntervalStart))
94+
}
95+
// ceil(a / b) == (a + b - 1) / b, when using integer division.
96+
// Round up so that we don't accumulate tokens to give in a burst on the
97+
// last tick.
98+
remainingTicks =
99+
int64((remainingSinceIntervalStart + timePerTick - 1) / timePerTick)
100+
}
101+
f.allocator.allocateTokens(max(1, remainingTicks))
102+
lastRemainingTicks = remainingTicks
103+
// Only non-nil in unit tests.
104+
if f.tickCh != nil {
105+
*f.tickCh <- struct{}{}
106+
}
107+
case <-f.closeCh:
108+
return
109+
}
110+
}
111+
}()
112+
}
113+
114+
// cpuTimeTokenAllocator allocates tokens to a cpuTimeTokenGranter. See the comment
115+
// above cpuTimeTokenFiller for a high level picture. The responsibility of
116+
// cpuTimeTokenAllocator is to gradually allocate rates tokens every interval,
117+
// while respecting bucketCapacity. We have split up the ticking & token allocation
118+
// logic, in order to improve clarity & testability.
119+
type cpuTimeTokenAllocator struct {
120+
granter *cpuTimeTokenGranter
121+
122+
// Mutable fields. No mutex, since only a single goroutine will call the
123+
// cpuTimeTokenAllocator.
124+
125+
// rates stores the number of token added to each bucket every interval.
126+
rates [numResourceTiers][numBurstQualifications]int64
127+
// bucketCapacity stores the maximum number of tokens that can be in each bucket.
128+
// That is, if a bucket is already at capacity, no more tokens will be added.
129+
bucketCapacity [numResourceTiers][numBurstQualifications]int64
130+
// allocated stores the number of tokens added to each bucket in the current
131+
// interval.
132+
allocated [numResourceTiers][numBurstQualifications]int64
133+
}
134+
135+
var _ tokenAllocator = &cpuTimeTokenAllocator{}
136+
137+
// allocateTokens allocates tokens to a cpuTimeTokenGranter. allocateTokens
138+
// adds rates tokens every interval, while respecting bucketCapacity.
139+
// allocateTokens adds tokens evenly among the expected remaining ticks in
140+
// the interval.
141+
// INVARIANT: remainingTicks >= 1.
142+
// TODO(josh): Expand to cover tenant-specific token buckets too.
143+
func (a *cpuTimeTokenAllocator) allocateTokens(expectedRemainingTicksInInterval int64) {
144+
allocateFunc := func(total int64, allocated int64, remainingTicks int64) (toAllocate int64) {
145+
remainingTokens := total - allocated
146+
// ceil(a / b) == (a + b - 1) / b, when using integer division.
147+
// Round up so that we don't accumulate tokens to give in a burst on the
148+
// last tick.
149+
toAllocate = (remainingTokens + remainingTicks - 1) / remainingTicks
150+
if toAllocate < 0 {
151+
panic(errors.AssertionFailedf("toAllocate is negative %d", toAllocate))
152+
}
153+
if toAllocate+allocated > total {
154+
toAllocate = total - allocated
155+
}
156+
return toAllocate
157+
}
158+
159+
var delta [numResourceTiers][numBurstQualifications]int64
160+
for wc := range a.rates {
161+
for kind := range a.rates[wc] {
162+
toAllocateTokens := allocateFunc(
163+
a.rates[wc][kind], a.allocated[wc][kind], expectedRemainingTicksInInterval)
164+
a.allocated[wc][kind] += toAllocateTokens
165+
delta[wc][kind] = toAllocateTokens
166+
}
167+
}
168+
a.granter.refill(delta, a.bucketCapacity)
169+
}
170+
171+
// resetInterval is called to signal the beginning of a new interval. allocateTokens
172+
// adds rates tokens every interval.
173+
func (a *cpuTimeTokenAllocator) resetInterval() {
174+
for wc := range a.allocated {
175+
for kind := range a.allocated[wc] {
176+
a.allocated[wc][kind] = 0
177+
}
178+
}
179+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2021 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package admission
7+
8+
import (
9+
"fmt"
10+
"strings"
11+
"testing"
12+
"time"
13+
14+
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
15+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
16+
"github.com/cockroachdb/cockroach/pkg/util/log"
17+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
18+
"github.com/cockroachdb/datadriven"
19+
)
20+
21+
func TestCPUTimeTokenFiller(t *testing.T) {
22+
defer leaktest.AfterTest(t)()
23+
defer log.Scope(t).Close(t)
24+
25+
// Fixed time for reproducibility.
26+
unixNanos := int64(1758938600000000000) // 2025-09-24T14:30:00Z
27+
startTime := time.Unix(0, unixNanos).UTC()
28+
testTime := timeutil.NewManualTime(startTime)
29+
30+
var buf strings.Builder
31+
allocator := testTokenAllocator{buf: &buf}
32+
var filler cpuTimeTokenFiller
33+
flushAndReset := func() string {
34+
fmt.Fprintf(&buf, "elapsed: %s\n", testTime.Since(startTime))
35+
str := buf.String()
36+
buf.Reset()
37+
return str
38+
}
39+
40+
tickCh := make(chan struct{})
41+
datadriven.RunTest(t, datapathutils.TestDataPath(t, "cpu_time_token_filler"), func(t *testing.T, d *datadriven.TestData) string {
42+
switch d.Cmd {
43+
case "init":
44+
filler = cpuTimeTokenFiller{
45+
allocator: &allocator,
46+
closeCh: make(chan struct{}),
47+
timeSource: testTime,
48+
tickCh: &tickCh,
49+
}
50+
filler.start()
51+
return flushAndReset()
52+
case "advance":
53+
var dur time.Duration
54+
d.ScanArgs(t, "dur", &dur)
55+
testTime.AdvanceInOneTick(dur)
56+
<-tickCh
57+
return flushAndReset()
58+
case "stop":
59+
close(filler.closeCh)
60+
return flushAndReset()
61+
default:
62+
return fmt.Sprintf("unknown command: %s", d.Cmd)
63+
}
64+
})
65+
}
66+
67+
type testTokenAllocator struct {
68+
buf *strings.Builder
69+
}
70+
71+
func (a *testTokenAllocator) resetInterval() {
72+
fmt.Fprintf(a.buf, "resetInterval()\n")
73+
}
74+
75+
func (a *testTokenAllocator) allocateTokens(remainingTicks int64) {
76+
fmt.Fprintf(a.buf, "allocateTokens(%d)\n", remainingTicks)
77+
}
78+
79+
func TestCPUTimeTokenAllocator(t *testing.T) {
80+
defer leaktest.AfterTest(t)()
81+
defer log.Scope(t).Close(t)
82+
83+
granter := &cpuTimeTokenGranter{}
84+
tier0Granter := &cpuTimeTokenChildGranter{
85+
tier: testTier0,
86+
parent: granter,
87+
}
88+
tier1Granter := &cpuTimeTokenChildGranter{
89+
tier: testTier1,
90+
parent: granter,
91+
}
92+
var requesters [numResourceTiers]*testRequester
93+
requesters[testTier0] = &testRequester{
94+
additionalID: "tier0",
95+
granter: tier0Granter,
96+
}
97+
requesters[testTier1] = &testRequester{
98+
additionalID: "tier1",
99+
granter: tier1Granter,
100+
}
101+
granter.requester[testTier0] = requesters[testTier0]
102+
granter.requester[testTier1] = requesters[testTier1]
103+
104+
allocator := cpuTimeTokenAllocator{
105+
granter: granter,
106+
}
107+
allocator.rates[testTier0][canBurst] = 5
108+
allocator.rates[testTier0][noBurst] = 4
109+
allocator.rates[testTier1][canBurst] = 3
110+
allocator.rates[testTier1][noBurst] = 2
111+
allocator.bucketCapacity = allocator.rates
112+
113+
var buf strings.Builder
114+
flushAndReset := func(printGranter bool) string {
115+
if printGranter {
116+
fmt.Fprint(&buf, granter.String())
117+
}
118+
str := buf.String()
119+
buf.Reset()
120+
return str
121+
}
122+
123+
datadriven.RunTest(t, datapathutils.TestDataPath(t, "cpu_time_token_allocator"), func(t *testing.T, d *datadriven.TestData) string {
124+
switch d.Cmd {
125+
case "resetInterval":
126+
allocator.resetInterval()
127+
return flushAndReset(false /* printGranter */)
128+
case "allocate":
129+
var remainingTicks int64
130+
d.ScanArgs(t, "remaining", &remainingTicks)
131+
allocator.allocateTokens(remainingTicks)
132+
return flushAndReset(true /* printGranter */)
133+
case "clear":
134+
granter.mu.buckets[testTier0][canBurst].tokens = 0
135+
granter.mu.buckets[testTier0][noBurst].tokens = 0
136+
granter.mu.buckets[testTier1][canBurst].tokens = 0
137+
granter.mu.buckets[testTier1][noBurst].tokens = 0
138+
return flushAndReset(true /* printGranter */)
139+
default:
140+
return fmt.Sprintf("unknown command: %s", d.Cmd)
141+
}
142+
})
143+
}

pkg/util/admission/cpu_time_token_granter.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,3 +244,29 @@ func (stg *cpuTimeTokenGranter) tryGrantLocked() bool {
244244
}
245245
return false
246246
}
247+
248+
// refill adds delta tokens to the corresponding buckets, while respecting
249+
// the capacity info stored in bucketCapacity. That is, if a bucket is already
250+
// at capacity, no more tokens will be added. delta is always positive,
251+
// thus refill will always attempt to grant admission to waiting requests.
252+
func (stg *cpuTimeTokenGranter) refill(
253+
delta [numResourceTiers][numBurstQualifications]int64,
254+
bucketCapacity [numResourceTiers][numBurstQualifications]int64,
255+
) {
256+
stg.mu.Lock()
257+
defer stg.mu.Unlock()
258+
259+
for wc := range stg.mu.buckets {
260+
for kind := range stg.mu.buckets[wc] {
261+
tokens := stg.mu.buckets[wc][kind].tokens + delta[wc][kind]
262+
if tokens > bucketCapacity[wc][kind] {
263+
tokens = bucketCapacity[wc][kind]
264+
}
265+
stg.mu.buckets[wc][kind].tokens = tokens
266+
}
267+
}
268+
269+
// delta is always positive, thus refill should always attempt to grant
270+
// admission to waiting requests.
271+
stg.grantUntilNoWaitingRequestsLocked()
272+
}

pkg/util/admission/cpu_time_token_granter_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,24 @@ func TestCPUTimeTokenGranter(t *testing.T) {
149149
requesters[scanResourceTier(t, d)].tookWithoutPermission(int64(v))
150150
return flushAndReset(false /* init */)
151151

152+
case "refill":
153+
// The delta & the bucket capacity are hard-coded. It is unwiedly
154+
// to make them data-driven arguments, and the payoff would be
155+
// low anyway.
156+
var delta [numResourceTiers][numBurstQualifications]int64
157+
delta[testTier0][canBurst] = 5
158+
delta[testTier0][noBurst] = 4
159+
delta[testTier1][canBurst] = 3
160+
delta[testTier1][noBurst] = 1
161+
var bucketCapacity [numResourceTiers][numBurstQualifications]int64
162+
bucketCapacity[testTier0][canBurst] = 4
163+
bucketCapacity[testTier0][noBurst] = 3
164+
bucketCapacity[testTier1][canBurst] = 10
165+
bucketCapacity[testTier1][noBurst] = 1
166+
granter.refill(delta, bucketCapacity)
167+
fmt.Fprintf(&buf, "refill(%v %v)\n", delta, bucketCapacity)
168+
return flushAndReset(false /* init */)
169+
152170
// For cpuTimeTokenChildGranter, this is a NOP. Still, it will be
153171
// called in production. So best to test it doesn't panic, or similar.
154172
case "continue-grant-chain":

0 commit comments

Comments
 (0)