Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 添加 MultipleBytes 实现多协程安全读写 #272

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions iox/multiple_bytes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package iox

import (
"io"
"sync"
)

// MultipleBytes 是一个实现了 io.Reader 和 io.Writer 接口的结构体
// 它可以安全地在多个 goroutine 之间共享
type MultipleBytes struct {
data [][]byte
idx1 int // 第几个切片
idx2 int // data[idx1] 中的下标
mutex sync.RWMutex
}

// NewMultipleBytes 创建一个新的 MultipleBytes 实例
// capacity 参数用于预分配内部缓冲区的容量
func NewMultipleBytes(capacity int) *MultipleBytes {
return &MultipleBytes{
data: make([][]byte, 0, 1),
}
}

// Read 实现 io.Reader 接口
// 从当前位置读取数据到 p 中,如果没有数据可读返回 io.EOF
func (m *MultipleBytes) Read(p []byte) (n int, err error) {
m.mutex.RLock()
defer m.mutex.RUnlock()

// 如果没有数据或者已经读完了所有数据
if len(m.data) == 0 || (m.idx1 >= len(m.data)) {
return 0, io.EOF
}

totalRead := 0
for m.idx1 < len(m.data) {
currentSlice := m.data[m.idx1]
remaining := len(currentSlice) - m.idx2
if remaining <= 0 {
m.idx1++
m.idx2 = 0
continue
}

toRead := len(p) - totalRead
if toRead <= 0 {
break
}

if remaining > toRead {
n = copy(p[totalRead:], currentSlice[m.idx2:m.idx2+toRead])
m.idx2 += n
} else {
n = copy(p[totalRead:], currentSlice[m.idx2:])
m.idx1++
m.idx2 = 0
}
totalRead += n
}

return totalRead, nil
}

// Write 实现 io.Writer 接口
// 将 p 中的数据写入到内部缓冲区
func (m *MultipleBytes) Write(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}

m.mutex.Lock()
defer m.mutex.Unlock()

// 创建新的切片来存储数据
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不需要,我们就是为了规避 copy 而引入的,所以这里不要 copy,直接 append

newSlice := make([]byte, len(p))
copy(newSlice, p)
m.data = append(m.data, newSlice)

return len(p), nil
}

// Len 返回当前缓冲区中的数据总长度
func (m *MultipleBytes) Len() int {
m.mutex.RLock()
defer m.mutex.RUnlock()

total := 0
for _, slice := range m.data {
total += len(slice)
}
return total
}

// Cap 返回当前缓冲区的容量
func (m *MultipleBytes) Cap() int {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这两个方法先不需要实现,因为目前来看没什么使用场景,而且 Cap 没有啥意义,因为是允许扩容的

m.mutex.RLock()
defer m.mutex.RUnlock()

total := 0
for _, slice := range m.data {
total += cap(slice)
}
return total
}

// Reset 重置读取位置到开始处
func (m *MultipleBytes) Reset() {
m.mutex.Lock()
defer m.mutex.Unlock()
m.idx1 = 0
m.idx2 = 0
}

// Clear 清空缓冲区并重置读取位置
func (m *MultipleBytes) Clear() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同样不需要

m.mutex.Lock()
defer m.mutex.Unlock()
m.data = m.data[:0]
m.idx1 = 0
m.idx2 = 0
}

// Bytes 返回内部缓冲区的副本
func (m *MultipleBytes) Bytes() []byte {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个也不需要

m.mutex.RLock()
defer m.mutex.RUnlock()

total := 0
for _, slice := range m.data {
total += len(slice)
}

result := make([]byte, total)
pos := 0
for _, slice := range m.data {
pos += copy(result[pos:], slice)
}

return result
}
147 changes: 147 additions & 0 deletions iox/multiple_bytes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package iox

import (
"io"
"testing"

"github.com/stretchr/testify/assert"
)

func TestMultipleBytes_ReadWrite(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Write 测试用例比较简单,但是 Read 需要更加复杂的测试用例

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Read 要考虑更加多的边缘场景,包括但不限于:

  • 只有一个 slice,读够了,没读够;
  • 多个 slice,读数据横跨了多个 slice 才读够、恰好读够,没读够;
  • idx1 和 idx2 指向第一个、最好一个的交叉验证;

testCases := []struct {
name string
write []byte
readSize int
wantRead []byte
wantErr error
}{
{
name: "empty read",
write: []byte{},
readSize: 1,
wantRead: []byte{},
wantErr: io.EOF,
},
{
name: "single byte",
write: []byte{1},
readSize: 1,
wantRead: []byte{1},
wantErr: nil,
},
{
name: "multiple bytes",
write: []byte{1, 2, 3, 4, 5},
readSize: 3,
wantRead: []byte{1, 2, 3},
wantErr: nil,
},
{
name: "read more than available",
write: []byte{1, 2},
readSize: 4,
wantRead: []byte{1, 2},
wantErr: nil,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mb := NewMultipleBytes(len(tc.write))
n, err := mb.Write(tc.write)
assert.Equal(t, len(tc.write), n)
assert.Nil(t, err)

read := make([]byte, tc.readSize)
n, err = mb.Read(read)
if tc.wantErr != nil {
assert.Equal(t, tc.wantErr, err)
} else {
assert.Nil(t, err)
assert.Equal(t, tc.wantRead, read[:n])
}
})
}
}

func TestMultipleBytes_Reset(t *testing.T) {
mb := NewMultipleBytes(4)
data := []byte{1, 2, 3, 4}

// 写入数据
n, err := mb.Write(data)
assert.Equal(t, len(data), n)
assert.Nil(t, err)

// 第一次读取
read := make([]byte, 2)
n, err = mb.Read(read)
assert.Equal(t, 2, n)
assert.Nil(t, err)
assert.Equal(t, []byte{1, 2}, read)

// 重置
mb.Reset()

// 重置后再次读取
read = make([]byte, 4)
n, err = mb.Read(read)
assert.Equal(t, 4, n)
assert.Nil(t, err)
assert.Equal(t, data, read)
}

func TestMultipleBytes_Clear(t *testing.T) {
mb := NewMultipleBytes(4)
data := []byte{1, 2, 3, 4}

// 写入数据
n, err := mb.Write(data)
assert.Equal(t, len(data), n)
assert.Nil(t, err)

// 清空
mb.Clear()

// 清空后读取
read := make([]byte, 1)
n, err = mb.Read(read)
assert.Equal(t, 0, n)
assert.Equal(t, io.EOF, err)

// 验证长度
assert.Equal(t, 0, mb.Len())
}

func TestMultipleBytes_Bytes(t *testing.T) {
mb := NewMultipleBytes(4)
data := []byte{1, 2, 3, 4}

// 写入数据
n, err := mb.Write(data)
assert.Equal(t, len(data), n)
assert.Nil(t, err)

// 获取副本
copy := mb.Bytes()
assert.Equal(t, data, copy)

// 修改副本不影响原数据
copy[0] = 5
original := mb.Bytes()
assert.Equal(t, data, original)
}