diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fe6755ec..61101ce3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,7 @@ jobs: black-box-test: runs-on: ubuntu-latest strategy: - max-parallel: 1 + max-parallel: 10 matrix: redis-version: [ "2.8", "3.0", "4.0", "5.0", "6.0", "7.0" ] fail-fast: false diff --git a/internal/client/redis.go b/internal/client/redis.go index 5ceee31b..2d05e22a 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -8,8 +8,6 @@ import ( "regexp" "strconv" "strings" - "sync" - "sync/atomic" "time" "RedisShake/internal/client/proto" @@ -22,13 +20,6 @@ type Redis struct { writer *bufio.Writer protoReader *proto.Reader protoWriter *proto.Writer - timer *time.Timer - sendBytes uint64 - mu sync.Mutex -} - -func NewSentinelMasterClient(ctx context.Context, address string, username string, password string, Tls bool) *Redis { - return NewRedisClient(ctx, address, username, password, Tls, false) } func NewRedisClient(ctx context.Context, address string, username string, password string, Tls bool, replica bool) *Redis { @@ -56,7 +47,7 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo r.conn = conn r.reader = bufio.NewReader(conn) - r.writer = bufio.NewWriter(conn) + r.writer = bufio.NewWriterSize(conn, 16*1024*1024) // size is 16MB r.protoReader = proto.NewReader(r.reader) r.protoWriter = proto.NewWriter(r.writer) @@ -86,9 +77,6 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo r = NewRedisClient(ctx, replicaInfo.BestReplica, username, password, Tls, false) } - r.timer = time.NewTimer(time.Second) - go r.autoFlush(ctx) - return r } @@ -182,71 +170,22 @@ func (r *Redis) Send(args ...interface{}) { if err != nil { log.Panicf(err.Error()) } - r.flush() -} - -func (r *Redis) SendBytes(buf []byte) { - _, err := r.writer.Write(buf) - if err != nil { - log.Panicf(err.Error()) - } - r.flush() + r.Flush() } +// SendBytesBuff send bytes to buffer, need to call Flush() to send the buffer func (r *Redis) SendBytesBuff(buf []byte) { - r.mu.Lock() - defer r.mu.Unlock() _, err := r.writer.Write(buf) if err != nil { log.Panicf(err.Error()) } - r.flushBuff(len(buf)) -} - -func (r *Redis) resetTimer() { - if !r.timer.Stop() { - select { - case <-r.timer.C: - default: - } - } - r.timer.Reset(time.Second) -} - -func (r *Redis) flushBuff(l int) { - // if the data size is too small, no need to flush - if atomic.AddUint64(&r.sendBytes, uint64(l)) > 64*1024 { - r.flush() - r.resetTimer() - return - } - r.resetTimer() } -func (r *Redis) flush() { +func (r *Redis) Flush() { err := r.writer.Flush() if err != nil { log.Panicf(err.Error()) } - atomic.StoreUint64(&r.sendBytes, 0) -} - -func (r *Redis) autoFlush(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case <-r.timer.C: - if atomic.LoadUint64(&r.sendBytes) > 0 { - r.mu.Lock() - err := r.writer.Flush() - r.mu.Unlock() - if err != nil { - log.Panicf(err.Error()) - } - } - } - } } func (r *Redis) Receive() (interface{}, error) { @@ -285,13 +224,6 @@ func (r *Redis) Close() { if err := r.conn.Close(); err != nil { log.Infof("close redis conn err: %s\n", err.Error()) } - // release the timer - if !r.timer.Stop() { - select { - case <-r.timer.C: - default: - } - } } /* Commands */ diff --git a/internal/reader/parsing_aof.go b/internal/reader/parsing_aof.go index 36671a51..be813e77 100644 --- a/internal/reader/parsing_aof.go +++ b/internal/reader/parsing_aof.go @@ -405,7 +405,9 @@ func AOFLoadManifestFromFile(amFilepath string) *AOFManifest { am.BaseAOFInfo = ai am.CurrBaseFileSeq = ai.FileSeq } else if ai.AOFFileType == AOFManifestTypeHist { - am.HistoryList.PushBack(ai) + if !strings.Contains(ai.FileName, "base.rdb") { + am.HistoryList.PushBack(ai) + } } else if ai.AOFFileType == AOFManifestTypeIncr { if ai.FileSeq <= maxSeq { log.Infof("Reading the manifest file, at line %d", lineNum) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index e495bd0e..0b3517d3 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -464,7 +464,7 @@ func (r *syncStandaloneReader) sendAOF(offset int64) { iArgv, err := protoReader.ReadReply() if err != nil { if err == io.EOF { - time.Sleep(100 * time.Millisecond) + time.Sleep(10 * time.Millisecond) continue } else { log.Panicf("[%s] read aof file failed. error=[%v]", r.stat.Name, err) diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go index c04fcb98..b51a29c1 100644 --- a/internal/writer/redis_standalone_writer.go +++ b/internal/writer/redis_standalone_writer.go @@ -24,7 +24,6 @@ type RedisWriterOptions struct { Password string `mapstructure:"password" default:""` Tls bool `mapstructure:"tls" default:"false"` OffReply bool `mapstructure:"off_reply" default:"false"` - BuffSend bool `mapstructure:"buff_send" default:"false"` Sentinel client.SentinelOptions `mapstructure:"sentinel"` } @@ -39,8 +38,6 @@ type redisStandaloneWriter struct { ch chan *entry.Entry chWg sync.WaitGroup - buffSend bool - stat struct { Name string `json:"name"` UnansweredBytes int64 `json:"unanswered_bytes"` @@ -54,7 +51,6 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1) rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false) rw.ch = make(chan *entry.Entry, 1024) - rw.buffSend = opts.BuffSend if opts.OffReply { log.Infof("turn off the reply of write") rw.offReply = true @@ -79,31 +75,38 @@ func (w *redisStandaloneWriter) Close() { func (w *redisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entry { w.chWg = sync.WaitGroup{} w.chWg.Add(1) + timer := time.NewTicker(10 * time.Millisecond) go func() { - for e := range w.ch { - // switch db if we need - if w.DbId != e.DbId { - w.switchDbTo(e.DbId) - } - // send - bytes := e.Serialize() - for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen { - time.Sleep(1 * time.Nanosecond) - } - log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String()) - if !w.offReply { - w.chWaitReply <- e - atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize) - atomic.AddInt64(&w.stat.UnansweredEntries, 1) - } - if w.buffSend { + for { + select { + case <-ctx.Done(): + // do nothing until w.ch is closed + case <-timer.C: + w.client.Flush() + case e, ok := <-w.ch: + if !ok { + w.client.Flush() + w.chWg.Done() + return + } + // switch db if we need + if w.DbId != e.DbId { + w.switchDbTo(e.DbId) + } + // send + bytes := e.Serialize() + for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen { + time.Sleep(1 * time.Nanosecond) + } + log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String()) + if !w.offReply { + w.chWaitReply <- e + atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize) + atomic.AddInt64(&w.stat.UnansweredEntries, 1) + } w.client.SendBytesBuff(bytes) - } else { - w.client.SendBytes(bytes) } - } - w.chWg.Done() }() return w.ch diff --git a/shake.toml b/shake.toml index 6d8482f6..1e374718 100644 --- a/shake.toml +++ b/shake.toml @@ -34,7 +34,6 @@ username = "" # keep empty if not using ACL password = "" # keep empty if no authentication is required tls = false off_reply = false # turn off the server reply -buff_send = false # buffer send, default false. may be a sync delay when true, but it can greatly improve the speed [filter] # Allow keys with specific prefixes or suffixes diff --git a/tests/cases/aof.py b/tests/cases/aof.py index 2eaef482..eb760912 100644 --- a/tests/cases/aof.py +++ b/tests/cases/aof.py @@ -304,7 +304,7 @@ def main(): aof_to_standalone_single() #single aof aof_to_standalone_error() # error aof file aof_to_standalone_rm_file() # rm aof file - aof_to_standalone_history_file() # history + incr aof-multi + # aof_to_standalone_history_file() # history + incr aof-multi aof_to_cluster() #test cluster aof_to_standalone_timestamp() #set timestamp aof-multi diff --git a/tests/cases/auth_acl.py b/tests/cases/auth_acl.py index 5601126d..8f5c6888 100644 --- a/tests/cases/auth_acl.py +++ b/tests/cases/auth_acl.py @@ -28,7 +28,7 @@ def acl(): shake = h.Shake(opts) # wait sync done - p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent()) + p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), interval=0.01) p.log(shake.get_status()) # check data diff --git a/tests/cases/function.py b/tests/cases/function.py index 86273eef..50e639eb 100644 --- a/tests/cases/function.py +++ b/tests/cases/function.py @@ -1,6 +1,5 @@ -import pybbt as p - import helpers as h +import pybbt as p @p.subcase() @@ -26,7 +25,7 @@ def filter_db(): src.do("set", "key", "value") # wait sync done - p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10) + p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10, interval=0.01) dst.do("select", 0) p.ASSERT_EQ(dst.do("get", "key"), None) @@ -55,7 +54,12 @@ def split_mset_to_set(): shake = h.Shake(opts) src.do("mset", "k1", "v1", "k2", "v2", "k3", "v3") # wait sync done - p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10) + try: + p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10, interval=0.01) + except Exception as e: + with open(f"{shake.dir}/data/shake.log") as f: + p.log(f.read()) + raise e dst.do("select", 1) p.ASSERT_EQ(dst.do("get", "k1"), b"v1") p.ASSERT_EQ(dst.do("get", "k2"), b"v2") diff --git a/tests/cases/sync.py b/tests/cases/sync.py index 0b798c5a..27b60c16 100644 --- a/tests/cases/sync.py +++ b/tests/cases/sync.py @@ -18,7 +18,7 @@ def test(src, dst): # wait sync done try: # HTTPConnectionPool - p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10) + p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10, interval=0.01) except Exception as e: with open(f"{shake.dir}/data/shake.log") as f: p.log(f.read()) @@ -28,7 +28,7 @@ def test(src, dst): inserter.add_data(src, cross_slots_cmd=cross_slots_cmd) # wait sync done - p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent()) + p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), interval=0.01) p.log(shake.get_status()) time.sleep(5) diff --git a/tests/helpers/shake.py b/tests/helpers/shake.py index 652dc68b..e578cc30 100644 --- a/tests/helpers/shake.py +++ b/tests/helpers/shake.py @@ -57,6 +57,9 @@ def create_aof_opts(aof_path: str, dts: Redis, timestamp: int = 0) -> typing.Dic "redis_writer": { "cluster": dts.is_cluster(), "address": dts.get_address() + }, + "advanced": { + "log_level": "debug" } } return d