
taskgroup: add an explicit Throttle type #6

Merged 2 commits on Oct 5, 2024.
taskgroup.go (9 additions, 16 deletions)

@@ -131,24 +131,17 @@ func Listen(f func(error)) ErrorFunc { return func(e error) error { f(e); return
 // NoError adapts f to a Task that executes f and reports a nil error.
 func NoError(f func()) Task { return func() error { f(); return nil } }
 
-// Limit returns g and a function that starts each task passed to it in g,
-// allowing no more than n tasks to be active concurrently. If n ≤ 0, the
-// start function is equivalent to g.Go, which enforces no limit.
+// Limit returns g and a "start" function that starts each task passed to it in
+// g, allowing no more than n tasks to be active concurrently. If n ≤ 0, no
+// limit is enforced.
 //
 // The limiting mechanism is optional, and the underlying group is not
 // restricted. A call to the start function will block until a slot is
 // available, but calling g.Go directly will add a task unconditionally and
 // will not take up a limiter slot.
-func (g *Group) Limit(n int) (*Group, func(Task) *Group) {
-	if n <= 0 {
-		return g, g.Go
-	}
-	adm := make(chan struct{}, n)
-	return g, func(task Task) *Group {
-		adm <- struct{}{}
-		return g.Go(func() error {
-			defer func() { <-adm }()
-			return task()
-		})
-	}
-}
+//
+// This is a shorthand for constructing a [Throttle] with capacity n and
+// calling its Limit method. If n ≤ 0, the start function is equivalent to
+// g.Go, which enforces no limit. To share a throttle among multiple groups,
+// construct the throttle separately.
+func (g *Group) Limit(n int) (*Group, func(Task)) { t := NewThrottle(n); return g, t.Limit(g) }
taskgroup_test.go (14 additions, 3 deletions)

@@ -116,11 +116,21 @@ func TestCapacity(t *testing.T) {

 	const maxCapacity = 25
 	const numTasks = 1492
-	g, start := taskgroup.New(nil).Limit(maxCapacity)
 
+	// Verify that multiple groups sharing a throttle respect the combined
+	// capacity limit.
+	throttle := taskgroup.NewThrottle(maxCapacity)
+	var g1, g2 taskgroup.Group
+	start1 := throttle.Limit(&g1)
+	start2 := throttle.Limit(&g2)
+
 	var p peakValue
 	var n int32
-	for i := 0; i < numTasks; i++ {
+	for i := range numTasks {
+		start := start1
+		if i%2 == 1 {
+			start = start2
+		}
 		start(func() error {
 			p.inc()
 			defer p.dec()

@@ -129,7 +139,8 @@ func TestCapacity(t *testing.T) {
 			return nil
 		})
 	}
-	g.Wait()
+	g1.Wait()
+	g2.Wait()
 	t.Logf("Total tasks completed: %d", n)
 	if p.max > maxCapacity {
 		t.Errorf("Exceeded maximum capacity: got %d, want %d", p.max, maxCapacity)
throttle.go (new file, 56 additions)
package taskgroup

import "sync/atomic"

// A Throttle rate-limits the number of concurrent goroutines that can execute
// in parallel to some fixed number. A zero Throttle is ready for use, but
// imposes no limit on parallel execution. See [Throttle.Enter] for use.
type Throttle struct {
	adm chan struct{}
}

// NewThrottle constructs a [Throttle] with a capacity of n goroutines.
// If n ≤ 0, the resulting Throttle imposes no limit.
func NewThrottle(n int) Throttle {
	if n <= 0 {
		return Throttle{}
	}
	return Throttle{adm: make(chan struct{}, n)}
}

Collaborator (on NewThrottle): Extremely mild thought: for large limiters, a mutex+cond+uint may be more efficient? More irritating to program, but it's all hidden away neatly, so maybe?...

Owner Author: I did some testing on it a while back (before I started using this pattern), and it's actually not. I think the compiler may have some special cases for channels of zero-width values or something.

// Enter blocks until a slot is available in t, then returns a [Leaver] that
// the caller must execute to return the slot when it is no longer in use.
func (t Throttle) Enter() Leaver {
	if t.adm == nil {
		return func() {}
	}
	t.adm <- struct{}{}
	var done atomic.Bool
	return func() {
		if done.CompareAndSwap(false, true) {
			<-t.adm
		}
	}
}

Collaborator (on the Enter signature): This signature almost looks like context.WithCancel... I don't know if there's anything in that; just noticed, and wondering if there's some opportunity to move the two closer together. Probably not, but sharing in case it causes any flash of insight.

Owner Author: It kind of does, but I'm not sure I see the rhyme: WithCancel returns the new context and the CancelFunc, on the principle that you ought to be using both. But here you kind of already have to have the Throttle in hand. Am I missing something?

// A Leaver returns an in-use throttle slot to its underlying [Throttle].
// It is safe to call a Leaver multiple times; the slot will only be returned
// once.
type Leaver func()

// Leave returns the slot to its [Throttle]. This is a legibility alias for
// calling f.
func (f Leaver) Leave() { f() }

// Limit returns a function that starts each [Task] passed to it in g,
// respecting the rate limit imposed by t. Each call to Limit yields a fresh
// start function, and all the functions returned share the capacity of t.
func (t Throttle) Limit(g *Group) func(Task) {
	return func(task Task) {
		slot := t.Enter()
		g.Go(func() error {
			defer slot.Leave()
			return task()
		})
	}
}