Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 添加 MultipleBytes 实现多协程安全读写 #272

Merged
merged 10 commits into from
Jan 25, 2025
Merged
56 changes: 56 additions & 0 deletions iox/concurrent_multiple_bytes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package iox

import (
"sync"
)

// ConcurrentMultipleBytes 是 MultipleBytes 的线程安全装饰器
type ConcurrentMultipleBytes struct {
mb *MultipleBytes
lock sync.Mutex
}

// NewConcurrentMultipleBytes 创建一个新的线程安全的 MultipleBytes 实例
// sliceCount 参数用于预分配内部切片数组的容量
func NewConcurrentMultipleBytes(sliceCount int) *ConcurrentMultipleBytes {
return &ConcurrentMultipleBytes{
mb: NewMultipleBytes(sliceCount),
}
}

// Read 实现 io.Reader 接口
// 从当前位置读取数据到 p 中,如果没有数据可读返回 io.EOF
func (c *ConcurrentMultipleBytes) Read(p []byte) (n int, err error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.mb.Read(p)
}

// Write 实现 io.Writer 接口
// 将 p 中的数据写入到内部缓冲区
func (c *ConcurrentMultipleBytes) Write(p []byte) (n int, err error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.mb.Write(p)
}

// Reset 重置读取位置到开始处
func (c *ConcurrentMultipleBytes) Reset() {
c.lock.Lock()
defer c.lock.Unlock()
c.mb.Reset()
}
124 changes: 124 additions & 0 deletions iox/concurrent_multiple_bytes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package iox

import (
"io"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestConcurrentMultipleBytes(t *testing.T) {
t.Run("基本读写功能", func(t *testing.T) {
cmb := NewConcurrentMultipleBytes(2)
data := []byte{1, 2, 3, 4}

// 写入数据
n, err := cmb.Write(data)
assert.Equal(t, len(data), n)
assert.Nil(t, err)

// 读取数据
read := make([]byte, 4)
n, err = cmb.Read(read)
assert.Equal(t, len(data), n)
assert.Nil(t, err)
assert.Equal(t, data, read[:n])
})

t.Run("并发读写", func(t *testing.T) {
cmb := NewConcurrentMultipleBytes(3)
var wg sync.WaitGroup

// 并发写入
for i := 0; i < 3; i++ {
wg.Add(1)
go func(val byte) {
defer wg.Done()
n, err := cmb.Write([]byte{val})
assert.Equal(t, 1, n)
assert.Nil(t, err)
}(byte(i + 1))
}
wg.Wait()

// 并发读取
results := make([][]byte, 3)
for i := 0; i < 3; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
read := make([]byte, 1)
n, err := cmb.Read(read)
if err != nil && err != io.EOF {
assert.Nil(t, err)
return
}
results[idx] = read[:n]
}(i)
}
wg.Wait()

// 验证总读取字节数
total := 0
for _, res := range results {
total += len(res)
}
assert.Equal(t, 3, total)
})

t.Run("边界场景", func(t *testing.T) {
cmb := NewConcurrentMultipleBytes(1)

// 空切片写入
n, err := cmb.Write([]byte{})
assert.Equal(t, 0, n)
assert.Nil(t, err)

// 空切片读取
read := make([]byte, 1)
n, err = cmb.Read(read)
assert.Equal(t, 0, n)
assert.Equal(t, io.EOF, err)
})

t.Run("Reset功能", func(t *testing.T) {
cmb := NewConcurrentMultipleBytes(1)
data := []byte{1, 2}

// 写入数据
n, err := cmb.Write(data)
assert.Equal(t, len(data), n)
assert.Nil(t, err)

// 读取一部分
read := make([]byte, 1)
n, err = cmb.Read(read)
assert.Equal(t, 1, n)
assert.Nil(t, err)

// 重置
cmb.Reset()

// 重新读取
read = make([]byte, 2)
n, err = cmb.Read(read)
assert.Equal(t, 2, n)
assert.Nil(t, err)
assert.Equal(t, data, read[:n])
})
}
91 changes: 91 additions & 0 deletions iox/multiple_bytes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package iox

import (
"io"
)

// MultipleBytes 是一个实现了 io.Reader 和 io.Writer 接口的结构体
// 它可以安全地在多个 goroutine 之间共享
type MultipleBytes struct {
data [][]byte
idx1 int // 第几个切片
idx2 int // data[idx1] 中的下标
}

// NewMultipleBytes 创建一个新的 MultipleBytes 实例
// sliceCount 参数用于预分配内部切片数组的容量
func NewMultipleBytes(sliceCount int) *MultipleBytes {
return &MultipleBytes{
data: make([][]byte, 0, sliceCount),
}
}

// Read 实现 io.Reader 接口
// 从当前位置读取数据到 p 中,如果没有数据可读返回 io.EOF
func (m *MultipleBytes) Read(p []byte) (n int, err error) {
// 如果没有数据或者已经读完了所有数据
if len(m.data) == 0 || (m.idx1 >= len(m.data)) {
return 0, io.EOF
}

totalRead := 0
for m.idx1 < len(m.data) {
currentSlice := m.data[m.idx1]
remaining := len(currentSlice) - m.idx2
if remaining <= 0 {
m.idx1++
m.idx2 = 0
continue
}

toRead := len(p) - totalRead
if toRead <= 0 {
break
}

if remaining > toRead {
n = copy(p[totalRead:], currentSlice[m.idx2:m.idx2+toRead])
m.idx2 += n
} else {
n = copy(p[totalRead:], currentSlice[m.idx2:])
m.idx1++
m.idx2 = 0
}
totalRead += n
}

return totalRead, nil
}

// Write 实现 io.Writer 接口
// 将 p 中的数据写入到内部缓冲区
func (m *MultipleBytes) Write(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}

// 直接将输入切片追加到内部存储
m.data = append(m.data, p)

return len(p), nil
}

// Reset 重置读取位置到开始处
func (m *MultipleBytes) Reset() {
m.idx1 = 0
m.idx2 = 0
}
Loading
Loading