Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ RedisShake is a powerful tool for Redis data transformation and migration, offer

6. **Advanced Data Processing**: Enables custom [script-based data transformation](https://tair-opensource.github.io/RedisShake/zh/filter/function.html) and easy-to-use [data filter rules](https://tair-opensource.github.io/RedisShake/zh/filter/filter.html).

7. **Two-Way Synchronization**: Supports acyclic two-way synchronization, enabling bidirectional data migration between Redis instances. Note that two-way synchronization only works in sync_reader mode (not rdb_reader or scan_reader mode). You must also ensure that the actual Redis deployment mode matches the `cluster` configuration option; otherwise two-way synchronization will not work.

## How to Get RedisShake

1. Download from [Releases](https://github.com/tair-opensource/RedisShake/releases).
Expand Down Expand Up @@ -67,6 +69,21 @@ address = "127.0.0.1:6380"
block_key_prefix = ["temp:", "cache:"]
```

2. Using the acyclic two-way synchronization feature, you need to add the following configuration to the `shake.toml` file:
```toml
cluster = false # Set to true if the source is a Redis cluster
address = "127.0.0.1:9001" # For clusters, specify the address of any cluster node; use the master or slave address in master-slave mode
sync_rdb = true # Set to false if RDB synchronization is not required
sync_aof = true # Set to false if AOF synchronization is not required
bisync = true # set to true to enable acyclic two-way synchronization between two different Redis instances (prevents infinite sync loops)
#prefix = "bsy&88@*" # as prefix of marking keys


[redis_writer]
cluster = true # set to true if target is a redis cluster
address = "127.0.0.1:8001" # when cluster is true, set address to one of the cluster node
```

3. Run RedisShake:
```shell
./redis-shake shake.toml
Expand Down
154 changes: 148 additions & 6 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package main
import (
"RedisShake/internal/client"
"context"
"github.com/cespare/xxhash/v2"
_ "net/http/pprof"
"os"
"os/signal"
"strconv"
"strings"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -66,6 +68,8 @@ func main() {

// create reader
var theReader reader.Reader
var isolation entry.Isolation

switch {
case v.IsSet("sync_reader"):
opts := new(reader.SyncReaderOptions)
Expand All @@ -74,6 +78,12 @@ func main() {
if err != nil {
log.Panicf("failed to read the SyncReader config entry. err: %v", err)
}
isolation.OriginAddr = opts.Address
isolation.OriginUser = opts.Username
isolation.OriginIsCluster = opts.Cluster
isolation.BySync = opts.BiSync
isolation.Prefix = opts.Prefix

if opts.Cluster {
log.Infof("create SyncClusterReader")
log.Infof("* address (should be the address of one node in the Redis cluster): %s", opts.Address)
Expand Down Expand Up @@ -146,6 +156,7 @@ func main() {
if err != nil {
log.Panicf("failed to read the FileWriter config entry. err: %v", err)
}
isolation.TargetAddr = opts.Filepath
theWriter = writer.NewFileWriter(ctx, opts)
case v.IsSet("redis_writer"):
opts := new(writer.RedisWriterOptions)
Expand All @@ -154,6 +165,10 @@ func main() {
if err != nil {
log.Panicf("failed to read the RedisStandaloneWriter config entry. err: %v", err)
}
isolation.TargetUser = opts.Username
isolation.TargetAddr = opts.Address
isolation.TargetIsCLuster = opts.Cluster

if opts.OffReply && config.Opt.Advanced.RDBRestoreCommandBehavior == "panic" {
log.Panicf("the RDBRestoreCommandBehavior can't be 'panic' when the server not reply to commands")
}
Expand Down Expand Up @@ -230,17 +245,144 @@ func main() {
entries := luaRuntime.RunFunction(e)
log.Debugf("function after: %v", entries)

var is_rep_key = false // Is it a loopback command?

// write
for _, theEntry := range entries {
theEntry.Parse()
theWriter.Write(theEntry)
if isolation.BySync {
argv := theEntry.Argv

if len(theEntry.Keys) == 0 && argv[0] != "FLUSHALL" && argv[0] != "FLUSHDB" {
continue
}

command := strings.Join(argv, " ")
timestamp := time.Now().Unix()
bisyncCommand := []string{
"SETEX", // SETEX
isolation.Prefix + ":" + strconv.FormatUint(xxhash.Sum64String(command), 10), // new key
"60", // ttl
isolation.OriginAddr + "@" + strconv.FormatInt(timestamp, 10), // value
}

if isolation.TargetIsCLuster && isolation.OriginIsCluster {
clusterReader, ok := theReader.(*reader.SyncClusterReader)
if !ok {
log.Panicf("Type assertion failed: reader is not *syncClusterReader")
}
clusterWrite, ok := theWriter.(*writer.RedisClusterWriter)
if !ok {
log.Panicf("Type assertion failed: writer is not *RedisClusterWriter")
}

value, err := clusterReader.OriginClient.Get(context.Background(), bisyncCommand[1]).Result()
//If the source key does not begin with the prefix and the command hash does not exist, immediately mark it in the target cluster.
if err != nil && value == "" && (argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" || !strings.HasPrefix(theEntry.Keys[0], isolation.Prefix)) {
if argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" {
log.Infof("Writing key:[%s] from %s to %s ", argv[0], isolation.OriginAddr, isolation.TargetAddr)
} else {
log.Infof("Writing key:[%s] from %s to %s ", theEntry.Keys[0], isolation.OriginAddr, isolation.TargetAddr)
}

seconds, err := strconv.ParseInt(bisyncCommand[2], 10, 64)
err = clusterWrite.TargetClient.SetEx(context.Background(), bisyncCommand[1], bisyncCommand[3], time.Duration(seconds)*time.Second).Err()
if err != nil {
log.Panicf("SETEX failed: %w", err)
}
} else {
is_rep_key = true
}
} else if !isolation.TargetIsCLuster && !isolation.OriginIsCluster {
standaloneReader, ok := theReader.(*reader.SyncStandaloneReader)
if !ok {
log.Panicf("Type assertion failed: reader is not *SyncStandaloneReader")
}
standalonWriter, ok := theWriter.(*writer.RedisStandaloneWriter)
if !ok {
log.Panicf("Type assertion failed: writer is not *RedisStandaloneWriter")
}

value, _ := standaloneReader.OriginClient.Get(context.Background(), bisyncCommand[1]).Result()
if value == "" && (argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" || !strings.HasPrefix(theEntry.Keys[0], isolation.Prefix)) {
if argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" {
log.Infof("Writing key:[%s] from %s to %s ", argv[0], isolation.OriginAddr, isolation.TargetAddr)
} else {
log.Infof("Writing key:[%s] from %s to %s ", theEntry.Keys[0], isolation.OriginAddr, isolation.TargetAddr)
}
seconds, err := strconv.ParseInt(bisyncCommand[2], 10, 64)
err = standalonWriter.TargetClient.SetEx(context.Background(), bisyncCommand[1], bisyncCommand[3], time.Duration(seconds)*time.Second).Err()
if err != nil {
log.Panicf("SETEX failed: %w", err)
}
} else {
is_rep_key = true
}
} else if isolation.TargetIsCLuster && !isolation.OriginIsCluster {

standaloneReader, ok := theReader.(*reader.SyncStandaloneReader)
if !ok {
log.Panicf("Type assertion failed: reader is not *SyncStandaloneReader")
}

clusterWrite, ok := theWriter.(*writer.RedisClusterWriter)
if !ok {
log.Panicf("Type assertion failed: writer is not *RedisClusterWriter")
}

value, err := standaloneReader.OriginClient.Get(context.Background(), bisyncCommand[1]).Result()
if err != nil && value == "" && (argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" || !strings.HasPrefix(theEntry.Keys[0], isolation.Prefix)) {
if argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" {
log.Infof("Writing key:[%s] from %s to %s ", argv[0], isolation.OriginAddr, isolation.TargetAddr)
} else {
log.Infof("Writing key:[%s] from %s to %s ", theEntry.Keys[0], isolation.OriginAddr, isolation.TargetAddr)
}
seconds, err := strconv.ParseInt(bisyncCommand[2], 10, 64)
err = clusterWrite.TargetClient.SetEx(context.Background(), bisyncCommand[1], bisyncCommand[3], time.Duration(seconds)*time.Second).Err()
if err != nil {
log.Panicf("SETEX failed: %w", err)
}
} else {
is_rep_key = true
}

} else {
clusterReader, ok := theReader.(*reader.SyncClusterReader)
if !ok {
log.Panicf("Type assertion failed: reader is not *syncClusterReader")
}
standalonWriter, ok := theWriter.(*writer.RedisStandaloneWriter)
if !ok {
log.Panicf("Type assertion failed: writer is not *RedisStandaloneWriter")
}
value, _ := clusterReader.OriginClient.Get(context.Background(), bisyncCommand[1]).Result()
if value == "" && (argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" || !strings.HasPrefix(theEntry.Keys[0], isolation.Prefix)) {
if argv[0] == "FLUSHALL" || argv[0] == "FLUSHDB" {
log.Infof("Writing key:[%s] from %s to %s ", argv[0], isolation.OriginAddr, isolation.TargetAddr)
} else {
log.Infof("Writing key:[%s] from %s to %s ", theEntry.Keys[0], isolation.OriginAddr, isolation.TargetAddr)
}
// immediately set the command of BISYNC to the target cluster
seconds, err := strconv.ParseInt(bisyncCommand[2], 10, 64)
err = standalonWriter.TargetClient.SetEx(context.Background(), bisyncCommand[1], bisyncCommand[3], time.Duration(seconds)*time.Second).Err()
if err != nil {
log.Panicf("SETEX failed: %w", err)
}
} else {
is_rep_key = true
}
}
}

// update writer status
if config.Opt.Advanced.StatusPort != 0 {
status.AddWriteCount(theEntry.CmdName)
if !is_rep_key {
theWriter.Write(theEntry)
// update writer status
if config.Opt.Advanced.StatusPort != 0 {
status.AddWriteCount(theEntry.CmdName)
}
// update log entry count
atomic.AddUint64(&logEntryCount.WriteCount, 1)
}
// update log entry count
atomic.AddUint64(&logEntryCount.WriteCount, 1)
}
}
readerDone <- true
Expand Down
3 changes: 3 additions & 0 deletions docs/src/zh/guide/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

Cluster Reader 即集群读入类,其根据源端分片数量创建同等数量的 Standalone Reader,每个 Standalone Reader 开启一个协程(Goroutinue)并行的从每个源端分片进行读入,并将数据存入相应的管道(Reader Channel)交付给下一环节处理。

### 双向无环数据同步实现
采用目标 Redis 标记法:当 Redis-A 向 Redis-B 同步数据【set name jay】时,首先检查源 Redis-A 是否已存在对应标记;如果不存在,则对命令做哈希(xxhash)得到摘要,并向目标 Redis-B 写入标记【setex prefix:&lt;命令哈希&gt; 60 源Redis地址@时间戳】,从而在回放时识别并丢弃回环命令。

### Main

Main 即主函数,其根据 Reader Channel 数量开启多个协程,并行的对管道中数据分别执行 Parse、Filter、Function 操作,再调用 Cluster Writer 的 Write 方法,将数据分发给写入端。
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/go-stack/stack v1.8.1
github.com/gofrs/flock v0.8.1
github.com/mcuadros/go-defaults v1.2.0
github.com/redis/go-redis/v9 v9.12.0
github.com/rs/zerolog v1.28.0
github.com/spf13/viper v1.18.1
github.com/stretchr/testify v1.8.4
Expand All @@ -16,8 +17,10 @@ require (
)

require (
	github.com/cespare/xxhash/v2 v2.3.0
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
Expand Down Expand Up @@ -47,6 +55,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.12.0 h1:XlVPGlflh4nxfhsNXPA8Qp6EmEfTo0rp8oaBzPipXnU=
github.com/redis/go-redis/v9 v9.12.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
Expand Down
55 changes: 52 additions & 3 deletions internal/client/func.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package client

import (
"bytes"
"strings"

"RedisShake/internal/client/proto"
"RedisShake/internal/log"
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"strings"
)

func EncodeArgv(argv []string, buf *bytes.Buffer) {
Expand All @@ -26,3 +29,49 @@ func (r *Redis) IsCluster() bool {
reply := r.DoWithStringReply("INFO", "Cluster")
return strings.Contains(reply, "cluster_enabled:1")
}

// CreateTLSConfig builds a *tls.Config from the given key, CA certificate,
// and client certificate file paths. All paths are optional:
//   - no paths at all: returns a config with server verification disabled,
//     preserving the previous "trust anything" behavior;
//   - CACertFilePath set: the CA bundle is loaded into RootCAs and server
//     certificate verification is enabled;
//   - certFilePath and keyFilePath both set: a client certificate is loaded
//     for mutual (two-way) TLS authentication.
func CreateTLSConfig(keyFilePath, CACertFilePath, certFilePath string) (*tls.Config, error) {
	// No TLS material configured: fall back to an unverified connection.
	if keyFilePath == "" && CACertFilePath == "" && certFilePath == "" {
		return &tls.Config{
			InsecureSkipVerify: true, // nothing to verify against
		}, nil
	}

	config := &tls.Config{
		MinVersion: tls.VersionTLS12, // refuse legacy TLS versions
	}

	// Load the CA certificate bundle (if configured) so the server
	// certificate is verified against it; otherwise warn and skip
	// verification. This replaces the previous duplicated CACertFilePath
	// check at the end of the function.
	if CACertFilePath != "" {
		// NOTE(review): ioutil.ReadFile is deprecated; os.ReadFile is the
		// modern equivalent (kept here to avoid touching the import set).
		caCert, err := ioutil.ReadFile(CACertFilePath)
		if err != nil {
			return nil, err
		}

		caCertPool := x509.NewCertPool()
		if !caCertPool.AppendCertsFromPEM(caCert) {
			return nil, fmt.Errorf("failed to append CA cert")
		}
		config.RootCAs = caCertPool
	} else {
		config.InsecureSkipVerify = true
		log.Warnf("No CA certificate provided, using insecure TLS connection")
	}

	// Load the client certificate and private key (if configured) for
	// mutual TLS authentication.
	if certFilePath != "" && keyFilePath != "" {
		cert, err := tls.LoadX509KeyPair(certFilePath, keyFilePath)
		if err != nil {
			return nil, err
		}
		config.Certificates = []tls.Certificate{cert}
	}

	return config, nil
}
12 changes: 12 additions & 0 deletions internal/entry/isolation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package entry

// Isolation carries the connection identities of both ends of a sync task.
// It is used by the acyclic two-way (bi-directional) synchronization feature
// to build marker keys and detect loopback commands.
//
// The previous version attached the raw struct tag "Default" to the user
// fields; it is not a key:value tag and had no effect, so it was removed.
type Isolation struct {
	OriginAddr      string // source Redis address
	OriginUser      string // source Redis username (empty when unauthenticated)
	TargetAddr      string // target Redis address (or file path for file writers)
	TargetUser      string // target Redis username (empty when unauthenticated)
	Prefix          string // key prefix used to mark already-replicated commands
	OriginIsCluster bool   // true when the source is a Redis cluster
	TargetIsCLuster bool   // true when the target is a Redis cluster (misspelling kept: existing callers reference this name)
	BySync          bool   // two-way sync enabled (populated from the BiSync reader option)
}
Loading