diff --git a/internal/client/redis.go b/internal/client/redis.go index 5672bf5c..949a0d34 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -92,7 +92,18 @@ func (r *Redis) Send(args ...string) { if err != nil { log.Panicf(err.Error()) } - r.flush() + r.Flush() +} + +func (r *Redis) SendToBuffer(args ...string) { + argsInterface := make([]interface{}, len(args)) + for inx, item := range args { + argsInterface[inx] = item + } + err := r.protoWriter.WriteArgs(argsInterface) + if err != nil { + log.Panicf(err.Error()) + } } func (r *Redis) SendBytes(buf []byte) { @@ -100,10 +111,17 @@ func (r *Redis) SendBytes(buf []byte) { if err != nil { log.Panicf(err.Error()) } - r.flush() + r.Flush() +} + +func (r *Redis) SendBytesToBuffer(buf []byte) { + _, err := r.writer.Write(buf) + if err != nil { + log.Panicf(err.Error()) + } } -func (r *Redis) flush() { +func (r *Redis) Flush() { err := r.writer.Flush() if err != nil { log.Panicf(err.Error()) diff --git a/internal/config/config.go b/internal/config/config.go index b30ba953..e97a4a65 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -34,7 +34,7 @@ type AdvancedOptions struct { // ignore: redis-shake will skip restore the key when meet "Target key name is busy" error. RDBRestoreCommandBehavior string `mapstructure:"rdb_restore_command_behavior" default:"panic"` - PipelineCountLimit uint64 `mapstructure:"pipeline_count_limit" default:"1024"` + PipelineCountLimit uint32 `mapstructure:"pipeline_count_limit" default:"1024"` TargetRedisClientMaxQuerybufLen int64 `mapstructure:"target_redis_client_max_querybuf_len" default:"1024000000"` TargetRedisProtoMaxBulkLen uint64 `mapstructure:"target_redis_proto_max_bulk_len" default:"512000000"` diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go index 4e636a4a..d6205e6f 100644 --- a/internal/writer/redis_standalone_writer.go +++ b/internal/writer/redis_standalone_writer.go @@ -38,6 +38,9 @@ type redisStandaloneWriter struct { UnansweredBytes int64 `json:"unanswered_bytes"` UnansweredEntries int64 `json:"unanswered_entries"` } + + // the number of commands cached in the pipeline + bufCnt atomic.Uint32 } func NewRedisStandaloneWriter(opts *RedisWriterOptions) Writer { @@ -58,6 +61,9 @@ func NewRedisStandaloneWriter(opts *RedisWriterOptions) Writer { } func (w *redisStandaloneWriter) Close() { + if w.bufCnt.Load() > 0 { + w.client.Flush() + } if !w.offReply { close(w.chWaitReply) w.chWg.Wait() @@ -81,7 +87,13 @@ func (w *redisStandaloneWriter) Write(e *entry.Entry) { atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize) atomic.AddInt64(&w.stat.UnansweredEntries, 1) } - w.client.SendBytes(bytes) + + limit := config.Opt.Advanced.PipelineCountLimit + w.client.SendBytesToBuffer(bytes) + if w.bufCnt.Add(1) >= limit { + w.bufCnt.CompareAndSwap(limit, 0) + w.client.Flush() + } } func (w *redisStandaloneWriter) switchDbTo(newDbId int) {