Skip to content

Commit

Permalink
taskgroup: deprecate Trigger and Listen (#10)
Browse files Browse the repository at this point in the history
Instead of making the caller explicitly adapt functions to the error filter
interface, make the OnError method (and thereby the New constructor) accept an
argument of dynamic type and do the adaptation automatically.

For now, the Trigger and Listen helpers are still present, but marked as
deprecated. I will remove them in a future release once known existing use is
updated.
  • Loading branch information
creachadair authored Oct 6, 2024
1 parent 68ef45d commit 1e124a5
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 22 deletions.
7 changes: 4 additions & 3 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func ExampleTrigger() {
const badTask = 5

// Construct a group in which any task error cancels the context.
g := taskgroup.New(taskgroup.Trigger(cancel))
g := taskgroup.New(cancel)

for i := range 10 {
g.Go(func() error {
Expand All @@ -67,10 +67,11 @@ func ExampleTrigger() {
func ExampleListen() {
// The taskgroup itself will only report the first non-nil task error, but
// you can use an error listener used to accumulate all of them.
// Calls to the listener are synchronized, so we don't need a lock.
var all []error
g := taskgroup.New(taskgroup.Listen(func(e error) {
g := taskgroup.New(func(e error) {
all = append(all, e)
}))
})
g.Go(func() error { return errors.New("badness 1") })
g.Go(func() error { return errors.New("badness 2") })
g.Go(func() error { return errors.New("badness 3") })
Expand Down
2 changes: 1 addition & 1 deletion examples/copytree/copytree.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {
}

ctx, cancel := context.WithCancel(context.Background())
g, start := taskgroup.New(taskgroup.Trigger(cancel)).Limit(*maxWorkers)
g, start := taskgroup.New(cancel).Limit(*maxWorkers)

err := filepath.Walk(*srcPath, func(path string, fi os.FileInfo, err error) error {
if err != nil {
Expand Down
85 changes: 67 additions & 18 deletions taskgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package taskgroup

import (
"fmt"
"reflect"
"sync"
"sync/atomic"
)
Expand All @@ -31,9 +33,9 @@ type Group struct {
// path reads active and only acquires μ if it discovers setup is needed.
active atomic.Uint32

μ sync.Mutex // guards err
μ sync.Mutex // guards the fields below
err error // error returned from Wait
onError ErrorFunc // called each time a task returns non-nil
onError errorFunc // called each time a task returns non-nil
}

// activate resets the state of the group and marks it as active. This is
Expand All @@ -50,20 +52,63 @@ func (g *Group) activate() {
// New constructs a new empty group with the specified error filter.
// See [Group.OnError] for a description of how errors are filtered.
// If ef == nil, no filtering is performed.
func New(ef ErrorFunc) *Group { return new(Group).OnError(ef) }
func New(ef any) *Group { return new(Group).OnError(ef) }

var (
triggerType = reflect.TypeOf(func() {})
listenType = reflect.TypeOf(func(error) {})
filterType = reflect.TypeOf(func(error) error { return nil })
)

// OnError sets the error filter for g. If ef == nil, the error filter is
// removed and errors are no longer filtered. Otherwise, each non-nil error
// reported by a task running in g is passed to ef, and the value it returns
// replaces the task's error.
// reported by a task running in g is passed to ef.
//
// The concrete type of ef must be a function with one of the following
// signature schemes, or OnError will panic.
//
// If ef is:
//
// func()
//
// then ef is called once per reported error, and the error is not modified.
//
// If ef is:
//
// func(error)
//
// then ef is called with each reported error, and the error is not modified.
//
// If ef is:
//
// func(error) error
//
// then ef is called with each reported error, and its result replaces the
// reported value. This permits ef to suppress or replace the error value
// selectively.
//
// Calls to ef are synchronized so that it is safe for ef to manipulate local
// data structures without additional locking. It is safe to call OnError while
// tasks are active in g.
func (g *Group) OnError(ef ErrorFunc) *Group {
func (g *Group) OnError(ef any) *Group {
var filter errorFunc
v := reflect.ValueOf(ef)
if !v.IsValid() {
// OK, ef == nil, nothing to do
} else if t := v.Type(); t.ConvertibleTo(triggerType) {
f := v.Convert(triggerType).Interface().(func())
filter = func(err error) error { f(); return err }
} else if t.ConvertibleTo(listenType) {
f := v.Convert(listenType).Interface().(func(error))
filter = func(err error) error { f(err); return err }
} else if t.ConvertibleTo(filterType) {
filter = errorFunc(v.Convert(filterType).Interface().(func(error) error))
} else {
panic(fmt.Sprintf("unsupported filter type %T", ef))
}
g.μ.Lock()
defer g.μ.Unlock()
g.onError = ef
g.onError = filter
return g
}

Expand Down Expand Up @@ -99,7 +144,7 @@ func (g *Group) handleError(err error) {
// Wait blocks until all the goroutines currently active in the group have
// returned, and all reported errors have been delivered to the callback. It
// returns the first non-nil error reported by any of the goroutines in the
// group and not filtered by an ErrorFunc.
// group and not filtered by an OnError callback.
//
// As with sync.WaitGroup, new tasks can be added to g during a call to Wait
// only if the group contains at least one active task when Wait is called and
Expand All @@ -119,25 +164,29 @@ func (g *Group) Wait() error {
return g.err
}

// An ErrorFunc is called by a group each time a task reports an error. Its
// return value replaces the reported error, so the ErrorFunc can filter or
// An errorFunc is called by a group each time a task reports an error. Its
// return value replaces the reported error, so the errorFunc can filter or
// suppress errors by modifying or discarding the input error.
type ErrorFunc func(error) error
type errorFunc func(error) error

func (ef ErrorFunc) filter(err error) error {
func (ef errorFunc) filter(err error) error {
if ef == nil {
return err
}
return ef(err)
}

// Trigger creates an ErrorFunc that calls f each time a task reports an error.
// The resulting ErrorFunc returns task errors unmodified.
func Trigger(f func()) ErrorFunc { return func(e error) error { f(); return e } }
// Trigger creates an OnError callback that calls f each time a task reports an
// error. The resulting callback returns task errors unmodified.
//
// Deprecated: Pass f directly to [New] or [Group.OnError].
func Trigger(f func()) any { return f }

// Listen creates an ErrorFunc that reports each non-nil task error to f. The
// resulting ErrorFunc returns task errors unmodified.
func Listen(f func(error)) ErrorFunc { return func(e error) error { f(e); return e } }
// Listen creates an OnError callback that reports each non-nil task error to
// f. The resulting callback returns task errors unmodified.
//
// Deprecated: Pass f directly to [New] or [Group.OnError].
func Listen(f func(error)) any { return f }

// NoError adapts f to a Task that executes f and reports a nil error.
func NoError(f func()) Task { return func() error { f(); return nil } }
Expand Down

0 comments on commit 1e124a5

Please sign in to comment.