-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexecutor_errgroup_test.go
156 lines (129 loc) · 3.01 KB
/
executor_errgroup_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package sync
import (
"context"
"sync"
"testing"
"github.com/stretchr/testify/require"
)
func Test_errGroupExecutorRepeated(t *testing.T) {
// iterating these tests many times tends to make problems apparent much more quickly,
// when they may succeed under certain conditions
for i := 0; i < 1000; i++ {
Test_errGroupExecutor(t)
}
}
func Test_errGroupExecutor(t *testing.T) {
// this test sets up specific wait groups to ensure that the maximum concurrency is honored
// by stepping through and holding specific locks while conditions are verified
e := errGroupExecutor{maxConcurrency: 2}
e.g.SetLimit(2)
wg1 := &sync.WaitGroup{}
wg1.Add(1)
wg2 := &sync.WaitGroup{}
wg2.Add(1)
wg3 := &sync.WaitGroup{}
wg3.Add(1)
order := List[string]{}
executed := ""
wgReady := &sync.WaitGroup{}
wgReady.Add(2)
e.Go(func() {
order.Append("pre wg1")
wgReady.Done()
wg1.Wait()
order.Append("post wg1")
executed += "1_"
wg3.Done()
})
e.Go(func() {
order.Append("pre wg2")
wgReady.Done()
wg2.Wait()
order.Append("post wg2")
executed += "2_"
})
wgReady.Wait()
// errgroup execution is blocking, so the next e.Go will block, so continue on the first before we deadlock
wg1.Done()
e.Go(func() {
order.Append("pre wg3")
wg3.Wait()
order.Append("post wg3")
executed += "3_"
wg2.Done()
})
e.Wait(context.Background())
require.Equal(t, "1_3_2_", executed)
require.True(t,
order.indexOf("post wg1") < order.indexOf("post wg3") &&
order.indexOf("post wg3") < order.indexOf("post wg2"),
)
}
func Test_errGroupExecutorCancelRepeat(t *testing.T) {
for i := 0; i < 100; i++ {
Test_errGroupExecutorCancel(t)
}
}
func Test_errGroupExecutorCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
e := &errGroupExecutor{}
e.g.SetLimit(2)
wgs := [3]sync.WaitGroup{}
wns := [3]sync.WaitGroup{}
for i := range wgs {
wns[i].Add(1)
wgs[i].Add(1)
}
executed := [3]bool{}
e.Go(func() {
t.Logf("waiting 0")
wns[0].Done()
wgs[0].Wait()
t.Logf("done 0")
executed[0] = true
})
e.Go(func() {
t.Logf("waiting 1")
wns[1].Done()
wgs[1].Wait()
t.Logf("done 1")
executed[1] = true
})
go func() {
wns[0].Wait()
wns[1].Wait()
// 0 and 1 are currently executing, waiting
cancel()
wns[2].Wait()
e.Go(func() {
t.Logf("waiting 2")
wgs[2].Wait()
t.Logf("done 2")
executed[2] = true
})
for i := range wgs {
wgs[i].Done()
}
}()
// should be waiting in 0, 1 not executed 2
e.Wait(ctx)
wns[2].Done()
// should not have executed 2
require.False(t, executed[2])
}
func Test_errGroupExecutorSubcontext(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
ctx := context.TODO()
ctx = SetContextExecutor(ctx, "", newErrGroupExecutor(1))
ContextExecutor(&ctx, "").Go(func() {
// context should be replaced with a secondary executor
ContextExecutor(&ctx, "").Go(func() {
// context should be replaced again with a tertiary executor
ContextExecutor(&ctx, "").Go(func() {
wg.Done()
})
})
})
wg.Wait() // only done by sub-executor
}