Skip to content

taskgroup: deprecate Trigger and Listen #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func ExampleTrigger() {
const badTask = 5

// Construct a group in which any task error cancels the context.
g := taskgroup.New(taskgroup.Trigger(cancel))
g := taskgroup.New(cancel)

for i := range 10 {
g.Go(func() error {
Expand All @@ -67,10 +67,11 @@ func ExampleTrigger() {
func ExampleListen() {
// The taskgroup itself will only report the first non-nil task error, but
// you can use an error listener used to accumulate all of them.
// Calls to the listener are synchronized, so we don't need a lock.
var all []error
g := taskgroup.New(taskgroup.Listen(func(e error) {
g := taskgroup.New(func(e error) {
all = append(all, e)
}))
})
g.Go(func() error { return errors.New("badness 1") })
g.Go(func() error { return errors.New("badness 2") })
g.Go(func() error { return errors.New("badness 3") })
Expand Down
2 changes: 1 addition & 1 deletion examples/copytree/copytree.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {
}

ctx, cancel := context.WithCancel(context.Background())
g, start := taskgroup.New(taskgroup.Trigger(cancel)).Limit(*maxWorkers)
g, start := taskgroup.New(cancel).Limit(*maxWorkers)

err := filepath.Walk(*srcPath, func(path string, fi os.FileInfo, err error) error {
if err != nil {
Expand Down
85 changes: 67 additions & 18 deletions taskgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package taskgroup

import (
"fmt"
"reflect"
"sync"
"sync/atomic"
)
Expand All @@ -31,9 +33,9 @@ type Group struct {
// path reads active and only acquires μ if it discovers setup is needed.
active atomic.Uint32

μ sync.Mutex // guards err
μ sync.Mutex // guards the fields below
err error // error returned from Wait
onError ErrorFunc // called each time a task returns non-nil
onError errorFunc // called each time a task returns non-nil
}

// activate resets the state of the group and marks it as active. This is
Expand All @@ -50,20 +52,63 @@ func (g *Group) activate() {
// New constructs a new empty group with the specified error filter.
// See [Group.OnError] for a description of how errors are filtered.
// If ef == nil, no filtering is performed.
func New(ef ErrorFunc) *Group { return new(Group).OnError(ef) }
func New(ef any) *Group { return new(Group).OnError(ef) }

var (
triggerType = reflect.TypeOf(func() {})
listenType = reflect.TypeOf(func(error) {})
filterType = reflect.TypeOf(func(error) error { return nil })
)

// OnError sets the error filter for g. If ef == nil, the error filter is
// removed and errors are no longer filtered. Otherwise, each non-nil error
// reported by a task running in g is passed to ef, and the value it returns
// replaces the task's error.
// reported by a task running in g is passed to ef.
//
// The concrete type of ef must be a function with one of the following
// signature schemes, or OnError will panic.
//
// If ef is:
//
// func()
//
// then ef is called once per reported error, and the error is not modified.
//
// If ef is:
//
// func(error)
//
// then ef is called with each reported error, and the error is not modified.
//
// If ef is:
//
// func(error) error
//
// then ef is called with each reported error, and its result replaces the
// reported value. This permits ef to suppress or replace the error value
// selectively.
//
// Calls to ef are synchronized so that it is safe for ef to manipulate local
// data structures without additional locking. It is safe to call OnError while
// tasks are active in g.
func (g *Group) OnError(ef ErrorFunc) *Group {
func (g *Group) OnError(ef any) *Group {
var filter errorFunc
v := reflect.ValueOf(ef)
if !v.IsValid() {
// OK, ef == nil, nothing to do
} else if t := v.Type(); t.ConvertibleTo(triggerType) {
f := v.Convert(triggerType).Interface().(func())
filter = func(err error) error { f(); return err }
} else if t.ConvertibleTo(listenType) {
f := v.Convert(listenType).Interface().(func(error))
filter = func(err error) error { f(err); return err }
} else if t.ConvertibleTo(filterType) {
filter = errorFunc(v.Convert(filterType).Interface().(func(error) error))
} else {
panic(fmt.Sprintf("unsupported filter type %T", ef))
}
g.μ.Lock()
defer g.μ.Unlock()
g.onError = ef
g.onError = filter
return g
}

Expand Down Expand Up @@ -99,7 +144,7 @@ func (g *Group) handleError(err error) {
// Wait blocks until all the goroutines currently active in the group have
// returned, and all reported errors have been delivered to the callback. It
// returns the first non-nil error reported by any of the goroutines in the
// group and not filtered by an ErrorFunc.
// group and not filtered by an OnError callback.
//
// As with sync.WaitGroup, new tasks can be added to g during a call to Wait
// only if the group contains at least one active task when Wait is called and
Expand All @@ -119,25 +164,29 @@ func (g *Group) Wait() error {
return g.err
}

// An ErrorFunc is called by a group each time a task reports an error. Its
// return value replaces the reported error, so the ErrorFunc can filter or
// An errorFunc is called by a group each time a task reports an error. Its
// return value replaces the reported error, so the errorFunc can filter or
// suppress errors by modifying or discarding the input error.
type ErrorFunc func(error) error
type errorFunc func(error) error

func (ef ErrorFunc) filter(err error) error {
func (ef errorFunc) filter(err error) error {
if ef == nil {
return err
}
return ef(err)
}

// Trigger creates an ErrorFunc that calls f each time a task reports an error.
// The resulting ErrorFunc returns task errors unmodified.
func Trigger(f func()) ErrorFunc { return func(e error) error { f(); return e } }
// Trigger creates an OnError callback that calls f each time a task reports an
// error. The resulting callback returns task errors unmodified.
//
// Deprecated: Pass f directly to [New] or [Group.OnError].
func Trigger(f func()) any { return f }

// Listen creates an ErrorFunc that reports each non-nil task error to f. The
// resulting ErrorFunc returns task errors unmodified.
func Listen(f func(error)) ErrorFunc { return func(e error) error { f(e); return e } }
// Listen creates an OnError callback that reports each non-nil task error to
// f. The resulting callback returns task errors unmodified.
//
// Deprecated: Pass f directly to [New] or [Group.OnError].
func Listen(f func(error)) any { return f }

// NoError adapts f to a Task that executes f and reports a nil error.
func NoError(f func()) Task { return func() error { f(); return nil } }
Expand Down