Skip to content

Commit

Permalink
Merge pull request #19 from shengyanli1982/dev
Browse files Browse the repository at this point in the history
Fixed a serious bug where the first few bytes were randomly lost
  • Loading branch information
shengyanli1982 authored Apr 12, 2024
2 parents d200c0f + b872f67 commit 1f31e78
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 96 deletions.
35 changes: 0 additions & 35 deletions element.go

This file was deleted.

23 changes: 18 additions & 5 deletions internal/lockfree/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import (
"unsafe"
)

// 定义一个空的结构体,没有任何字段
// Define an empty struct, with no fields
var emptyValue = struct{}{}

// 创建一个新的节点,节点的值为 emptyValue
// Create a new node, the value of the node is emptyValue
var emptyNode = NewNode(emptyValue)

// LockFreeQueue 是一个无锁队列结构体
Expand Down Expand Up @@ -122,28 +126,37 @@ func (q *LockFreeQueue) Pop() interface{} {
// If the next node of the head node is not nil, it means that the tail node is lagging behind, try to set the tail node of the queue to the next node of the head node
compareAndSwapNode(&q.tail, tail, first)
} else {
// 并返回头节点的值
// And return the value of the head node
result := first.value

// 如果头节点不等于尾节点,尝试将队列的头节点设置为头节点的下一个节点
// If the head node is not equal to the tail node, try to set the head node of the queue to the next node of the head node
if compareAndSwapNode(&q.head, head, first) {
// 如果成功,那么减少队列的长度
// If successful, then decrease the length of the queue
atomic.AddUint64(&q.length, ^uint64(0))

// 并返回头节点的值
// And return the value of the head node
result := first.value

// 然后重置头节点
// Then reset the head node
head.Reset()

// 检查结果是否为空值
// Check if the result is an empty value
if result == emptyValue {
// 如果结果是空值,返回 nil
// If the result is an empty value, return nil
return nil
} else {
// 如果结果不是空值,返回结果
// If the result is not an empty value, return the result
return result
}

}

// 如果设置头节点失败,那么将结果设置为 nil
// If setting the head node fails, then set the result to nil
result = nil
}
}
}
Expand Down
91 changes: 91 additions & 0 deletions internal/writer/element.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package writer

import (
"bytes"
"sync"
)

// Element 是一个结构体,包含一个字节缓冲区和一个更新时间戳
// Element is a structure that contains a byte buffer and an update timestamp
type Element struct {
// buffer 是一个字节缓冲区
// buffer is a byte buffer
buffer bytes.Buffer

// updateAt 是一个更新时间戳
// updateAt is an update timestamp
updateAt int64
}

// NewElement 是一个构造函数,用于创建一个新的 Element 实例
// NewElement is a constructor function for creating a new Element instance
func NewElement() *Element {
// 返回一个新的 Element 实例
// Return a new Element instance
return &Element{}
}

func (e *Element) GetBuffer() *bytes.Buffer {
return &e.buffer
}

func (e *Element) GetUpdateAt() int64 {
return e.updateAt
}

func (e *Element) SetUpdateAt(updateAt int64) {
e.updateAt = updateAt
}

// Reset 是 Element 结构体的一个方法,用于重置 Element 的状态
// Reset is a method of the Element structure, used to reset the state of the Element
func (e *Element) Reset() {
// 清空字节缓冲区
// Empty the byte buffer
e.buffer.Reset()

// 重置更新时间戳
// Reset the update timestamp
e.updateAt = 0
}

// ElementPool 是一个结构体,它包含一个同步池
// ElementPool is a struct that contains a sync pool
type ElementPool struct {
pool *sync.Pool
}

// NewElementPool 是一个函数,它创建并返回一个新的 elementPool
// NewElementPool is a function that creates and returns a new elementPool
func NewElementPool() *ElementPool {
// 创建一个新的同步池
// Create a new sync pool
pool := &sync.Pool{
// New 是一个函数,它创建并返回一个新的 Element
// New is a function that creates and returns a new Element
New: func() interface{} {
return NewElement()
},
}

// 返回一个新的 elementPool,它包含刚刚创建的同步池
// Return a new elementPool that contains the sync pool we just created
return &ElementPool{pool: pool}
}

// Get 是一个方法,它从 elementPool 的同步池中获取一个 Element
// Get is a method that gets an Element from the sync pool of the elementPool
func (p *ElementPool) Get() *Element {
return p.pool.Get().(*Element)
}

// Put 是一个方法,它将一个 Element 放回 elementPool 的同步池中
// Put is a method that puts an Element back into the sync pool of the elementPool
func (p *ElementPool) Put(e *Element) {
// 如果 Element 不为空,则重置它并将其放回同步池中
// If the Element is not nil, reset it and put it back into the sync pool
if e != nil {
e.Reset()
p.pool.Put(e)
}
}
126 changes: 70 additions & 56 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

lf "github.com/shengyanli1982/law/internal/lockfree"
wr "github.com/shengyanli1982/law/internal/writer"
)

// 定义默认的心跳间隔为 500 毫秒
Expand Down Expand Up @@ -79,6 +80,10 @@ type WriteAsyncer struct {
// state 用于存储写异步器的状态
// state is used to store the status of the write asyncer
state Status

// elementpool 用于存储元素池
// elementpool is used to store the element pool
elementpool *wr.ElementPool
}

// NewWriteAsyncer 函数用于创建一个新的 WriteAsyncer 实例
Expand Down Expand Up @@ -128,6 +133,10 @@ func NewWriteAsyncer(writer io.Writer, conf *Config) *WriteAsyncer {
// 初始化 wg
// Initialize wg
wg: sync.WaitGroup{},

// 初始化元素池
// Initialize the element pool
elementpool: wr.NewElementPool(),
}

// 创建一个新的 context.Context 实例,并设置一个取消函数
Expand Down Expand Up @@ -198,16 +207,17 @@ func (wa *WriteAsyncer) Write(p []byte) (n int, err error) {

// 从元素池中获取一个元素
// Get an elem from the elem pool
// elem := wa.elementpool.Get()
elem := NewElement()
elem := wa.elementpool.Get()

// 将数据设置到元素的 buffer 字段
// Set the data to the buffer field of the element
elem.buffer = p
if n, err = elem.GetBuffer().Write(p); err != nil {
return
}

// 将当前的时间设置到元素的 updateAt 字段
// Set the current time to the updateAt field of the element
elem.updateAt = wa.timer.Load()
elem.SetUpdateAt(wa.timer.Load())

// 将元素添加到队列
// Add the element to the queue
Expand Down Expand Up @@ -259,50 +269,47 @@ func (wa *WriteAsyncer) poller() {
// 使用无限循环来不断从队列中获取元素
// Use an infinite loop to continuously get elements from the queue
for {
select {
// 如果接收到 ctx.Done 的信号,那么结束循环
// If the ctx.Done signal is received, then end the loop
case <-wa.ctx.Done():
return
// 尝试从队列中弹出一个元素
// Try to pop an element from the queue
elem := wa.queue.Pop()

// 如果等待了一段时间,那么检查 bufferedWriter 中是否有缓冲的数据并且已经超过了空闲超时时间
// If a period of time has passed, then check whether there is buffered data in the bufferedWriter and it has exceeded the idle timeout
case <-heartbeat.C:
// 获取当前时间
// Get the current time
now := wa.timer.Load()

// 计算当前时间与上次执行时间的差值
// Calculate the difference between the current time and the last execution time
diff := now - wa.state.executeAt.Load()

// 如果 bufferedWriter 中有缓冲的数据,并且已经超过了空闲超时时间
// If there is buffered data in the bufferedWriter and it has exceeded the idle timeout
if wa.bufferedWriter.Buffered() > 0 && diff >= defaultIdleTimeout.Milliseconds() {
// 刷新 bufferedWriter,将所有缓冲的数据写入到 writer
// Flush the bufferedWriter, writing all buffered data to the writer
if err := wa.bufferedWriter.Flush(); err != nil {
// 如果在刷新 bufferedWriter 时发生错误,调用 OnWriteFailure 回调函数
// If an error occurs while flushing the bufferedWriter, call the OnWriteFailure callback function
wa.config.callback.OnWriteFailed(nil, err)
// 如果元素不为空,执行 executeFunc 函数
// If the element is not null, execute the executeFunc function
if elem != nil {
wa.executeFunc(elem.(*wr.Element))
} else {
select {
// 如果接收到 ctx.Done 的信号,那么结束循环
// If the ctx.Done signal is received, then end the loop
case <-wa.ctx.Done():
return

// 如果等待了一段时间,那么检查 bufferedWriter 中是否有缓冲的数据并且已经超过了空闲超时时间
// If a period of time has passed, then check whether there is buffered data in the bufferedWriter and it has exceeded the idle timeout
case <-heartbeat.C:
// 获取当前时间
// Get the current time
now := wa.timer.Load()

// 计算当前时间与上次执行时间的差值
// Calculate the difference between the current time and the last execution time
diff := now - wa.state.executeAt.Load()

// 如果 bufferedWriter 中有缓冲的数据,并且已经超过了空闲超时时间
// If there is buffered data in the bufferedWriter and it has exceeded the idle timeout
if wa.bufferedWriter.Buffered() > 0 && diff >= defaultIdleTimeout.Milliseconds() {
// 刷新 bufferedWriter,将所有缓冲的数据写入到 writer
// Flush the bufferedWriter, writing all buffered data to the writer
if err := wa.bufferedWriter.Flush(); err != nil {
// 如果在刷新 bufferedWriter 时发生错误,调用 OnWriteFailure 回调函数
// If an error occurs while flushing the bufferedWriter, call the OnWriteFailure callback function
wa.config.callback.OnWriteFailed(nil, err)
}

// 更新上次执行时间为当前时间
// Update the last execution time to the current time
wa.state.executeAt.Store(now)
}

// 更新上次执行时间为当前时间
// Update the last execution time to the current time
wa.state.executeAt.Store(now)
}

// 默认情况下,尝试从队列中弹出一个元素
// By default, try to pop an element from the queue
default:
// 尝试从队列中弹出一个元素
// Try to pop an element from the queue
elem := wa.queue.Pop()

// 如果元素不为空,执行 executeFunc 函数
// If the element is not null, execute the executeFunc function
if elem != nil {
wa.executeFunc(elem.(*Element))
}
}
}
Expand Down Expand Up @@ -341,7 +348,7 @@ func (wa *WriteAsyncer) updateTimer() {

// executeFunc 方法用于执行 WriteAsyncer 的写入操作
// The executeFunc method is used to perform the write operation of the WriteAsyncer
func (wa *WriteAsyncer) executeFunc(elem *Element) {
func (wa *WriteAsyncer) executeFunc(elem *wr.Element) {
// 获取当前的 Unix 毫秒时间
// Get the current Unix millisecond time
now := wa.timer.Load()
Expand All @@ -350,25 +357,32 @@ func (wa *WriteAsyncer) executeFunc(elem *Element) {
// Update the last execution time to the current time
wa.state.executeAt.Store(now)

// content 是一个变量,它获取 elem 的缓冲区的字节
// content is a variable that gets the bytes of the buffer of elem
content := elem.GetBuffer().Bytes()

// lastUpdateAt 是一个变量,它获取 elem 的更新时间
// lastUpdateAt is a variable that gets the update time of elem
lastUpdateAt := elem.GetUpdateAt()

// 调用回调函数 OnPopQueue
// Call the callback function OnPopQueue
wa.config.callback.OnPopQueue(elem.buffer, now-elem.updateAt)
wa.config.callback.OnPopQueue(content, now-lastUpdateAt)

// 将元素的数据写入到 bufferedWriter
// Write the data of the element to the bufferedWriter
if _, err := wa.flushBufferedWriter(elem.buffer); err != nil {
if _, err := wa.flushBufferedWriter(content); err != nil {
// 如果写入失败,调用回调函数 OnWriteFailure
// If the write fails, call the callback function OnWriteFailure
wa.config.callback.OnWriteFailed(elem.buffer, err)
wa.config.callback.OnWriteFailed(content, err)
} else {
// 如果写入成功,调用回调函数 OnWriteSuccess
// If the write is successful, call the callback function OnWriteSuccess
wa.config.callback.OnWriteSuccess(elem.buffer)
wa.config.callback.OnWriteSuccess(content)
}

// 重置元素的状态
// Reset the state of the element
elem.Reset()
// 将 elem 放回到 elementpool 中
// Put elem back into the elementpool
wa.elementpool.Put(elem)
}

// cleanQueueToWriter 方法用于将队列中的所有数据写入到 writer
Expand All @@ -389,6 +403,6 @@ func (wa *WriteAsyncer) cleanQueueToWriter() {

// 执行写入操作
// Perform the write operation
wa.executeFunc(elem.(*Element))
wa.executeFunc(elem.(*wr.Element))
}
}

0 comments on commit 1f31e78

Please sign in to comment.