diff --git a/config.go b/config.go index 4cb8417..0c1e083 100644 --- a/config.go +++ b/config.go @@ -2,122 +2,70 @@ package law import lf "github.com/shengyanli1982/law/internal/lockfree" -// DefaultBufferSize 是默认的缓冲区大小 +// DefaultBufferSize 默认缓冲区大小 // DefaultBufferSize is the default buffer size const DefaultBufferSize = 2048 -// Config 是配置结构体,包含了日志器、缓冲区大小和回调函数 -// Config is a structure that contains a logger, buffer size, and a callback function +// Config 配置结构体 +// Config is the configuration structure type Config struct { - // buffSize 是缓冲区的大小 - // buffSize is the size of the buffer - buffSize int - - // callback 是回调函数,用于处理特定事件 - // callback is a callback function for handling specific events - callback Callback - - // queue 是一个队列,它用于存储即将处理的事件。 - // queue is a queue that is used to store events that are about to be processed. - queue Queue + buffSize int // 缓冲区大小 / buffer size + callback Callback // 回调函数 / callback function + queue Queue // 队列实现 / queue implementation } -// NewConfig 是一个构造函数,用于创建一个新的 Config 实例 -// NewConfig is a constructor function for creating a new Config instance +// NewConfig 创建新的配置实例 +// NewConfig creates a new configuration instance func NewConfig() *Config { - // 返回一个新的 Config 实例 - // Return a new Config instance return &Config{ - // 设置缓冲区大小为默认值 - // Set the buffer size to the default value buffSize: DefaultBufferSize, - - // 创建一个空的回调函数 - // Create an empty callback function callback: newEmptyCallback(), - - // 创建一个无锁队列 - // Create an unlocked queue - queue: lf.NewLockFreeQueue(), + queue: lf.NewLockFreeQueue(), } } -// DefaultConfig 是一个函数,返回一个新的默认配置实例 -// DefaultConfig is a function that returns a new default configuration instance +// DefaultConfig 返回默认配置 +// DefaultConfig returns the default configuration func DefaultConfig() *Config { - // 调用 NewConfig 函数创建一个新的配置实例 - // Call the NewConfig function to create a new configuration instance return NewConfig() } -// WithBufferSize 是 Config 结构体的一个方法,用于设置缓冲区大小 -// WithBufferSize is a method of the Config structure, used to set the buffer size +// WithBufferSize 设置缓冲区大小 +// WithBufferSize sets the buffer size func (c *Config) WithBufferSize(size int) *Config { - // 设置缓冲区大小 - // Set the buffer size c.buffSize = size - - // 返回配置实例,以便进行链式调用 - // Return the configuration instance for chaining return c } -// WithCallback 是 Config 结构体的一个方法,用于设置回调函数 -// WithCallback is a method of the Config structure, used to set the callback function +// WithCallback 设置回调函数 +// WithCallback sets the callback function func (c *Config) WithCallback(cb Callback) *Config { - // 设置回调函数 - // Set the callback function c.callback = cb - - // 返回配置实例,以便进行链式调用 - // Return the configuration instance for chaining return c } -// WithQueue 是 Config 结构体的一个方法,用于设置队列 -// WithQueue is a method of the Config structure, used to set the queue +// WithQueue 设置队列实现 +// WithQueue sets the queue implementation func (c *Config) WithQueue(q Queue) *Config { - // 设置队列 - // Set the queue c.queue = q - - // 返回配置实例,以便进行链式调用 - // Return the configuration instance for chaining return c } -// isConfigValid 是一个函数,用于检查配置是否有效。如果配置无效,它将使用默认值进行修复。 -// isConfigValid is a function to check if the configuration is valid. If the configuration is invalid, it will fix it with default values. +// isConfigValid 验证并修正配置 +// isConfigValid validates and corrects the configuration func isConfigValid(conf *Config) *Config { - // 如果配置不为空 - // If the configuration is not null if conf != nil { - - // 如果缓冲区大小小于或等于0,将其设置为默认缓冲区大小 - // If the buffer size is less than or equal to 0, set it to the default buffer size if conf.buffSize <= 0 { conf.buffSize = DefaultBufferSize } - - // 如果回调函数为空,创建一个新的空回调函数 - // If the callback function is null, create a new empty callback function if conf.callback == nil { conf.callback = newEmptyCallback() } - - // 如果队列为空,创建一个新的无锁队列 - // If the queue is null, create a new unlocked queue if conf.queue == nil { conf.queue = lf.NewLockFreeQueue() } - } else { - // 如果配置为空,使用默认配置 - // If the configuration is null, use the default configuration conf = DefaultConfig() } - - // 返回修复后的配置 - // Return the fixed configuration return conf } diff --git a/interface.go b/interface.go index 99e2e8f..2a5f386 100644 --- a/interface.go +++ b/interface.go @@ -1,51 +1,47 @@ package law -// Writer 是一个接口,定义了写操作的行为。 -// Writer is an interface that defines the behavior of write operations. +// Writer 定义了写入器接口 +// Writer defines the writer interface type Writer interface { - // Write 方法接受一个字节切片,返回写入的字节数和可能的错误。 - // The Write method accepts a byte slice and returns the number of bytes written and a possible error. + // Write 写入数据,返回写入的字节数和可能的错误 + // Write writes data and returns the number of bytes written and any error Write([]byte) (int, error) - // Stop 方法用于停止写操作。 - // The Stop method is used to stop write operations. + // Stop 停止写入器 + // Stop stops the writer Stop() } -// Callback 是一个接口,定义了队列操作和写操作的回调函数。 -// Callback is an interface that defines callback functions for queue operations and write operations. +// Callback 定义了回调接口 +// Callback defines the callback interface type Callback interface { - // OnWriteFailed 是一个方法,当写操作失败时会被调用。 - // 它接受两个参数:一个字节切片(表示写入内容)和一个错误(表示失败的原因)。 - // OnWriteFailed is a method that is called when a write operation fails. - // It takes two parameters: a byte slice (indicating the content to be written) and an error (indicating the reason for the failure). + // OnWriteFailed 当写入失败时被调用 + // OnWriteFailed is called when writing fails OnWriteFailed(content []byte, reason error) } -// emptyCallback 是一个实现了 Callback 接口的结构体,但所有方法的实现都为空。 -// emptyCallback is a struct that implements the Callback interface, but all method implementations are empty. +// emptyCallback 空回调实现 +// emptyCallback is an empty callback implementation type emptyCallback struct{} -// OnWriteFailed 是 emptyCallback 结构体实现 Callback 接口的方法,但此方法没有任何实现。 -// OnWriteFailed is a method of the emptyCallback struct that implements the Callback interface, but this method has no implementation. +// OnWriteFailed 空回调的写入失败处理方法(无操作) +// OnWriteFailed handles write failures for empty callback (no-op) func (c *emptyCallback) OnWriteFailed([]byte, error) {} -// newEmptyCallback 是一个构造函数,用于创建一个新的 emptyCallback 实例。 -// newEmptyCallback is a constructor function for creating a new emptyCallback instance. +// newEmptyCallback 创建新的空回调实例 +// newEmptyCallback creates a new empty callback instance func newEmptyCallback() Callback { - // 返回一个新的 emptyCallback 实例。 - // Return a new emptyCallback instance. return &emptyCallback{} } -// Queue 是一个接口,定义了队列的基本操作:Push 和 Pop。 -// Queue is an interface that defines the basic operations of a queue: Push and Pop. +// Queue 定义了队列接口 +// Queue defines the queue interface type Queue interface { - // Push 方法用于将值添加到队列中。 - // The Push method is used to add a value to the queue. + // Push 将值推入队列 + // Push pushes a value into the queue Push(value interface{}) - // Pop 方法用于从队列中取出一个值。 - // The Pop method is used to take a value out of the queue. + // Pop 从队列中取出值 + // Pop retrieves a value from the queue Pop() interface{} } diff --git a/writer.go b/writer.go index b601815..9f8209f 100644 --- a/writer.go +++ b/writer.go @@ -14,276 +14,158 @@ import ( wr "github.com/shengyanli1982/law/internal/writer" ) -// 定义默认的心跳间隔为 500 毫秒 -// Define the default heartbeat interval as 500 milliseconds +// 默认心跳间隔和空闲超时时间 +// Default heartbeat interval and idle timeout duration const defaultHeartbeatInterval = 500 * time.Millisecond - -// 定义默认的空闲超时为 5 秒 -// Define the default idle timeout as 5 seconds const defaultIdleTimeout = 5 * time.Second -// 定义一个错误,表示写异步器已经关闭 -// Define an error indicating that the write asyncer is closed -var ErrorWriteAsyncerIsClosed = errors.New("write asyncer is closed") +// 错误定义 +// Error definitions +var ( + ErrorWriteAsyncerIsClosed = errors.New("write asyncer is closed") + ErrorWriteContentIsNil = errors.New("write content is nil") +) -// WriteAsyncer 结构体用于实现写异步器 -// The WriteAsyncer struct is used to implement the write asyncer +// WriteAsyncer 异步写入器结构体 +// WriteAsyncer is an asynchronous writer structure type WriteAsyncer struct { - // config 用于存储写异步器的配置 - // config is used to store the configuration of the write asyncer - config *Config - - // writer 用于写入数据 - // writer is used to write data - writer io.Writer - - // bufferedWriter 用于缓冲写入的数据 - // bufferedWriter is used to buffer the data to be written - bufferedWriter *bufio.Writer - - // timer 用于控制写入的时间 - // timer is used to control the time of writing - timer atomic.Int64 - - // once 用于确保某个操作只执行一次 - // once is used to ensure that an operation is performed only once - once sync.Once - - // ctx 用于控制写异步器的生命周期 - // ctx is used to control the lifecycle of the write asyncer - ctx context.Context - - // cancel 用于取消写异步器的操作 - // cancel is used to cancel the operation of the write asyncer - cancel context.CancelFunc - - // wg 用于等待写异步器的所有操作完成 - // wg is used to wait for all operations of the write asyncer to complete - wg sync.WaitGroup - - // state 用于存储写异步器的状态 - // state is used to store the status of the write asyncer - state *wr.Status - - // bufferpool 用于存储元素池 - // bufferpool is used to store the element pool - bufferpool *wr.BufferPool + config *Config // 配置信息 / Configuration + writer io.Writer // 底层写入器 / Underlying writer + bufferedWriter *bufio.Writer // 带缓冲的写入器 / Buffered writer + timer atomic.Int64 // 计时器 / Timer + once sync.Once // 确保只执行一次的控制器 / Once controller + ctx context.Context // 上下文 / Context + cancel context.CancelFunc // 取消函数 / Cancel function + wg sync.WaitGroup // 等待组 / Wait group + state *wr.Status // 状态管理器 / Status manager + bufferpool *wr.BufferPool // 缓冲池 / Buffer pool } -// NewWriteAsyncer 函数用于创建一个新的 WriteAsyncer 实例 -// The NewWriteAsyncer function is used to create a new WriteAsyncer instance +// NewWriteAsyncer 创建新的异步写入器 +// NewWriteAsyncer creates a new asynchronous writer func NewWriteAsyncer(writer io.Writer, conf *Config) *WriteAsyncer { - // 如果 writer 参数为 nil,那么将其设置为 os.Stdout - // If the writer parameter is nil, then set it to os.Stdout if writer == nil { writer = os.Stdout } - // 检查配置是否有效,如果无效则使用默认配置 - // Check if the configuration is valid, if not, use the default configuration conf = isConfigValid(conf) - // 创建一个新的 WriteAsyncer 实例 - // Create a new WriteAsyncer instance wa := &WriteAsyncer{ - // 设置配置 - // Set the configuration - config: conf, - - // 设置写入器 - // Set the writer - writer: writer, - - // 创建一个新的带有指定缓冲区大小的 bufio.Writer 实例 - // Create a new bufio.Writer instance with the specified buffer size + config: conf, + writer: writer, bufferedWriter: bufio.NewWriterSize(writer, conf.buffSize), - - // 初始化状态 - // Initialize the status - state: wr.NewStatus(), - - // 初始化计时器 - // Initialize the timer - timer: atomic.Int64{}, - - // 初始化 once - // Initialize once - once: sync.Once{}, - - // 初始化 wg - // Initialize wg - wg: sync.WaitGroup{}, - - // 初始化元素池 - // Initialize the element pool - bufferpool: wr.NewBufferPool(), + state: wr.NewStatus(), + timer: atomic.Int64{}, + once: sync.Once{}, + wg: sync.WaitGroup{}, + bufferpool: wr.NewBufferPool(), } - // 创建一个新的 context.Context 实例,并设置一个取消函数 - // Create a new context.Context instance and set a cancel function wa.ctx, wa.cancel = context.WithCancel(context.Background()) - - // 设置下一次执行的时间为当前时间 - // Set the time of the next execution to the current time wa.state.SetExecuteAt(time.Now().UnixMilli()) - - // 设置 running 为 true,表示 WriteAsyncer 正在运行 - // Set running to true, indicating that WriteAsyncer is running wa.state.SetRunning(true) - // 增加 wg 的计数 - // Increase the count of wg wa.wg.Add(2) + go wa.poller() // 启动轮询器 / Start poller + go wa.updateTimer() // 启动计时器更新器 / Start timer updater - // 启动 poller 协程 - // Start the poller goroutine - go wa.poller() - - // 启动 updateTimer 协程 - // Start the updateTimer goroutine - go wa.updateTimer() - - // 返回新创建的 WriteAsyncer 实例 - // Return the newly created WriteAsyncer instance return wa } -// Stop 方法用于停止 WriteAsyncer -// The Stop method is used to stop the WriteAsyncer +// Stop 停止异步写入器 +// Stop stops the asynchronous writer func (wa *WriteAsyncer) Stop() { - // 使用 once.Do 方法确保以下的操作只执行一次 - // Use the once.Do method to ensure that the following operations are performed only once wa.once.Do(func() { - // 将 running 状态设置为 false,表示 WriteAsyncer 已经停止 - // Set the running status to false, indicating that the WriteAsyncer has stopped wa.state.SetRunning(false) - - // 调用 cancel 函数取消 WriteAsyncer 的所有操作 - // Call the cancel function to cancel all operations of the WriteAsyncer wa.cancel() - - // 等待 WriteAsyncer 的所有操作完成 - // Wait for all operations of the WriteAsyncer to complete wa.wg.Wait() - - // 将队列中的所有数据写入到 writer - // Write all data in the queue to the writer wa.cleanQueueToWriter() - - // 刷新 bufferedWriter,将所有缓冲的数据写入到 writer - // Flush the bufferedWriter, writing all buffered data to the writer wa.bufferedWriter.Flush() - - // 重置 bufferedWriter,将其设置为 io.Discard - // Reset the bufferedWriter, setting it to io.Discard wa.bufferedWriter.Reset(io.Discard) }) } -// Write 方法用于将数据写入到 WriteAsyncer -// The Write method is used to write data to the WriteAsyncer +// Write 实现写入方法 +// Write implements the write method func (wa *WriteAsyncer) Write(p []byte) (n int, err error) { - // 如果 WriteAsyncer 已经停止,那么返回错误 - // If the WriteAsyncer has stopped, then return an error if !wa.state.IsRunning() { return 0, ErrorWriteAsyncerIsClosed } - // 从元素池中获取一个元素 - // Get an buff from the buff pool + if p == nil { + return 0, ErrorWriteContentIsNil + } + + l := len(p) + if l <= 0 { + return 0, nil + } + buff := wa.bufferpool.Get() + buff.Grow(l) - // 将数据设置到元素的 buffer 字段 - // Set the data to the buffer field of the element if n, err = buff.Write(p); err != nil { - return + wa.bufferpool.Put(buff) + return 0, err } - // 将元素添加到队列 - // Add the element to the queue wa.config.queue.Push(buff) - - // 返回数据的长度和 nil 错误 - // Return the length of the data and a nil error - return len(p), nil + return l, nil } -// flushBufferedWriter 方法用于将数据写入到 bufferedWriter -// The flushBufferedWriter method is used to write data to the bufferedWriter -func (wa *WriteAsyncer) flushBufferedWriter(p []byte) (int, error) { - // 如果数据的长度大于 bufferedWriter 的可用空间,并且 bufferedWriter 中已经有缓冲的数据 - // If the length of the data is greater than the available space of the bufferedWriter, and there is already buffered data in the bufferedWriter - if len(p) > wa.bufferedWriter.Available() && wa.bufferedWriter.Buffered() > 0 { - // 刷新 bufferedWriter,将所有缓冲的数据写入到 writer - // Flush the bufferedWriter, writing all buffered data to the writer +// flushBufferedWriter 刷新缓冲的写入器 +// flushBufferedWriter flushes the buffered writer +func (wa *WriteAsyncer) flushBufferedWriter(content []byte) (int, error) { + sizeOfContent := len(content) + if sizeOfContent == 0 { + return 0, nil + } + + // 如果内容大小超过可用空间且缓冲区非空,则先刷新 + // If content size exceeds available space and buffer is not empty, flush first + if sizeOfContent > wa.bufferedWriter.Available() && wa.bufferedWriter.Buffered() > 0 { if err := wa.bufferedWriter.Flush(); err != nil { - // 如果刷新操作失败,返回写入的长度(此时为0)和错误 - // If the flush operation fails, return the length of the write (which is now 0) and the error return 0, err } } - // 将数据写入到 bufferedWriter,并返回写入的长度和错误 - // Write the data to the bufferedWriter and return the length of the write and the error - return wa.bufferedWriter.Write(p) + // 如果内容大小超过缓冲区大小,直接写入 + // If content size exceeds buffer size, write directly + if sizeOfContent >= wa.config.buffSize { + return wa.writer.Write(content) + } + + return wa.bufferedWriter.Write(content) } -// poller 方法用于从队列中获取元素并执行相应的函数 -// The poller method is used to get elements from the queue and execute the corresponding functions +// poller 轮询器,处理写入请求和心跳检查 +// poller handles write requests and heartbeat checks func (wa *WriteAsyncer) poller() { - // 创建一个新的定时器,用于定时检查队列 - // Create a new timer for periodically checking the queue - heartbeat := time.NewTicker(defaultHeartbeatInterval) + const checkInterval = defaultHeartbeatInterval + heartbeat := time.NewTicker(checkInterval) - // 使用 defer 语句确保在函数结束时停止定时器并完成减少 WaitGroup 的计数 - // Use a defer statement to ensure that the timer is stopped and the count of WaitGroup is reduced when the function ends defer func() { heartbeat.Stop() wa.wg.Done() }() - // 使用无限循环来不断从队列中获取元素 - // Use an infinite loop to continuously get elements from the queue for { - // 尝试从队列中弹出一个元素 - // Try to pop an element from the queue - element := wa.config.queue.Pop() - - // 如果元素不为空,执行 executeFunc 函数 - // If the element is not null, execute the executeFunc function - if element != nil { + if element := wa.config.queue.Pop(); element != nil { wa.executeFunc(element.(*bytes.Buffer)) - } else { - select { - // 如果接收到 ctx.Done 的信号,那么结束循环 - // If the ctx.Done signal is received, then end the loop - case <-wa.ctx.Done(): - return - - // 如果等待了一段时间,那么检查 bufferedWriter 中是否有缓冲的数据并且已经超过了空闲超时时间 - // If a period of time has passed, then check whether there is buffered data in the bufferedWriter and it has exceeded the idle timeout - case <-heartbeat.C: - // 获取当前时间 - // Get the current time - now := wa.timer.Load() + continue + } - // 计算当前时间与上次执行时间的差值 - // Calculate the difference between the current time and the last execution time - diff := now - wa.state.GetExecuteAt() + select { + case <-wa.ctx.Done(): + return - // 如果 bufferedWriter 中有缓冲的数据,并且已经超过了空闲超时时间 - // If there is buffered data in the bufferedWriter and it has exceeded the idle timeout - if wa.bufferedWriter.Buffered() > 0 && diff >= defaultIdleTimeout.Milliseconds() { - // 刷新 bufferedWriter,将所有缓冲的数据写入到 writer - // Flush the bufferedWriter, writing all buffered data to the writer + case <-heartbeat.C: + if wa.bufferedWriter.Buffered() > 0 { + now := wa.timer.Load() + if (now - wa.state.GetExecuteAt()) >= defaultIdleTimeout.Milliseconds() { if err := wa.bufferedWriter.Flush(); err != nil { - // 如果在刷新 bufferedWriter 时发生错误,调用 OnWriteFailure 回调函数 - // If an error occurs while flushing the bufferedWriter, call the OnWriteFailure callback function wa.config.callback.OnWriteFailed(nil, err) } - - // 更新上次执行时间为当前时间 - // Update the last execution time to the current time wa.state.SetExecuteAt(now) } } @@ -291,88 +173,50 @@ func (wa *WriteAsyncer) poller() { } } -// updateTimer 方法用于更新 WriteAsyncer 的 timer 字段 -// The updateTimer method is used to update the timer field of the WriteAsyncer +// updateTimer 更新计时器 +// updateTimer updates the timer func (wa *WriteAsyncer) updateTimer() { - // 创建一个每秒触发一次的定时器 - // Create a timer that triggers once per second ticker := time.NewTicker(time.Second) - // 使用 defer 语句确保在函数返回时停止定时器并减少 WaitGroup 的计数 - // Use a defer statement to ensure that the timer is stopped and the WaitGroup count is decremented when the function returns defer func() { ticker.Stop() wa.wg.Done() }() - // 使用无限循环来不断检查定时器和 ctx.Done 通道 - // Use an infinite loop to continuously check the timer and ctx.Done channel for { select { - // 如果 ctx.Done 通道接收到数据,那么返回,结束这个函数 - // If the ctx.Done channel receives data, then return and end this function case <-wa.ctx.Done(): return - // 如果定时器触发,那么更新 timer 字段为当前的 Unix 毫秒时间 - // If the timer triggers, then update the timer field to the current Unix millisecond time case <-ticker.C: wa.timer.Store(time.Now().UnixMilli()) } } } -// executeFunc 方法用于执行 WriteAsyncer 的写入操作 -// The executeFunc method is used to perform the write operation of the WriteAsyncer +// executeFunc 执行写入操作 +// executeFunc executes the write operation func (wa *WriteAsyncer) executeFunc(buff *bytes.Buffer) { - // 获取当前的 Unix 毫秒时间 - // Get the current Unix millisecond time - now := wa.timer.Load() - - // 更新上次执行时间为当前时间 - // Update the last execution time to the current time - wa.state.SetExecuteAt(now) - - // content 是一个变量,它获取 elem 的缓冲区的字节 - // content is a variable that gets the bytes of the buffer of elem + wa.state.SetExecuteAt(wa.timer.Load()) content := buff.Bytes() - // 将元素的数据写入到 bufferedWriter - // Write the data of the element to the bufferedWriter if _, err := wa.flushBufferedWriter(content); err != nil { - // 如果写入失败,那么将 content 复制到一个新的切片中。因为 Buffer 会被重置,原有的数据会被覆盖。 - // If the write fails, then copy content to a new slice. Because the Buffer will be reset, the original data will be overwritten. failContent := make([]byte, len(content)) copy(failContent, content) - - // 如果写入失败,调用回调函数 OnWriteFailure - // If the write fails, call the callback function OnWriteFailure wa.config.callback.OnWriteFailed(failContent, err) } - // 将 elem 放回到 elementpool 中 - // Put elem back into the elementpool wa.bufferpool.Put(buff) } -// cleanQueueToWriter 方法用于将队列中的所有数据写入到 writer -// The cleanQueueToWriter method is used to write all data in the queue to the writer +// cleanQueueToWriter 清理队列中的所有内容到写入器 +// cleanQueueToWriter cleans all content in the queue to the writer func (wa *WriteAsyncer) cleanQueueToWriter() { - // 使用无限循环来不断从队列中取出元素并执行写入操作 - // Use an infinite loop to continuously take elements from the queue and perform write operations for { - // 从队列中取出一个元素 - // Take an element from the queue elem := wa.config.queue.Pop() - - // 如果元素为 nil,那么跳出循环 - // If the element is nil, then break the loop if elem == nil { break } - - // 执行写入操作 - // Perform the write operation wa.executeFunc(elem.(*bytes.Buffer)) } } diff --git a/writer_test.go b/writer_test.go index 96fe3f0..5d082f2 100644 --- a/writer_test.go +++ b/writer_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "sync" "testing" "time" @@ -92,7 +93,6 @@ func TestWriteAsyncer_EarlyShutdown(t *testing.T) { func TestWriteAsyncer_OnWriteFailed(t *testing.T) { - // Will print callback.OnWriteFailed 10 times, faultyWriter.Write 1 time, buff is 66 bytes t.Run("Message large than bufferSize", func(t *testing.T) { conf := NewConfig().WithCallback(&callback{t: t}).WithBufferSize(60) @@ -109,7 +109,6 @@ func TestWriteAsyncer_OnWriteFailed(t *testing.T) { time.Sleep(time.Second) }) - // Will print callback.OnWriteFailed 1 times, faultyWriter.Write 1 time, buff is 594 bytes (Because of the buffer size is 600) t.Run("Message less than bufferSize", func(t *testing.T) { conf := NewConfig().WithCallback(&callback{t: t}).WithBufferSize(600) @@ -126,3 +125,120 @@ func TestWriteAsyncer_OnWriteFailed(t *testing.T) { time.Sleep(time.Second) }) } + +func TestWriteAsyncer_EdgeCases(t *testing.T) { + t.Run("nil writer defaults to stdout", func(t *testing.T) { + w := NewWriteAsyncer(nil, nil) + assert.NotNil(t, w) + w.Stop() + }) + + t.Run("nil content", func(t *testing.T) { + buff := bytes.NewBuffer(make([]byte, 0)) + w := NewWriteAsyncer(buff, nil) + defer w.Stop() + + _, err := w.Write(nil) + assert.ErrorIs(t, err, ErrorWriteContentIsNil) + }) + + t.Run("empty content", func(t *testing.T) { + buff := bytes.NewBuffer(make([]byte, 0)) + w := NewWriteAsyncer(buff, nil) + defer w.Stop() + + n, err := w.Write([]byte{}) + assert.Nil(t, err) + assert.Equal(t, 0, n) + }) + + t.Run("multiple stop calls", func(t *testing.T) { + buff := bytes.NewBuffer(make([]byte, 0)) + w := NewWriteAsyncer(buff, nil) + + w.Stop() + w.Stop() + }) +} + +func TestWriteAsyncer_Concurrent(t *testing.T) { + t.Run("concurrent writes", func(t *testing.T) { + buff := bytes.NewBuffer(make([]byte, 0, 1024)) + w := NewWriteAsyncer(buff, nil) + defer w.Stop() + + var wg sync.WaitGroup + writers := 10 + iterations := 100 + + wg.Add(writers) + for i := 0; i < writers; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < iterations; j++ { + content := []byte(fmt.Sprintf("w%d-%d", id, j)) + _, err := w.Write(content) + assert.Nil(t, err) + } + }(i) + } + wg.Wait() + time.Sleep(time.Second) + assert.Greater(t, buff.Len(), 0) + }) +} + +func BenchmarkWriteAsyncer(b *testing.B) { + b.Run("small writes", func(b *testing.B) { + buff := bytes.NewBuffer(make([]byte, 0, b.N*10)) + w := NewWriteAsyncer(buff, nil) + defer w.Stop() + + data := []byte("small") + b.ResetTimer() + for i := 0; i < b.N; i++ { + w.Write(data) + } + }) + + b.Run("large writes", func(b *testing.B) { + buff := bytes.NewBuffer(make([]byte, 0, b.N*len(largeBytes))) + w := NewWriteAsyncer(buff, nil) + defer w.Stop() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w.Write(largeBytes) + } + }) +} + +func TestWriteAsyncer_BufferHandling(t *testing.T) { + t.Run("buffer flush on size exceed", func(t *testing.T) { + buff := bytes.NewBuffer(make([]byte, 0)) + conf := NewConfig().WithBufferSize(10) + w := NewWriteAsyncer(buff, conf) + defer w.Stop() + + _, err := w.Write([]byte("small")) + assert.Nil(t, err) + + _, err = w.Write([]byte("this is a large content")) + assert.Nil(t, err) + + time.Sleep(time.Second) + assert.Contains(t, buff.String(), "small") + }) + + t.Run("buffer flush on idle timeout", func(t *testing.T) { + buff := bytes.NewBuffer(make([]byte, 0)) + w := NewWriteAsyncer(buff, nil) + defer w.Stop() + + _, err := w.Write([]byte("test")) + assert.Nil(t, err) + + time.Sleep(defaultIdleTimeout + time.Second) + assert.Equal(t, "test", buff.String()) + }) +}