Skip to content

Commit 5cecad0

Browse files
authored
Merge pull request #307 from panjf2000/dev
ver: release 2.9.0
2 parents bd6ee4b + fb82167 commit 5cecad0

10 files changed

+587
-39
lines changed

.github/pull_request_template.md

+1-9
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,5 @@
1-
---
2-
name: Pull request
3-
about: Propose changes to the code
4-
title: ''
5-
labels: ''
6-
assignees: ''
7-
---
8-
91
<!--
10-
Thank you for contributing to `ants`! Please fill this out to help us make the most of your pull request.
2+
Thank you for contributing to `ants`! Please fill this out to help us review your pull request more efficiently.
113
124
Was this change discussed in an issue first? That can help save time in case the change is not a good fit for the project. Not all pull requests get merged.
135

ants.go

+6
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ var (
6666
// ErrTimeout will be returned after the operations timed out.
6767
ErrTimeout = errors.New("operation timed out")
6868

69+
// ErrInvalidPoolIndex will be returned when trying to retrieve a pool with an invalid index.
70+
ErrInvalidPoolIndex = errors.New("invalid pool index")
71+
72+
// ErrInvalidLoadBalancingStrategy will be returned when trying to create a MultiPool with an invalid load-balancing strategy.
73+
ErrInvalidLoadBalancingStrategy = errors.New("invalid load-balancing strategy")
74+
6975
// workerChanCap determines whether the channel of a worker should be a buffered channel
7076
// to get the best performance. Inspired by fasthttp at
7177
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139

ants_benchmark_test.go

+37-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ package ants
2525
import (
2626
"runtime"
2727
"sync"
28+
"sync/atomic"
2829
"testing"
2930
"time"
3031

@@ -47,18 +48,22 @@ func demoPoolFunc(args interface{}) {
4748
time.Sleep(time.Duration(n) * time.Millisecond)
4849
}
4950

51+
var stopLongRunningFunc int32
52+
5053
func longRunningFunc() {
51-
for {
54+
for atomic.LoadInt32(&stopLongRunningFunc) == 0 {
5255
runtime.Gosched()
5356
}
5457
}
5558

59+
var stopLongRunningPoolFunc int32
60+
5661
func longRunningPoolFunc(arg interface{}) {
5762
if ch, ok := arg.(chan struct{}); ok {
5863
<-ch
5964
return
6065
}
61-
for {
66+
for atomic.LoadInt32(&stopLongRunningPoolFunc) == 0 {
6267
runtime.Gosched()
6368
}
6469
}
@@ -133,6 +138,24 @@ func BenchmarkAntsPool(b *testing.B) {
133138
}
134139
}
135140

141+
func BenchmarkAntsMultiPool(b *testing.B) {
142+
var wg sync.WaitGroup
143+
p, _ := NewMultiPool(10, PoolCap/10, RoundRobin, WithExpiryDuration(DefaultExpiredTime))
144+
defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck
145+
146+
b.ResetTimer()
147+
for i := 0; i < b.N; i++ {
148+
wg.Add(RunTimes)
149+
for j := 0; j < RunTimes; j++ {
150+
_ = p.Submit(func() {
151+
demoFunc()
152+
wg.Done()
153+
})
154+
}
155+
wg.Wait()
156+
}
157+
}
158+
136159
func BenchmarkGoroutinesThroughput(b *testing.B) {
137160
for i := 0; i < b.N; i++ {
138161
for j := 0; j < RunTimes; j++ {
@@ -165,3 +188,15 @@ func BenchmarkAntsPoolThroughput(b *testing.B) {
165188
}
166189
}
167190
}
191+
192+
func BenchmarkAntsMultiPoolThroughput(b *testing.B) {
193+
p, _ := NewMultiPool(10, PoolCap/10, RoundRobin, WithExpiryDuration(DefaultExpiredTime))
194+
defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck
195+
196+
b.ResetTimer()
197+
for i := 0; i < b.N; i++ {
198+
for j := 0; j < RunTimes; j++ {
199+
_ = p.Submit(demoFunc)
200+
}
201+
}
202+
}

ants_test.go

+110
Original file line numberDiff line numberDiff line change
@@ -985,3 +985,113 @@ func TestDefaultPoolReleaseTimeout(t *testing.T) {
985985
err := ReleaseTimeout(2 * time.Second)
986986
assert.NoError(t, err)
987987
}
988+
989+
func TestMultiPool(t *testing.T) {
990+
_, err := NewMultiPool(10, -1, 8)
991+
assert.ErrorIs(t, err, ErrInvalidLoadBalancingStrategy)
992+
993+
mp, err := NewMultiPool(10, 5, RoundRobin)
994+
testFn := func() {
995+
for i := 0; i < 50; i++ {
996+
err = mp.Submit(longRunningFunc)
997+
assert.NoError(t, err)
998+
}
999+
assert.EqualValues(t, mp.Waiting(), 0)
1000+
_, err = mp.WaitingByIndex(-1)
1001+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1002+
_, err = mp.WaitingByIndex(11)
1003+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1004+
assert.EqualValues(t, 50, mp.Running())
1005+
_, err = mp.RunningByIndex(-1)
1006+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1007+
_, err = mp.RunningByIndex(11)
1008+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1009+
assert.EqualValues(t, 0, mp.Free())
1010+
_, err = mp.FreeByIndex(-1)
1011+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1012+
_, err = mp.FreeByIndex(11)
1013+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1014+
assert.EqualValues(t, 50, mp.Cap())
1015+
assert.False(t, mp.IsClosed())
1016+
for i := 0; i < 10; i++ {
1017+
n, _ := mp.WaitingByIndex(i)
1018+
assert.EqualValues(t, 0, n)
1019+
n, _ = mp.RunningByIndex(i)
1020+
assert.EqualValues(t, 5, n)
1021+
n, _ = mp.FreeByIndex(i)
1022+
assert.EqualValues(t, 0, n)
1023+
}
1024+
atomic.StoreInt32(&stopLongRunningFunc, 1)
1025+
assert.NoError(t, mp.ReleaseTimeout(3*time.Second))
1026+
assert.Zero(t, mp.Running())
1027+
assert.True(t, mp.IsClosed())
1028+
atomic.StoreInt32(&stopLongRunningFunc, 0)
1029+
}
1030+
testFn()
1031+
1032+
mp.Reboot()
1033+
testFn()
1034+
1035+
mp, err = NewMultiPool(10, 5, LeastTasks)
1036+
testFn()
1037+
1038+
mp.Reboot()
1039+
testFn()
1040+
1041+
mp.Tune(10)
1042+
}
1043+
1044+
func TestMultiPoolWithFunc(t *testing.T) {
1045+
_, err := NewMultiPoolWithFunc(10, -1, longRunningPoolFunc, 8)
1046+
assert.ErrorIs(t, err, ErrInvalidLoadBalancingStrategy)
1047+
1048+
mp, err := NewMultiPoolWithFunc(10, 5, longRunningPoolFunc, RoundRobin)
1049+
testFn := func() {
1050+
for i := 0; i < 50; i++ {
1051+
err = mp.Invoke(i)
1052+
assert.NoError(t, err)
1053+
}
1054+
assert.EqualValues(t, mp.Waiting(), 0)
1055+
_, err = mp.WaitingByIndex(-1)
1056+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1057+
_, err = mp.WaitingByIndex(11)
1058+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1059+
assert.EqualValues(t, 50, mp.Running())
1060+
_, err = mp.RunningByIndex(-1)
1061+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1062+
_, err = mp.RunningByIndex(11)
1063+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1064+
assert.EqualValues(t, 0, mp.Free())
1065+
_, err = mp.FreeByIndex(-1)
1066+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1067+
_, err = mp.FreeByIndex(11)
1068+
assert.ErrorIs(t, err, ErrInvalidPoolIndex)
1069+
assert.EqualValues(t, 50, mp.Cap())
1070+
assert.False(t, mp.IsClosed())
1071+
for i := 0; i < 10; i++ {
1072+
n, _ := mp.WaitingByIndex(i)
1073+
assert.EqualValues(t, 0, n)
1074+
n, _ = mp.RunningByIndex(i)
1075+
assert.EqualValues(t, 5, n)
1076+
n, _ = mp.FreeByIndex(i)
1077+
assert.EqualValues(t, 0, n)
1078+
}
1079+
atomic.StoreInt32(&stopLongRunningPoolFunc, 1)
1080+
assert.NoError(t, mp.ReleaseTimeout(3*time.Second))
1081+
assert.Zero(t, mp.Running())
1082+
assert.True(t, mp.IsClosed())
1083+
atomic.StoreInt32(&stopLongRunningPoolFunc, 0)
1084+
}
1085+
testFn()
1086+
1087+
mp.Reboot()
1088+
testFn()
1089+
1090+
mp, err = NewMultiPoolWithFunc(10, 5, longRunningPoolFunc, LeastTasks)
1091+
testFn()
1092+
1093+
mp.Reboot()
1094+
testFn()
1095+
1096+
mp.Tune(10)
1097+
}

0 commit comments

Comments
 (0)