-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrungroup.go
138 lines (117 loc) · 3.45 KB
/
rungroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package rungroup
import (
"context"
"errors"
"fmt"
"sync"
"github.com/bharat-rajani/rungroup/pkg/concurrent"
)
// Group coordinates a set of goroutines that cooperate on one overall task.
//
// The zero value is ready to use: it has no cancel function, so an error in
// one of its goroutines does not cancel anything, and it has no error map, so
// per-goroutine errors are not tracked.
type Group struct {
	// cancel cancels the Context handed out by WithContext or
	// WithContextErrorMap. It is nil for a zero Group.
	cancel func()

	// wg counts the goroutines started through Go so that Wait can block
	// until all of them have returned.
	wg sync.WaitGroup

	// errOnce makes sure err is assigned (and cancel is triggered) only for
	// the first failing interrupter goroutine.
	errOnce sync.Once
	// err is the first non-nil error returned by an interrupter goroutine;
	// Wait returns it.
	err error

	// errMap, when non-nil, records the error of every failing goroutine,
	// keyed by the id that was passed to Go.
	errMap *concurrent.ConcurrentMap
}
// WithContext creates a Group together with a cancellable Context derived
// from ctx.
//
// The returned Context is cancelled as soon as either of the following
// happens:
//   - a function started via Go or GoWithFunc as an interrupter returns a
//     non-nil error, or
//   - Wait returns.
//
// The Group created here does not track per-goroutine errors.
func WithContext(ctx context.Context) (*Group, context.Context) {
	derived, stop := context.WithCancel(ctx)
	g := &Group{cancel: stop}
	return g, derived
}
// WithContextErrorMap behaves like WithContext but additionally enables error
// tracking for the goroutines of the Group.
//
// It derives a cancellable Context from ctx and returns it along with a new
// Group that keeps a pointer to errMap. Every non-nil error returned by a
// goroutine of the Group is stored in that map, with the routine id as key
// and the error as value.
func WithContextErrorMap(ctx context.Context, errMap concurrent.ConcurrentMap) (*Group, context.Context) {
	derived, stop := context.WithCancel(ctx)
	g := &Group{
		cancel: stop,
		errMap: &errMap,
	}
	return g, derived
}
// Wait blocks until every function started through Go has returned. It then
// cancels the Group's Context (when the Group has one) and returns the first
// non-nil error reported by an interrupter routine, or nil if there was none.
func (g *Group) Wait() error {
	g.wg.Wait()

	// A zero Group has no cancel function, so there is nothing to release.
	if stop := g.cancel; stop != nil {
		stop()
	}

	return g.err
}
// Go runs f in a new goroutine that is tracked by the Group.
//
// When f returns a non-nil error:
//   - the error is stored in the Group's error map under id, provided the
//     Group has an error map;
//   - if interrupter is true and no interrupter has failed before, the error
//     becomes the one returned by Wait and the Group's Context (if any) is
//     cancelled.
//
// interrupter tells whether a failure of this goroutine may interrupt the
// other goroutines of the Group. id identifies the goroutine in the error map.
func (g *Group) Go(f func() error, interrupter bool, id string) {
	g.wg.Add(1)

	go func() {
		defer g.wg.Done()

		err := f()
		if err == nil {
			return
		}

		if g.errMap != nil {
			(*g.errMap).Store(id, err)
		}

		// Only interrupter routines may set the group error and cancel.
		if !interrupter {
			return
		}
		g.errOnce.Do(func() {
			g.err = err
			if g.cancel != nil {
				g.cancel()
			}
		})
	}()
}
// GetErrByID looks up the error recorded for the goroutine identified by id.
//
// The results are:
//   - (ErrGroupNilMap, false) when the Group has no error map;
//   - (nil, false) when nothing is stored under id;
//   - (err, true) when the stored value is an error;
//   - (a descriptive error, false) when the stored value is not an error.
func (g *Group) GetErrByID(id string) (error, bool) {
	if g.errMap == nil {
		return ErrGroupNilMap, false
	}

	stored, found := (*g.errMap).Load(id)
	if !found {
		return nil, false
	}

	switch v := stored.(type) {
	case error:
		return v, true
	default:
		return fmt.Errorf("cannot cast %v into error", stored), false
	}
}
// GoWithFunc starts f as a goroutine of the Group, passing it ctx so that f
// can observe cancellation.
//
// f is called synchronously inside that goroutine, so the goroutine (and
// therefore Wait) does not finish before f has returned. The error reported
// for the routine is chosen with a fixed priority:
//  1. f's own error, when it is non-nil;
//  2. otherwise ctx.Err(), which is nil unless ctx is already cancelled by
//     the time f returns.
//
// Earlier versions selected between ctx.Done() and a channel that already
// held f's result. With a cancelled ctx both cases were ready and one was
// picked at random, so the reported error was nondeterministic. The explicit
// priority keeps every outcome the old code could produce for a live ctx and
// makes the cancelled case predictable.
//
// interrupter states whether a failure of this routine may cancel the Group;
// id is the key under which the routine's error is tracked.
func (g *Group) GoWithFunc(f func(ctx context.Context) error,
	ctx context.Context,
	interrupter bool, id string) {
	run := func() error {
		if err := f(ctx); err != nil {
			// A concrete failure is more informative than a bare
			// cancellation, so it always wins.
			return err
		}
		// f succeeded; still surface cancellation of ctx, if any.
		return ctx.Err()
	}
	g.Go(run, interrupter, id)
}
// ErrGroupNilMap is returned by Group.GetErrByID when the Group has no error
// map (Group.errMap is nil). Create the Group with WithContextErrorMap to
// enable per-goroutine error tracking.
var ErrGroupNilMap = errors.New("uninitialized error map in rungroup, use: WithContextErrorMap")