Skip to content

Commit 8fecd16

Browse files
neal-zhuzhumaohua
and
zhumaohua
authored
Optimize bytes_queue (#207)
Co-authored-by: zhumaohua <[email protected]>
1 parent 9e4bc32 commit 8fecd16

File tree

3 files changed

+76
-43
lines changed

3 files changed

+76
-43
lines changed

bigcache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ func TestCacheCapacity(t *testing.T) {
426426

427427
// then
428428
assertEqual(t, keys, cache.Len())
429-
assertEqual(t, 81920, cache.Capacity())
429+
assertEqual(t, 40960, cache.Capacity())
430430
}
431431

432432
func TestCacheStats(t *testing.T) {

queue/bytes_queue.go

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,10 @@ import (
77
)
88

99
const (
10-
// Number of bytes used to keep information about entry size
11-
headerEntrySize = 4
10+
// Number of bytes to encode 0 in uvarint format
11+
minimumHeaderSize = 1
1212
// Bytes before left margin are not used. Zero index means element does not exist in queue, useful while reading slice from index
1313
leftMarginIndex = 1
14-
// Minimum empty blob size in bytes. Empty blob fills space between tail and head in additional memory allocation.
15-
// It keeps entries indexes unchanged
16-
minimumEmptyBlobSize = 32 + headerEntrySize
1714
)
1815

1916
var (
@@ -25,6 +22,7 @@ var (
2522
// BytesQueue is a non-thread safe queue type of fifo based on bytes array.
2623
// For every push operation index of entry is returned. It can be used to read the entry later
2724
type BytesQueue struct {
25+
full bool
2826
array []byte
2927
capacity int
3028
maxCapacity int
@@ -41,6 +39,21 @@ type queueError struct {
4139
message string
4240
}
4341

42+
// getUvarintSize returns the number of bytes to encode x in uvarint format
43+
func getUvarintSize(x uint32) int {
44+
if x < 128 {
45+
return 1
46+
} else if x < 16384 {
47+
return 2
48+
} else if x < 2097152 {
49+
return 3
50+
} else if x < 268435456 {
51+
return 4
52+
} else {
53+
return 5
54+
}
55+
}
56+
4457
// NewBytesQueue initialize new bytes queue.
4558
// Initial capacity is used in bytes array allocation
4659
// When verbose flag is set then information about memory allocation are printed
@@ -49,7 +62,7 @@ func NewBytesQueue(initialCapacity int, maxCapacity int, verbose bool) *BytesQue
4962
array: make([]byte, initialCapacity),
5063
capacity: initialCapacity,
5164
maxCapacity: maxCapacity,
52-
headerBuffer: make([]byte, headerEntrySize),
65+
headerBuffer: make([]byte, binary.MaxVarintLen32),
5366
tail: leftMarginIndex,
5467
head: leftMarginIndex,
5568
rightMargin: leftMarginIndex,
@@ -71,9 +84,10 @@ func (q *BytesQueue) Reset() {
7184
// Returns index for pushed data or error if maximum size queue limit is reached.
7285
func (q *BytesQueue) Push(data []byte) (int, error) {
7386
dataLen := len(data)
87+
headerEntrySize := getUvarintSize(uint32(dataLen))
7488

75-
if q.availableSpaceAfterTail() < dataLen+headerEntrySize {
76-
if q.availableSpaceBeforeHead() >= dataLen+headerEntrySize {
89+
if !q.canInsertAfterTail(dataLen + headerEntrySize) {
90+
if q.canInsertBeforeHead(dataLen + headerEntrySize) {
7791
q.tail = leftMarginIndex
7892
} else if q.capacity+headerEntrySize+dataLen >= q.maxCapacity && q.maxCapacity > 0 {
7993
return -1, &queueError{"Full queue. Maximum size limit reached."}
@@ -106,6 +120,7 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) {
106120
copy(q.array, oldArray[:q.rightMargin])
107121

108122
if q.tail < q.head {
123+
headerEntrySize := getUvarintSize(uint32(q.head - q.tail))
109124
emptyBlobLen := q.head - q.tail - headerEntrySize
110125
q.push(make([]byte, emptyBlobLen), emptyBlobLen)
111126
q.head = leftMarginIndex
@@ -119,14 +134,17 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) {
119134
}
120135

121136
func (q *BytesQueue) push(data []byte, len int) {
122-
binary.LittleEndian.PutUint32(q.headerBuffer, uint32(len))
137+
headerEntrySize := binary.PutUvarint(q.headerBuffer, uint64(len))
123138
q.copy(q.headerBuffer, headerEntrySize)
124139

125140
q.copy(data, len)
126141

127142
if q.tail > q.head {
128143
q.rightMargin = q.tail
129144
}
145+
if q.tail == q.head {
146+
q.full = true
147+
}
130148

131149
q.count++
132150
}
@@ -137,10 +155,11 @@ func (q *BytesQueue) copy(data []byte, len int) {
137155

138156
// Pop reads the oldest entry from queue and moves head pointer to the next one
139157
func (q *BytesQueue) Pop() ([]byte, error) {
140-
data, size, err := q.peek(q.head)
158+
data, headerEntrySize, err := q.peek(q.head)
141159
if err != nil {
142160
return nil, err
143161
}
162+
size := len(data)
144163

145164
q.head += headerEntrySize + size
146165
q.count--
@@ -199,32 +218,45 @@ func (q *BytesQueue) peekCheckErr(index int) error {
199218
return errInvalidIndex
200219
}
201220

202-
if index+headerEntrySize >= len(q.array) {
221+
if index >= len(q.array) {
203222
return errIndexOutOfBounds
204223
}
205224
return nil
206225
}
207226

227+
// peek returns the data from index and the number of bytes to encode the length of the data in uvarint format
208228
func (q *BytesQueue) peek(index int) ([]byte, int, error) {
209229
err := q.peekCheckErr(index)
210230
if err != nil {
211231
return nil, 0, err
212232
}
213233

214-
blockSize := int(binary.LittleEndian.Uint32(q.array[index : index+headerEntrySize]))
215-
return q.array[index+headerEntrySize : index+headerEntrySize+blockSize], blockSize, nil
234+
blockSize, n := binary.Uvarint(q.array[index:])
235+
return q.array[index+n : index+n+int(blockSize)], n, nil
216236
}
217237

218-
func (q *BytesQueue) availableSpaceAfterTail() int {
238+
// canInsertAfterTail returns true if it's possible to insert an entry of size of need after the tail of the queue
239+
func (q *BytesQueue) canInsertAfterTail(need int) bool {
240+
if q.full {
241+
return false
242+
}
219243
if q.tail >= q.head {
220-
return q.capacity - q.tail
244+
return q.capacity-q.tail >= need
221245
}
222-
return q.head - q.tail - minimumEmptyBlobSize
246+
// 1. there is exactly need bytes between head and tail, so we do not need
247+
// to reserve extra space for a potential emtpy entry when re-allco this queeu
248+
// 2. still have unused space between tail and head, then we must reserve
249+
// at least headerEntrySize bytes so we can put an empty entry
250+
return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize
223251
}
224252

225-
func (q *BytesQueue) availableSpaceBeforeHead() int {
253+
// canInsertBeforeHead returns true if it's possible to insert an entry of size of need before the head of the queue
254+
func (q *BytesQueue) canInsertBeforeHead(need int) bool {
255+
if q.full {
256+
return false
257+
}
226258
if q.tail >= q.head {
227-
return q.head - leftMarginIndex - minimumEmptyBlobSize
259+
return q.head-leftMarginIndex == need || q.head-leftMarginIndex >= need+minimumHeaderSize
228260
}
229-
return q.head - q.tail - minimumEmptyBlobSize
261+
return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize
230262
}

queue/bytes_queue_test.go

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -145,13 +145,13 @@ func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereHeadIsBef
145145
queue := NewBytesQueue(25, 0, false)
146146

147147
// when
148-
queue.Push(blob('a', 3)) // header + entry + left margin = 8 bytes
149-
queue.Push(blob('b', 6)) // additional 10 bytes
150-
queue.Pop() // space freed, 7 bytes available at the beginning
151-
queue.Push(blob('c', 6)) // 10 bytes needed, 14 available but not in one segment, allocate additional memory
148+
queue.Push(blob('a', 3)) // header + entry + left margin = 5 bytes
149+
queue.Push(blob('b', 6)) // additional 7 bytes
150+
queue.Pop() // space freed, 4 bytes available at the beginning
151+
queue.Push(blob('c', 6)) // 7 bytes needed, 13 bytes available at the tail
152152

153153
// then
154-
assertEqual(t, 50, queue.Capacity())
154+
assertEqual(t, 25, queue.Capacity())
155155
assertEqual(t, blob('b', 6), pop(queue))
156156
assertEqual(t, blob('c', 6), pop(queue))
157157
}
@@ -163,13 +163,13 @@ func TestUnchangedEntriesIndexesAfterAdditionalMemoryAllocationWhereHeadIsBefore
163163
queue := NewBytesQueue(25, 0, false)
164164

165165
// when
166-
queue.Push(blob('a', 3)) // header + entry + left margin = 8 bytes
167-
index, _ := queue.Push(blob('b', 6)) // additional 10 bytes
168-
queue.Pop() // space freed, 7 bytes available at the beginning
169-
newestIndex, _ := queue.Push(blob('c', 6)) // 10 bytes needed, 14 available but not in one segment, allocate additional memory
166+
queue.Push(blob('a', 3)) // header + entry + left margin = 5 bytes
167+
index, _ := queue.Push(blob('b', 6)) // additional 7 bytes
168+
queue.Pop() // space freed, 4 bytes available at the beginning
169+
newestIndex, _ := queue.Push(blob('c', 6)) // 7 bytes needed, 13 available at the tail
170170

171171
// then
172-
assertEqual(t, 50, queue.Capacity())
172+
assertEqual(t, 25, queue.Capacity())
173173
assertEqual(t, blob('b', 6), get(queue, index))
174174
assertEqual(t, blob('c', 6), get(queue, newestIndex))
175175
}
@@ -181,19 +181,19 @@ func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereTailIsBef
181181
queue := NewBytesQueue(100, 0, false)
182182

183183
// when
184-
queue.Push(blob('a', 70)) // header + entry + left margin = 75 bytes
185-
queue.Push(blob('b', 10)) // 75 + 10 + 4 = 89 bytes
184+
queue.Push(blob('a', 70)) // header + entry + left margin = 72 bytes
185+
queue.Push(blob('b', 10)) // 72 + 10 + 1 = 83 bytes
186186
queue.Pop() // space freed at the beginning
187-
queue.Push(blob('c', 30)) // 34 bytes used at the beginning, tail pointer is before head pointer
188-
queue.Push(blob('d', 40)) // 44 bytes needed but no available in one segment, allocate new memory
187+
queue.Push(blob('c', 30)) // 31 bytes used at the beginning, tail pointer is before head pointer
188+
queue.Push(blob('d', 40)) // 41 bytes needed but no available in one segment, allocate new memory
189189

190190
// then
191191
assertEqual(t, 200, queue.Capacity())
192192
assertEqual(t, blob('c', 30), pop(queue))
193193
// empty blob fills space between tail and head,
194194
// created when additional memory was allocated,
195195
// it keeps current entries indexes unchanged
196-
assertEqual(t, blob(0, 36), pop(queue))
196+
assertEqual(t, blob(0, 39), pop(queue))
197197
assertEqual(t, blob('b', 10), pop(queue))
198198
assertEqual(t, blob('d', 40), pop(queue))
199199
}
@@ -205,11 +205,11 @@ func TestUnchangedEntriesIndexesAfterAdditionalMemoryAllocationWhereTailIsBefore
205205
queue := NewBytesQueue(100, 0, false)
206206

207207
// when
208-
queue.Push(blob('a', 70)) // header + entry + left margin = 75 bytes
209-
index, _ := queue.Push(blob('b', 10)) // 75 + 10 + 4 = 89 bytes
208+
queue.Push(blob('a', 70)) // header + entry + left margin = 72 bytes
209+
index, _ := queue.Push(blob('b', 10)) // 72 + 10 + 1 = 83 bytes
210210
queue.Pop() // space freed at the beginning
211-
queue.Push(blob('c', 30)) // 34 bytes used at the beginning, tail pointer is before head pointer
212-
newestIndex, _ := queue.Push(blob('d', 40)) // 44 bytes needed but no available in one segment, allocate new memory
211+
queue.Push(blob('c', 30)) // 31 bytes used at the beginning, tail pointer is before head pointer
212+
newestIndex, _ := queue.Push(blob('d', 40)) // 41 bytes needed but no available in one segment, allocate new memory
213213

214214
// then
215215
assertEqual(t, 200, queue.Capacity())
@@ -225,10 +225,10 @@ func TestAllocateAdditionalSpaceForValueBiggerThanInitQueue(t *testing.T) {
225225

226226
// when
227227
queue.Push(blob('a', 100))
228-
229228
// then
230229
assertEqual(t, blob('a', 100), pop(queue))
231-
assertEqual(t, 230, queue.Capacity())
230+
// 224 = (101 + 11) * 2
231+
assertEqual(t, 224, queue.Capacity())
232232
}
233233

234234
func TestAllocateAdditionalSpaceForValueBiggerThanQueue(t *testing.T) {
@@ -246,7 +246,8 @@ func TestAllocateAdditionalSpaceForValueBiggerThanQueue(t *testing.T) {
246246
queue.Pop()
247247
queue.Pop()
248248
assertEqual(t, make([]byte, 100), pop(queue))
249-
assertEqual(t, 250, queue.Capacity())
249+
// 244 = (101 + 21) * 2
250+
assertEqual(t, 244, queue.Capacity())
250251
}
251252

252253
func TestPopWholeQueue(t *testing.T) {
@@ -343,7 +344,7 @@ func TestMaxSizeLimit(t *testing.T) {
343344
queue.Push(blob('a', 25))
344345
queue.Push(blob('b', 5))
345346
capacity := queue.Capacity()
346-
_, err := queue.Push(blob('c', 15))
347+
_, err := queue.Push(blob('c', 20))
347348

348349
// then
349350
assertEqual(t, 50, capacity)

0 commit comments

Comments
 (0)