Skip to content

Commit f456f73

Browse files
Merge pull request #174 from tylertreat/gosched_changes
RM-21797 Remove Gosched call limiting
2 parents 3620bd0 + 6c7399c commit f456f73

File tree

2 files changed

+40
-14
lines changed

2 files changed

+40
-14
lines changed

queue/ring.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ func (rb *RingBuffer) Offer(item interface{}) (bool, error) {
8787
func (rb *RingBuffer) put(item interface{}, offer bool) (bool, error) {
8888
var n *node
8989
pos := atomic.LoadUint64(&rb.queue)
90-
i := 0
9190
L:
9291
for {
9392
if atomic.LoadUint64(&rb.disposed) == 1 {
@@ -111,12 +110,7 @@ L:
111110
return false, nil
112111
}
113112

114-
if i == 10000 {
115-
runtime.Gosched() // free up the cpu before the next iteration
116-
i = 0
117-
} else {
118-
i++
119-
}
113+
runtime.Gosched() // free up the cpu before the next iteration
120114
}
121115

122116
n.data = item
@@ -141,7 +135,6 @@ func (rb *RingBuffer) Poll(timeout time.Duration) (interface{}, error) {
141135
var (
142136
n *node
143137
pos = atomic.LoadUint64(&rb.dequeue)
144-
i = 0
145138
start time.Time
146139
)
147140
if timeout > 0 {
@@ -170,12 +163,7 @@ L:
170163
return nil, ErrTimeout
171164
}
172165

173-
if i == 10000 {
174-
runtime.Gosched() // free up the cpu before the next iteration
175-
i = 0
176-
} else {
177-
i++
178-
}
166+
runtime.Gosched() // free up the cpu before the next iteration
179167
}
180168
data := n.data
181169
n.data = nil

queue/ring_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,44 @@ func BenchmarkRBLifeCycle(b *testing.B) {
328328
wg.Wait()
329329
}
330330

331+
func BenchmarkRBLifeCycleContention(b *testing.B) {
332+
rb := NewRingBuffer(64)
333+
334+
var wwg sync.WaitGroup
335+
var rwg sync.WaitGroup
336+
wwg.Add(10)
337+
rwg.Add(10)
338+
339+
for i := 0; i < 10; i++ {
340+
go func() {
341+
for {
342+
_, err := rb.Get()
343+
if err == ErrDisposed {
344+
rwg.Done()
345+
return
346+
} else {
347+
assert.Nil(b, err)
348+
}
349+
}
350+
}()
351+
}
352+
353+
b.ResetTimer()
354+
355+
for i := 0; i < 10; i++ {
356+
go func() {
357+
for j := 0; j < b.N; j++ {
358+
rb.Put(j)
359+
}
360+
wwg.Done()
361+
}()
362+
}
363+
364+
wwg.Wait()
365+
rb.Dispose()
366+
rwg.Wait()
367+
}
368+
331369
func BenchmarkRBPut(b *testing.B) {
332370
rb := NewRingBuffer(uint64(b.N))
333371

0 commit comments

Comments
 (0)