diff --git a/container/gqueue/gqueue.go b/container/gqueue/gqueue.go index 67f9593f87d..5923db1b43b 100644 --- a/container/gqueue/gqueue.go +++ b/container/gqueue/gqueue.go @@ -27,7 +27,6 @@ import ( // Queue is a concurrent-safe queue built on doubly linked list and channel. type Queue struct { limit int // Limit for queue size. - length *gtype.Int64 // Queue length. list *glist.List // Underlying list structure for data maintaining. closed *gtype.Bool // Whether queue is closed. events chan struct{} // Events for data writing. @@ -45,7 +44,6 @@ const ( func New(limit ...int) *Queue { q := &Queue{ closed: gtype.NewBool(), - length: gtype.NewInt64(), } if len(limit) > 0 && limit[0] > 0 { q.limit = limit[0] @@ -62,7 +60,6 @@ func New(limit ...int) *Queue { // Push pushes the data `v` into the queue. // Note that it would panic if Push is called after the queue is closed. func (q *Queue) Push(v interface{}) { - q.length.Add(1) if q.limit > 0 { q.C <- v } else { @@ -76,9 +73,7 @@ func (q *Queue) Push(v interface{}) { // Pop pops an item from the queue in FIFO way. // Note that it would return nil immediately if Pop is called after the queue is closed. func (q *Queue) Pop() interface{} { - item := <-q.C - q.length.Add(-1) - return item + return <-q.C } // Close closes the queue. @@ -101,13 +96,18 @@ func (q *Queue) Close() { } // Len returns the length of the queue. -// Note that the result might not be accurate as there's an +// Note that the result might not be accurate if using unlimited queue size as there's an // asynchronous channel reading the list constantly. func (q *Queue) Len() (length int64) { - return q.length.Val() + bufferedSize := int64(len(q.C)) + if q.limit > 0 { + return bufferedSize + } + return int64(q.list.Size()) + bufferedSize } // Size is alias of Len. +// Deprecated: use Len instead. func (q *Queue) Size() int64 { return q.Len() } @@ -123,14 +123,11 @@ func (q *Queue) asyncLoopFromListToChannel() { for !q.closed.Val() { <-q.events for !q.closed.Val() { - if length := q.list.Len(); length > 0 { - if length > defaultBatchSize { - length = defaultBatchSize - } - for _, v := range q.list.PopFronts(length) { - // When q.C is closed, it will panic here, especially q.C is being blocked for writing. - // If any error occurs here, it will be caught by recover and be ignored. - q.C <- v + if bufferLength := q.list.Len(); bufferLength > 0 { + // When q.C is closed, it will panic here, especially q.C is being blocked for writing. + // If any error occurs here, it will be caught by recover and be ignored. + for i := 0; i < bufferLength; i++ { + q.C <- q.list.PopFront() } } else { break diff --git a/container/gqueue/gqueue_z_unit_test.go b/container/gqueue/gqueue_z_unit_test.go index fb171d4cc7e..9fee57afc36 100644 --- a/container/gqueue/gqueue_z_unit_test.go +++ b/container/gqueue/gqueue_z_unit_test.go @@ -18,14 +18,31 @@ import ( func TestQueue_Len(t *testing.T) { gtest.C(t, func(t *gtest.T) { - max := 100 - for n := 10; n < max; n++ { - q1 := gqueue.New(max) - for i := 0; i < max; i++ { + var ( + maxNum = 100 + maxTries = 100 + ) + for n := 10; n < maxTries; n++ { + q1 := gqueue.New(maxNum) + for i := 0; i < maxNum; i++ { q1.Push(i) } - t.Assert(q1.Len(), max) - t.Assert(q1.Size(), max) + t.Assert(q1.Len(), maxNum) + t.Assert(q1.Size(), maxNum) + } + }) + gtest.C(t, func(t *gtest.T) { + var ( + maxNum = 100 + maxTries = 100 + ) + for n := 10; n < maxTries; n++ { + q1 := gqueue.New() + for i := 0; i < maxNum; i++ { + q1.Push(i) + } + t.AssertLE(q1.Len(), maxNum) + t.AssertLE(q1.Size(), maxNum) } }) } @@ -71,3 +88,19 @@ func TestQueue_Close(t *testing.T) { q1.Close() }) } + +func Test_Issue2509(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.New() + q.Push(1) + q.Push(2) + q.Push(3) + t.Assert(q.Len(), 3) + t.Assert(<-q.C, 1) + t.Assert(q.Len(), 2) + t.Assert(<-q.C, 2) + t.Assert(q.Len(), 1) + t.Assert(<-q.C, 3) + t.Assert(q.Len(), 0) + }) +}