From 6d8c591244e44919bad9fab1e2a5b0d9349cefac Mon Sep 17 00:00:00 2001 From: WentaoJin <2362804997@qq.com> Date: Fri, 12 Mar 2021 11:47:46 +0800 Subject: [PATCH] add online checker --- pkg/dbutil/common.go | 4 +++- pkg/diff/diff.go | 31 ++++++++++++++++++++++++------- sync_diff_inspector/config.go | 15 +++++++++++++++ sync_diff_inspector/diff.go | 10 ++++++++-- 4 files changed, 50 insertions(+), 10 deletions(-) diff --git a/pkg/dbutil/common.go b/pkg/dbutil/common.go index e00a817ed..e101abfe6 100644 --- a/pkg/dbutil/common.go +++ b/pkg/dbutil/common.go @@ -387,7 +387,9 @@ func GetCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName str columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", ColumnName(col.Name.O))) } - query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;", + // TiDB EXPLAIN is wrong ,need tidb fixed + // FORCE PRIMARY INDEX, NOT better solution, Because maybe unique index + query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s FORCE INDEX(PRIMARY) WHERE %s;", strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange) log.Debug("checksum", zap.String("sql", query), zap.Reflect("args", args)) diff --git a/pkg/diff/diff.go b/pkg/diff/diff.go index e61688cf8..344c8cdf8 100644 --- a/pkg/diff/diff.go +++ b/pkg/diff/diff.go @@ -72,6 +72,9 @@ type TableDiff struct { // size of the split chunk ChunkSize int `json:"chunk-size"` + FailedRetryTimes int `json:"failed-retry-times"` + FailedRetrySleep int `json:"failed-retry-sleep"` + // sampling check percent, for example 10 means only check 10% data Sample int `json:"sample"` @@ -271,6 +274,8 @@ func (t *TableDiff) CheckTableData(ctx context.Context) (equal bool, err error) checkWg.Add(1) go func(j int) { defer checkWg.Done() + // check chunk data equal + // if chunk data checksum isn't consistent, then retry failedRetryTimes and sleep failedRetrySleep t.checkChunksDataEqual(ctx, t.Sample < 100 && !fromCheckpoint, checkWorkerCh[j], checkResultCh) }(i) } @@ -450,13 +455,25 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, update() if t.UseChecksum { - // first check the checksum is equal or not - equal, err = t.compareChecksum(ctx, chunk) - if err != nil { - return false, errors.Trace(err) - } - if equal { - return true, nil + // if chunk data checksum isn't consistent, then retry failedRetryTimes and sleep failedRetrySleep + for i := 0; i < (t.FailedRetryTimes + 1); i++ { + equal, err = t.compareChecksum(ctx, chunk) + if err != nil { + return false, errors.Trace(err) + } + if equal { + return true, nil + } else { + // sleep time and retry + log.Warn("schema table checksum failed, failed retry, please waiting", + zap.String("downstream schema", t.TargetTable.Schema), + zap.String("downstream table", t.TargetTable.Table), + zap.String("where", dbutil.ReplacePlaceholder(chunk.Where, chunk.Args)), + zap.Int("retry-counts", i), + zap.Duration("sleep", time.Duration(t.FailedRetrySleep)*time.Second)) + time.Sleep(time.Duration(t.FailedRetrySleep) * time.Second) + continue + } } } diff --git a/sync_diff_inspector/config.go b/sync_diff_inspector/config.go index ff03973a8..1b54b53b1 100644 --- a/sync_diff_inspector/config.go +++ b/sync_diff_inspector/config.go @@ -169,6 +169,12 @@ type Config struct { // size of the split chunk ChunkSize int `toml:"chunk-size" json:"chunk-size"` + // Chunk check failed, retry times + FailedRetryTimes int `toml:"failed-retry-times" json:"failed-retry-times"` + + // Chunk retry sleep + FailedRetrySleep int `toml:"failed-retry-sleep" json:"failed-retry-sleep"` + // sampling check percent, for example 10 means only check 10% data Sample int `toml:"sample-percent" json:"sample-percent"` @@ -299,6 +305,15 @@ func (c *Config) checkConfig() bool { return false } + if c.FailedRetryTimes <= 0 { + log.Error("failed-retry-times must greater than 0!") + return false + } + + if c.FailedRetrySleep <= 0 { + log.Error("failed-retry-sleep must greater than 0!") + return false + } if len(c.DMAddr) != 0 { u, err := url.Parse(c.DMAddr) if err != nil || u.Scheme == "" || u.Host == "" { diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index ba2cbcd2f..6a4ecbccc 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -39,6 +39,8 @@ type Diff struct { sourceDBs map[string]DBConfig targetDB DBConfig chunkSize int + failedRetryTimes int + failedRetrySleep int sample int checkThreadCount int useChecksum bool @@ -66,6 +68,8 @@ func NewDiff(ctx context.Context, cfg *Config) (diff *Diff, err error) { diff = &Diff{ sourceDBs: make(map[string]DBConfig), chunkSize: cfg.ChunkSize, + failedRetryTimes: cfg.FailedRetryTimes, + failedRetrySleep: cfg.FailedRetrySleep, sample: cfg.Sample, checkThreadCount: cfg.CheckThreadCount, useChecksum: cfg.UseChecksum, @@ -598,8 +602,10 @@ func (df *Diff) Equal() (err error) { } td := &diff.TableDiff{ - SourceTables: sourceTables, - TargetTable: targetTableInstance, + SourceTables: sourceTables, + TargetTable: targetTableInstance, + FailedRetryTimes: df.failedRetryTimes, + FailedRetrySleep: df.failedRetrySleep, IgnoreColumns: table.IgnoreColumns,