Skip to content

Commit

Permalink
fix issue gogf#2509 (gogf#2530)
Browse files Browse the repository at this point in the history
  • Loading branch information
gqcn authored Mar 21, 2023
1 parent 3a8fc1e commit 676022e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 22 deletions.
29 changes: 13 additions & 16 deletions container/gqueue/gqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
// Queue is a concurrent-safe queue built on doubly linked list and channel.
type Queue struct {
limit int // Limit for queue size.
length *gtype.Int64 // Queue length.
list *glist.List // Underlying list structure for data maintaining.
closed *gtype.Bool // Whether queue is closed.
events chan struct{} // Events for data writing.
Expand All @@ -45,7 +44,6 @@ const (
func New(limit ...int) *Queue {
q := &Queue{
closed: gtype.NewBool(),
length: gtype.NewInt64(),
}
if len(limit) > 0 && limit[0] > 0 {
q.limit = limit[0]
Expand All @@ -62,7 +60,6 @@ func New(limit ...int) *Queue {
// Push pushes the data `v` into the queue.
// Note that it would panic if Push is called after the queue is closed.
func (q *Queue) Push(v interface{}) {
q.length.Add(1)
if q.limit > 0 {
q.C <- v
} else {
Expand All @@ -76,9 +73,7 @@ func (q *Queue) Push(v interface{}) {
// Pop pops an item from the queue in FIFO way.
// Note that it would return nil immediately if Pop is called after the queue is closed.
func (q *Queue) Pop() interface{} {
item := <-q.C
q.length.Add(-1)
return item
return <-q.C
}

// Close closes the queue.
Expand All @@ -101,13 +96,18 @@ func (q *Queue) Close() {
}

// Len returns the length of the queue.
// Note that the result might not be accurate as there's an
// Note that the result might not be accurate if using unlimited queue size as there's an
// asynchronous channel reading the list constantly.
func (q *Queue) Len() (length int64) {
return q.length.Val()
bufferedSize := int64(len(q.C))
if q.limit > 0 {
return bufferedSize
}
return int64(q.list.Size()) + bufferedSize
}

// Size is alias of Len.
// Deprecated: use Len instead.
func (q *Queue) Size() int64 {
return q.Len()
}
Expand All @@ -123,14 +123,11 @@ func (q *Queue) asyncLoopFromListToChannel() {
for !q.closed.Val() {
<-q.events
for !q.closed.Val() {
if length := q.list.Len(); length > 0 {
if length > defaultBatchSize {
length = defaultBatchSize
}
for _, v := range q.list.PopFronts(length) {
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
// If any error occurs here, it will be caught by recover and be ignored.
q.C <- v
if bufferLength := q.list.Len(); bufferLength > 0 {
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
// If any error occurs here, it will be caught by recover and be ignored.
for i := 0; i < bufferLength; i++ {
q.C <- q.list.PopFront()
}
} else {
break
Expand Down
45 changes: 39 additions & 6 deletions container/gqueue/gqueue_z_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,31 @@ import (

func TestQueue_Len(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
max := 100
for n := 10; n < max; n++ {
q1 := gqueue.New(max)
for i := 0; i < max; i++ {
var (
maxNum = 100
maxTries = 100
)
for n := 10; n < maxTries; n++ {
q1 := gqueue.New(maxNum)
for i := 0; i < maxNum; i++ {
q1.Push(i)
}
t.Assert(q1.Len(), max)
t.Assert(q1.Size(), max)
t.Assert(q1.Len(), maxNum)
t.Assert(q1.Size(), maxNum)
}
})
gtest.C(t, func(t *gtest.T) {
var (
maxNum = 100
maxTries = 100
)
for n := 10; n < maxTries; n++ {
q1 := gqueue.New()
for i := 0; i < maxNum; i++ {
q1.Push(i)
}
t.AssertLE(q1.Len(), maxNum)
t.AssertLE(q1.Size(), maxNum)
}
})
}
Expand Down Expand Up @@ -71,3 +88,19 @@ func TestQueue_Close(t *testing.T) {
q1.Close()
})
}

func Test_Issue2509(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
q := gqueue.New()
q.Push(1)
q.Push(2)
q.Push(3)
t.Assert(q.Len(), 3)
t.Assert(<-q.C, 1)
t.Assert(q.Len(), 2)
t.Assert(<-q.C, 2)
t.Assert(q.Len(), 1)
t.Assert(<-q.C, 3)
t.Assert(q.Len(), 0)
})
}

0 comments on commit 676022e

Please sign in to comment.