-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcollector_test.go
140 lines (110 loc) · 3.09 KB
/
collector_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package sync
import (
"context"
"iter"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/anchore/go-sync/internal/stats"
)
func Test_CollectCancelRepeat(t *testing.T) {
// iterating these tests many times tends to make problems apparent much more quickly,
// when they may succeed under certain conditions
for i := 0; i < 1000; i++ {
Test_CollectCancel(t)
}
}
func Test_CollectCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
e := &errGroupExecutor{} // use errgroup executor as it will block before executing 3
e.g.SetLimit(2)
ctx = SetContextExecutor(ctx, "", e)
executed3 := false
wg := sync.WaitGroup{}
wg.Add(1)
err := Collect(&ctx, "", ToSeq([]int{1, 2, 3}), func(i int) (string, error) {
switch i {
case 1:
// cancel
cancel()
// ensure 2 doesn't block
wg.Done()
case 2:
// ensure only 1 and 2 execute by waiting here
wg.Wait()
case 3:
executed3 = true
}
return "", nil
}, func(i int, s string) {})
// should not have an error, even though context was canceled
require.NoError(t, err)
// should not have executed 3
require.False(t, executed3)
}
func Test_CollectSlice(t *testing.T) {
const count = 1000
const maxConcurrency = 5
concurrency := stats.Tracked[int]{}
var values []int
ctx := SetContextExecutor(context.Background(), "", NewExecutor(maxConcurrency))
err := CollectSlice(&ctx, "", countIter(count), func(i int) (int, error) {
defer concurrency.Incr()()
time.Sleep(1 * time.Millisecond)
return i * 10, nil
}, &values)
require.NoError(t, err)
require.Len(t, values, count)
for i := 0; i < count; i++ {
require.Contains(t, values, i*10)
}
require.LessOrEqual(t, concurrency.Max(), maxConcurrency)
}
func Test_CollectMap(t *testing.T) {
const count = 1000
const maxConcurrency = 5
concurrency := stats.Tracked[int]{}
values := map[int]int{}
ctx := SetContextExecutor(context.Background(), "", NewExecutor(maxConcurrency))
err := CollectMap(&ctx, "", countIter(count), func(i int) (int, error) {
defer concurrency.Incr()()
time.Sleep(1 * time.Millisecond)
return i * 10, nil
}, values)
require.NoError(t, err)
require.Len(t, values, count)
for i := 0; i < count; i++ {
require.Equal(t, values[i], i*10)
}
require.LessOrEqual(t, concurrency.Max(), maxConcurrency)
}
func Test_Collect2(t *testing.T) {
const count = 1000
const maxConcurrency = 5
concurrency := stats.Tracked[int]{}
values := map[int]int{}
ctx := SetContextExecutor(context.Background(), "", NewExecutor(maxConcurrency))
err := Collect2(&ctx, "", ToIndexSeq(ToSlice(countIter(count))), func(idx, i int) (int, error) {
defer concurrency.Incr()()
time.Sleep(1 * time.Millisecond)
return i * 10, nil
}, func(idx int, i int, out int) {
values[i] = out + idx
})
require.NoError(t, err)
require.Len(t, values, count)
for i := 0; i < count; i++ {
require.Equal(t, values[i], (i*10)+i)
}
require.LessOrEqual(t, concurrency.Max(), maxConcurrency)
}
func countIter(count int) iter.Seq[int] {
return func(yield func(int) bool) {
for i := 0; i < count; i++ {
if !yield(i) {
return
}
}
}
}