Skip to content

Commit 3620bd0

Browse files
Merge pull request #173 from tylertreat/rb_poll
RM-19979 Add Poll method to RingBuffer
2 parents 01ff43b + 3c362a2 commit 3620bd0

File tree

2 files changed

+72
-3
lines changed

2 files changed

+72
-3
lines changed

queue/ring.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package queue
1919
import (
2020
"runtime"
2121
"sync/atomic"
22+
"time"
2223
)
2324

2425
// roundUp takes a uint64 greater than 0 and rounds it up to the next
@@ -128,9 +129,24 @@ L:
128129
// to the queue or Dispose is called on the queue. An error will be returned
129130
// if the queue is disposed.
130131
func (rb *RingBuffer) Get() (interface{}, error) {
131-
var n *node
132-
pos := atomic.LoadUint64(&rb.dequeue)
133-
i := 0
132+
return rb.Poll(0)
133+
}
134+
135+
// Poll will return the next item in the queue. This call will block
136+
// if the queue is empty. This call will unblock when an item is added
137+
// to the queue, Dispose is called on the queue, or the timeout is reached. An
138+
// error will be returned if the queue is disposed or a timeout occurs. A
139+
// non-positive timeout will block indefinitely.
140+
func (rb *RingBuffer) Poll(timeout time.Duration) (interface{}, error) {
141+
var (
142+
n *node
143+
pos = atomic.LoadUint64(&rb.dequeue)
144+
i = 0
145+
start time.Time
146+
)
147+
if timeout > 0 {
148+
start = time.Now()
149+
}
134150
L:
135151
for {
136152
if atomic.LoadUint64(&rb.disposed) == 1 {
@@ -150,6 +166,10 @@ L:
150166
pos = atomic.LoadUint64(&rb.dequeue)
151167
}
152168

169+
if timeout > 0 && time.Since(start) >= timeout {
170+
return nil, ErrTimeout
171+
}
172+
153173
if i == 10000 {
154174
runtime.Gosched() // free up the cpu before the next iteration
155175
i = 0

queue/ring_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"sync"
2121
"sync/atomic"
2222
"testing"
23+
"time"
2324

2425
"github.com/stretchr/testify/assert"
2526
)
@@ -176,6 +177,54 @@ func TestRingGetEmpty(t *testing.T) {
176177
wg.Wait()
177178
}
178179

180+
func TestRingPollEmpty(t *testing.T) {
181+
rb := NewRingBuffer(3)
182+
183+
_, err := rb.Poll(1)
184+
assert.Equal(t, ErrTimeout, err)
185+
}
186+
187+
func TestRingPoll(t *testing.T) {
188+
rb := NewRingBuffer(10)
189+
190+
// should be able to Poll() before anything is present, without breaking future Puts
191+
rb.Poll(time.Millisecond)
192+
193+
rb.Put(`test`)
194+
result, err := rb.Poll(0)
195+
if !assert.Nil(t, err) {
196+
return
197+
}
198+
199+
assert.Equal(t, `test`, result)
200+
assert.Equal(t, uint64(0), rb.Len())
201+
202+
rb.Put(`1`)
203+
rb.Put(`2`)
204+
205+
result, err = rb.Poll(time.Millisecond)
206+
if !assert.Nil(t, err) {
207+
return
208+
}
209+
210+
assert.Equal(t, `1`, result)
211+
assert.Equal(t, uint64(1), rb.Len())
212+
213+
result, err = rb.Poll(time.Millisecond)
214+
if !assert.Nil(t, err) {
215+
return
216+
}
217+
218+
assert.Equal(t, `2`, result)
219+
220+
before := time.Now()
221+
_, err = rb.Poll(5 * time.Millisecond)
222+
// This delta is normally 1-3 ms but running tests in CI with -race causes
223+
// this to run much slower. For now, just bump up the threshold.
224+
assert.InDelta(t, 5, time.Since(before).Seconds()*1000, 10)
225+
assert.Equal(t, ErrTimeout, err)
226+
}
227+
179228
func TestRingLen(t *testing.T) {
180229
rb := NewRingBuffer(4)
181230
assert.Equal(t, uint64(0), rb.Len())

0 commit comments

Comments
 (0)